Fix: perform relayd socket pair cleanup on control socket error
[lttng-tools.git] / src / common / consumer / consumer.c
index e182c991f147baf89bcb482da2ff9b4936d775b6..3556b789760eca81a4074193670d18a902f7607b 100644 (file)
@@ -47,6 +47,7 @@
 #include <common/consumer/consumer-stream.h>
 #include <common/consumer/consumer-testpoint.h>
 #include <common/align.h>
+#include <common/consumer/consumer-metadata-cache.h>
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -319,6 +320,7 @@ static void free_relayd_rcu(struct rcu_head *head)
        (void) relayd_close(&relayd->control_sock);
        (void) relayd_close(&relayd->data_sock);
 
+       pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
        free(relayd);
 }
 
@@ -457,14 +459,13 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
  * If a local data context is available, notify the threads that the streams'
  * state have changed.
  */
-static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
-               struct lttng_consumer_local_data *ctx)
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        uint64_t netidx;
 
        assert(relayd);
 
-       DBG("Cleaning up relayd sockets");
+       DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);
 
        /* Save the net sequence index before destroying the object */
        netidx = relayd->net_seq_idx;
@@ -484,10 +485,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         * memory barrier ordering the updates of the end point status from the
         * read of this status which happens AFTER receiving this notify.
         */
-       if (ctx) {
-               notify_thread_lttng_pipe(ctx->consumer_data_pipe);
-               notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
-       }
+       notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
+       notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
 }
 
 /*
@@ -570,7 +569,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->session_id = session_id;
        stream->monitor = monitor;
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
-       stream->index_fd = -1;
+       stream->index_file = NULL;
        stream->last_sequence_number = -1ULL;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
@@ -795,6 +794,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                                stream->chan->tracefile_size, stream->chan->tracefile_count);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto end;
                }
 
@@ -836,6 +837,8 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
                ret = relayd_streams_sent(&relayd->control_sock);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto end;
                }
        } else {
@@ -1070,7 +1073,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
  */
 static int update_poll_array(struct lttng_consumer_local_data *ctx,
                struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
-               struct lttng_ht *ht)
+               struct lttng_ht *ht, int *nb_inactive_fd)
 {
        int i = 0;
        struct lttng_ht_iter iter;
@@ -1082,6 +1085,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
        assert(local_stream);
 
        DBG("Updating poll fd array");
+       *nb_inactive_fd = 0;
        rcu_read_lock();
        cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
                /*
@@ -1092,9 +1096,14 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx,
                 * just after the check. However, this is OK since the stream(s) will
                 * be deleted once the thread is notified that the end point state has
                 * changed where this function will be called back again.
+                *
+                * We track the number of inactive FDs because they still need to be
+                * closed by the polling thread after a wakeup on the data_pipe or
+                * metadata_pipe.
                 */
                if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
                                stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+                       (*nb_inactive_fd)++;
                        continue;
                }
                /*
@@ -1530,7 +1539,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                mmap_base = stream->mmap_base;
                ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
                if (ret < 0) {
-                       ret = -errno;
                        PERROR("tracer ctl get_mmap_read_offset");
                        goto end;
                }
@@ -1625,21 +1633,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        }
                        outfd = stream->out_fd;
 
-                       if (stream->index_fd >= 0) {
-                               ret = close(stream->index_fd);
-                               if (ret < 0) {
-                                       PERROR("Closing index");
-                                       goto end;
-                               }
-                               stream->index_fd = -1;
-                               ret = index_create_file(stream->chan->pathname,
+                       if (stream->index_file) {
+                               lttng_index_file_put(stream->index_file);
+                               stream->index_file = lttng_index_file_create(stream->chan->pathname,
                                                stream->name, stream->uid, stream->gid,
                                                stream->chan->tracefile_size,
-                                               stream->tracefile_count_current);
-                               if (ret < 0) {
+                                               stream->tracefile_count_current,
+                                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                               if (!stream->index_file) {
                                        goto end;
                                }
-                               stream->index_fd = ret;
                        }
 
                        /* Reset current size because we just perform a rotation. */
@@ -1701,7 +1704,8 @@ write_error:
         * cleanup the relayd object and all associated streams.
         */
        if (relayd && relayd_hang_up) {
-               cleanup_relayd(relayd, ctx);
+               ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
        }
 
 end:
@@ -1832,22 +1836,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        }
                        outfd = stream->out_fd;
 
