Backport: Cleanup relayd socket pair on control socket transmission error
authorJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Wed, 6 Jun 2018 01:00:28 +0000 (21:00 -0400)
committerJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Fri, 21 Sep 2018 04:00:52 +0000 (00:00 -0400)
A reference to the local ctx for the socket pair is used to "force" an
evaluation of the data and metadata stream since we changed the endpoint
status. This mostly result in the closing of all the streams for which
the relayd socket pair is linked to.

Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
src/common/consumer/consumer-stream.c
src/common/consumer/consumer.c
src/common/consumer/consumer.h

index d0b1ddef2259fc57342cac15ef5003f19fc03e6f..f5ca6b71ea3de705bacee2a78fa15db5b260c6f6 100644 (file)
@@ -71,15 +71,11 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream,
        ret = relayd_send_close_stream(&relayd->control_sock,
                        stream->relayd_stream_id,
                        stream->next_net_seq_num - 1);
-       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        if (ret < 0) {
-               DBG("Unable to close stream on the relayd. Continuing");
-               /*
-                * Continue here. There is nothing we can do for the relayd.
-                * Chances are that the relayd has closed the socket so we just
-                * continue cleaning up.
-                */
+               ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
        }
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 
        /* Both conditions are met, we destroy the relayd. */
        if (uatomic_read(&relayd->refcount) == 0 &&
@@ -371,6 +367,15 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream,
                        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                        ret = relayd_send_index(&relayd->control_sock, element,
                                stream->relayd_stream_id, stream->next_net_seq_num - 1);
+                       if (ret < 0) {
+                               /*
+                                * Communication error with lttng-relayd,
+                                * perform cleanup now
+                                */
+                               ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx);
+                               lttng_consumer_cleanup_relayd(relayd);
+                               ret = -1;
+                       }
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                } else {
                        ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.",
index 106c705f1b0cc48cd294c8442aa5d42ff2feacc2..c2a2b1587ae6500039dcdddbbedc6eb41fa047b0 100644 (file)
@@ -458,14 +458,13 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
  * If a local data context is available, notify the threads that the streams'
  * state have changed.
  */
-static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
-               struct lttng_consumer_local_data *ctx)
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        uint64_t netidx;
 
        assert(relayd);
 
-       DBG("Cleaning up relayd sockets");
+       DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);
 
        /* Save the net sequence index before destroying the object */
        netidx = relayd->net_seq_idx;
@@ -485,10 +484,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         * memory barrier ordering the updates of the end point status from the
         * read of this status which happens AFTER receiving this notify.
         */
-       if (ctx) {
-               notify_thread_lttng_pipe(ctx->consumer_data_pipe);
-               notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
-       }
+       notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
+       notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
 }
 
 /*
@@ -794,10 +791,13 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                ret = relayd_add_stream(&relayd->control_sock, stream->name,
                                path, &stream->relayd_stream_id,
                                stream->chan->tracefile_size, stream->chan->tracefile_count);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        goto end;
                }
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 
                uatomic_inc(&relayd->refcount);
                stream->sent_to_relayd = 1;
@@ -835,10 +835,13 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
                /* Add stream on the relayd */
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_streams_sent(&relayd->control_sock);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        goto end;
                }
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        } else {
                ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
                                net_seq_idx);
@@ -1702,7 +1705,8 @@ write_error:
         * cleanup the relayd object and all associated streams.
         */
        if (relayd && relayd_hang_up) {
-               cleanup_relayd(relayd, ctx);
+               ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
        }
 
 end:
@@ -1928,7 +1932,8 @@ write_error:
         * cleanup the relayd object and all associated streams.
         */
        if (relayd && relayd_hang_up) {
-               cleanup_relayd(relayd, ctx);
+               ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
                /* Skip splice error so the consumer does not fail */
                goto end;
        }
@@ -3508,6 +3513,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
         * Add relayd socket pair to consumer data hashtable. If object already
         * exists or on error, the function gracefully returns.
         */
+       relayd->ctx = ctx;
        add_relayd(relayd);
 
        /* All good! */
@@ -3633,11 +3639,14 @@ int consumer_data_pending(uint64_t id)
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_begin_data_pending(&relayd->control_sock,
                                relayd->relayd_session_id);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
                        /* Communication error thus the relayd so no data pending. */
+                       ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        goto data_not_pending;
                }
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
        }
 
        cds_lfht_for_each_entry_duplicate(ht->ht,
@@ -3677,6 +3686,13 @@ int consumer_data_pending(uint64_t id)
                                                stream->relayd_stream_id,
                                                stream->next_net_seq_num - 1);
                        }
+                       if (ret < 0) {
+                               ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                               lttng_consumer_cleanup_relayd(relayd);
+                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_not_pending;
+                       }
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret == 1) {
                                pthread_mutex_unlock(&stream->lock);
@@ -3693,10 +3709,13 @@ int consumer_data_pending(uint64_t id)
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_end_data_pending(&relayd->control_sock,
                                relayd->relayd_session_id, &is_data_inflight);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
+                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        goto data_not_pending;
                }
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (is_data_inflight) {
                        goto data_pending;
                }
index fc4723879f5fce3b10e98b9f31656882a38c52d4..54b919d2885905b8ff341c97462d125239180d9b 100644 (file)
@@ -457,6 +457,7 @@ struct consumer_relayd_sock_pair {
        /* Session id on both sides for the sockets. */
        uint64_t relayd_session_id;
        uint64_t sessiond_session_id;
+       struct lttng_consumer_local_data *ctx;
 };
 
 /*
@@ -739,5 +740,6 @@ void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
 int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
 void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
 int consumer_create_index_file(struct lttng_consumer_stream *stream);
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
 
 #endif /* LIB_CONSUMER_H */
This page took 0.031048 seconds and 5 git commands to generate.