Fix: cleanup relayd sockets on rotation command communication error
[lttng-tools.git] / src / common / consumer / consumer.c
index 3e991c8c71035f6ee9f43f768db27247209e8707..2c82836558287418a171f7e312c45d1fe362f459 100644 (file)
@@ -323,6 +323,7 @@ static void free_relayd_rcu(struct rcu_head *head)
        (void) relayd_close(&relayd->control_sock);
        (void) relayd_close(&relayd->data_sock);
 
+       pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
        free(relayd);
 }
 
@@ -464,14 +465,13 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
  * If a local data context is available, notify the threads that the streams'
  * state have changed.
  */
-static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
-               struct lttng_consumer_local_data *ctx)
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        uint64_t netidx;
 
        assert(relayd);
 
-       DBG("Cleaning up relayd sockets");
+       DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);
 
        /* Save the net sequence index before destroying the object */
        netidx = relayd->net_seq_idx;
@@ -491,10 +491,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         * memory barrier ordering the updates of the end point status from the
         * read of this status which happens AFTER receiving this notify.
         */
-       if (ctx) {
-               notify_thread_lttng_pipe(ctx->consumer_data_pipe);
-               notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
-       }
+       notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
+       notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
 }
 
 /*
@@ -808,9 +806,12 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                pthread_mutex_lock(&relayd->ctrl_sock_mutex);
                ret = relayd_add_stream(&relayd->control_sock, stream->name,
                                path, &stream->relayd_stream_id,
-                               stream->chan->tracefile_size, stream->chan->tracefile_count);
+                               stream->chan->tracefile_size, stream->chan->tracefile_count,
+                               stream->trace_archive_id);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto end;
                }
 
@@ -852,6 +853,8 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
                ret = relayd_streams_sent(&relayd->control_sock);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto end;
                }
        } else {
@@ -1719,7 +1722,8 @@ write_error:
         * cleanup the relayd object and all associated streams.
         */
        if (relayd && relayd_hang_up) {
-               cleanup_relayd(relayd, ctx);
+               ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
        }
 
 end:
@@ -1945,7 +1949,8 @@ write_error:
         * cleanup the relayd object and all associated streams.
         */
        if (relayd && relayd_hang_up) {
-               cleanup_relayd(relayd, ctx);
+               ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
                /* Skip splice error so the consumer does not fail */
                goto end;
        }
@@ -2339,7 +2344,6 @@ int consumer_post_rotation(struct lttng_consumer_stream *stream,
                                stream->chan->name);
                ret = rotate_notify_sessiond(ctx, stream->chan->key);
        }
-       assert(stream->chan->nr_stream_rotate_pending >= 0);
        pthread_mutex_unlock(&stream->chan->lock);
 
        return ret;
@@ -2576,7 +2580,9 @@ void *consumer_thread_data_poll(void *data)
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
        /* local view of consumer_data.fds_count */
-       int nb_fd = 0, nb_pipes_fd;
+       int nb_fd = 0;
+       /* 2 for the consumer_data_pipe and wake up pipe */
+       const int nb_pipes_fd = 2;
        /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
        int nb_inactive_fd = 0;
        struct lttng_consumer_local_data *ctx = data;
@@ -2616,12 +2622,7 @@ void *consumer_thread_data_poll(void *data)
                        free(local_stream);
                        local_stream = NULL;
 
-                       /*
-                        * Allocate for all fds + 2:
-                        *   +1 for the consumer_data_pipe
-                        *   +1 for wake up pipe
-                        */
-                       nb_pipes_fd = 2;
+                       /* Allocate for all fds */
                        pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
                        if (pollfd == NULL) {
                                PERROR("pollfd malloc");
@@ -3648,6 +3649,7 @@ error:
         * Add relayd socket pair to consumer data hashtable. If object already
         * exists or on error, the function gracefully returns.
         */
+       relayd->ctx = ctx;
        add_relayd(relayd);
 
        /* All good! */
@@ -3774,6 +3776,8 @@ int consumer_data_pending(uint64_t id)
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
                        /* Communication error thus the relayd so no data pending. */
+                       ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto data_not_pending;
                }
        }
@@ -3815,6 +3819,13 @@ int consumer_data_pending(uint64_t id)
                                                stream->relayd_stream_id,
                                                stream->next_net_seq_num - 1);
                        }
+                       if (ret < 0) {
+                               ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                               lttng_consumer_cleanup_relayd(relayd);
+                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_not_pending;
+                       }
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret == 1) {
                                pthread_mutex_unlock(&stream->lock);
@@ -3833,6 +3844,8 @@ int consumer_data_pending(uint64_t id)
                                relayd->relayd_session_id, &is_data_inflight);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto data_not_pending;
                }
                if (is_data_inflight) {
@@ -4203,6 +4216,10 @@ int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
                        stream->chan->current_chunk_id,
                        stream->last_sequence_number);
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+       if (ret < 0) {
+               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+       }
        if (ret) {
                ERR("Rotate relay stream");
        }
@@ -4376,6 +4393,10 @@ int rotate_rename_relay(const char *old_path, const char *new_path,
 
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
        ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path);
+       if (ret < 0) {
+               ERR("Relayd rotate rename failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+       }
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 end:
        return ret;
@@ -4406,6 +4427,10 @@ int lttng_consumer_rotate_pending_relay(uint64_t session_id,
 
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
        ret = relayd_rotate_pending(&relayd->control_sock, chunk_id);
+       if (ret < 0) {
+               ERR("Relayd rotate pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+       }
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 
 end:
@@ -4443,6 +4468,10 @@ int mkdir_relay(const char *path, uint64_t relayd_id)
 
        pthread_mutex_lock(&relayd->ctrl_sock_mutex);
        ret = relayd_mkdir(&relayd->control_sock, path);
+       if (ret < 0) {
+               ERR("Relayd mkdir failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
+       }
        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
 
 end:
This page took 0.030654 seconds and 5 git commands to generate.