-                       if (stream->index_fd >= 0) {
-                               ret = close(stream->index_fd);
-                               if (ret < 0) {
-                                       PERROR("Closing index");
-                                       goto end;
-                               }
-                               stream->index_fd = -1;
-                               ret = index_create_file(stream->chan->pathname,
+                       if (stream->index_file) {
+                               lttng_index_file_put(stream->index_file);
+                               stream->index_file = lttng_index_file_create(stream->chan->pathname,
                                                stream->name, stream->uid, stream->gid,
                                                stream->chan->tracefile_size,
-                                               stream->tracefile_count_current);
-                               if (ret < 0) {
-                                       written = ret;
+                                               stream->tracefile_count_current,
+                                               CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                               if (!stream->index_file) {
                                        goto end;
                                }
-                               stream->index_fd = ret;
                        }
 
                        /* Reset current size because we just perform a rotation. */
@@ -1933,7 +1931,8 @@ write_error:
         * cleanup the relayd object and all associated streams.
         */
        if (relayd && relayd_hang_up) {
-               cleanup_relayd(relayd, ctx);
+               ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+               lttng_consumer_cleanup_relayd(relayd);
                /* Skip splice error so the consumer does not fail */
                goto end;
        }
@@ -2064,6 +2063,10 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
+       if (stream->chan->metadata_cache) {
+               /* Only applicable to userspace consumers. */
+               pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+       }
 
        /* Remove any reference to that stream. */
        consumer_stream_delete(stream, ht);
@@ -2087,6 +2090,9 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
         */
        stream->chan->metadata_stream = NULL;
 
+       if (stream->chan->metadata_cache) {
+               pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+       }
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
@@ -2149,7 +2155,7 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 
        lttng_ht_add_unique_u64(ht, &stream->node);
 
-       lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht,
+       lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht,
                &stream->node_channel_id);
 
        /*
@@ -2374,7 +2380,7 @@ restart:
                                        len = ctx->on_buffer_ready(stream, ctx);
                                        /*
                                         * We don't check the return value here since if we get
-                                        * a negative len, it means an error occured thus we
+                                        * a negative len, it means an error occurred thus we
                                         * simply remove it from the poll set and free the
                                         * stream.
                                         */
@@ -2401,7 +2407,7 @@ restart:
                                                len = ctx->on_buffer_ready(stream, ctx);
                                                /*
                                                 * We don't check the return value here since if we get
-                                                * a negative len, it means an error occured thus we
+                                                * a negative len, it means an error occurred thus we
                                                 * simply remove it from the poll set and free the
                                                 * stream.
                                                 */
@@ -2453,6 +2459,8 @@ void *consumer_thread_data_poll(void *data)
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
        /* local view of consumer_data.fds_count */
        int nb_fd = 0;
+       /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
+       int nb_inactive_fd = 0;
        struct lttng_consumer_local_data *ctx = data;
        ssize_t len;
 
@@ -2509,7 +2517,7 @@ void *consumer_thread_data_poll(void *data)
                                goto end;
                        }
                        ret = update_poll_array(ctx, &pollfd, local_stream,
-                                       data_ht);
+                                       data_ht, &nb_inactive_fd);
                        if (ret < 0) {
                                ERR("Error in allocating pollfd or local_outfds");
                                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
@@ -2522,7 +2530,7 @@ void *consumer_thread_data_poll(void *data)
                pthread_mutex_unlock(&consumer_data.lock);
 
                /* No FDs and consumer_quit, consumer_cleanup the thread */
-               if (nb_fd == 0 && consumer_quit == 1) {
+               if (nb_fd == 0 && consumer_quit == 1 && nb_inactive_fd == 0) {
                        err = 0;        /* All is OK */
                        goto end;
                }
@@ -3504,6 +3512,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
         * Add relayd socket pair to consumer data hashtable. If object already
         * exists or on error, the function gracefully returns.
         */
+       relayd->ctx = ctx;
        add_relayd(relayd);
 
        /* All good! */
@@ -3632,6 +3641,8 @@ int consumer_data_pending(uint64_t id)
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
                        /* Communication error thus the relayd so no data pending. */
+                       ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto data_not_pending;
                }
        }
@@ -3673,6 +3684,13 @@ int consumer_data_pending(uint64_t id)
                                                stream->relayd_stream_id,
                                                stream->next_net_seq_num - 1);
                        }
+                       if (ret < 0) {
+                               ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                               lttng_consumer_cleanup_relayd(relayd);
+                               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_not_pending;
+                       }
                        pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                        if (ret == 1) {
                                pthread_mutex_unlock(&stream->lock);
@@ -3691,6 +3709,8 @@ int consumer_data_pending(uint64_t id)
                                relayd->relayd_session_id, &is_data_inflight);
                pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
                if (ret < 0) {
+                       ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
                        goto data_not_pending;
                }
                if (is_data_inflight) {
This page took 0.032377 seconds and 5 git commands to generate.