Fix: implicit conversion of enum types in consumer
[lttng-tools.git] / src / common / consumer.c
index 044a504cec512443039e15eac186df07e841f896..152064078d34b354263a79992e93d965a5b738a4 100644 (file)
 #include <inttypes.h>
 #include <signal.h>
 
+#include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
 #include <common/utils.h>
 #include <common/compat/poll.h>
+#include <common/index/index.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/relayd.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/kernel-consumer/kernel-consumer.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
+#include <common/consumer-timer.h>
 
 #include "consumer.h"
+#include "consumer-stream.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -90,21 +94,33 @@ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
        (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
 }
 
+static void notify_health_quit_pipe(int *pipe)
+{
+       ssize_t ret;
+
+       ret = lttng_write(pipe[1], "4", 1);
+       if (ret < 1) {
+               PERROR("write consumer health quit");
+       }
+}
+
 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *chan,
                uint64_t key,
                enum consumer_channel_action action)
 {
        struct consumer_channel_msg msg;
-       int ret;
+       ssize_t ret;
 
        memset(&msg, 0, sizeof(msg));
 
        msg.action = action;
        msg.chan = chan;
-       do {
-               ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
-       } while (ret < 0 && errno == EINTR);
+       msg.key = key;
+       ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
+       if (ret < sizeof(msg)) {
+               PERROR("notify_channel_pipe write error");
+       }
 }
 
 void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
@@ -119,17 +135,18 @@ static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
                enum consumer_channel_action *action)
 {
        struct consumer_channel_msg msg;
-       int ret;
+       ssize_t ret;
 
-       do {
-               ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
-       } while (ret < 0 && errno == EINTR);
-       if (ret > 0) {
-               *action = msg.action;
-               *chan = msg.chan;
-               *key = msg.key;
+       ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
+       if (ret < sizeof(msg)) {
+               ret = -1;
+               goto error;
        }
-       return ret;
+       *action = msg.action;
+       *chan = msg.chan;
+       *key = msg.key;
+error:
+       return (int) ret;
 }
 
 /*
@@ -163,20 +180,20 @@ static struct lttng_consumer_stream *find_stream(uint64_t key,
        return stream;
 }
 
-static void steal_stream_key(int key, struct lttng_ht *ht)
+static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
 {
        struct lttng_consumer_stream *stream;
 
        rcu_read_lock();
        stream = find_stream(key, ht);
        if (stream) {
-               stream->key = -1ULL;
+               stream->key = (uint64_t) -1ULL;
                /*
                 * We don't want the lookup to match, but we still need
                 * to iterate on this stream when iterating over the hash table. Just
                 * change the node key.
                 */
-               stream->node.key = -1ULL;
+               stream->node.key = (uint64_t) -1ULL;
        }
        rcu_read_unlock();
 }
@@ -253,10 +270,8 @@ static void free_relayd_rcu(struct rcu_head *head)
 
 /*
  * Destroy and free relayd socket pair object.
- *
- * This function MUST be called with the consumer_data lock acquired.
  */
-static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
+void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
 {
        int ret;
        struct lttng_ht_iter iter;
@@ -286,10 +301,27 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 {
        int ret;
        struct lttng_ht_iter iter;
+       struct lttng_consumer_stream *stream, *stmp;
 
        DBG("Consumer delete channel key %" PRIu64, channel->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
+
+       /* Delete streams that might have been left in the stream list. */
+       cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+                       send_node) {
+               cds_list_del(&stream->send_node);
+               /*
+                * Once a stream is added to this list, the buffers were created so
+                * we have a guarantee that this call will succeed.
+                */
+               consumer_stream_destroy(stream, NULL);
+       }
+
+       if (channel->live_timer_enabled == 1) {
+               consumer_timer_live_stop(channel);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -312,6 +344,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 
        call_rcu(&channel->node.head, free_channel_rcu);
 end:
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 }
 
@@ -328,7 +361,7 @@ static void cleanup_relayd_ht(void)
 
        cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
                        node.node) {
-               destroy_relayd(relayd);
+               consumer_destroy_relayd(relayd);
        }
 
        rcu_read_unlock();
@@ -343,13 +376,13 @@ static void cleanup_relayd_ht(void)
  * It's atomically set without having the stream mutex locked which is fine
  * because we handle the write/read race with a pipe wakeup for each thread.
  */
-static void update_endpoint_status_by_netidx(int net_seq_idx,
+static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
                enum consumer_endpoint_status status)
 {
        struct lttng_ht_iter iter;
        struct lttng_consumer_stream *stream;
 
-       DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
+       DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
 
        rcu_read_lock();
 
@@ -382,7 +415,7 @@ static void update_endpoint_status_by_netidx(int net_seq_idx,
 static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
                struct lttng_consumer_local_data *ctx)
 {
-       int netidx;
+       uint64_t netidx;
 
        assert(relayd);
 
@@ -395,7 +428,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
         * Delete the relayd from the relayd hash table, close the sockets and free
         * the object in a RCU call.
         */
-       destroy_relayd(relayd);
+       consumer_destroy_relayd(relayd);
 
        /* Set inactive endpoint to all streams */
        update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
@@ -427,123 +460,33 @@ void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
 
        /* Destroy the relayd if refcount is 0 */
        if (uatomic_read(&relayd->refcount) == 0) {
-               destroy_relayd(relayd);
+               consumer_destroy_relayd(relayd);
        }
 }
 
 /*
- * Remove a stream from the global list protected by a mutex. This
- * function is also responsible for freeing its data structures.
+ * Completly destroy stream from every visiable data structure and the given
+ * hash table if one.
+ *
+ * One this call returns, the stream object is not longer usable nor visible.
  */
 void consumer_del_stream(struct lttng_consumer_stream *stream,
                struct lttng_ht *ht)
 {
-       int ret;
-       struct lttng_ht_iter iter;
-       struct lttng_consumer_channel *free_chan = NULL;
-       struct consumer_relayd_sock_pair *relayd;
-
-       assert(stream);
-
-       DBG("Consumer del stream %d", stream->wait_fd);
-
-       if (ht == NULL) {
-               /* Means the stream was allocated but not successfully added */
-               goto free_stream_rcu;
-       }
-
-       pthread_mutex_lock(&consumer_data.lock);
-       pthread_mutex_lock(&stream->lock);
-
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               if (stream->mmap_base != NULL) {
-                       ret = munmap(stream->mmap_base, stream->mmap_len);
-                       if (ret != 0) {
-                               PERROR("munmap");
-                       }
-               }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               lttng_ustconsumer_del_stream(stream);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-               goto end;
-       }
-
-       rcu_read_lock();
-       iter.iter.node = &stream->node.node;
-       ret = lttng_ht_del(ht, &iter);
-       assert(!ret);
-
-       iter.iter.node = &stream->node_channel_id.node;
-       ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
-       assert(!ret);
-
-       iter.iter.node = &stream->node_session_id.node;
-       ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
-       assert(!ret);
-       rcu_read_unlock();
-
-       assert(consumer_data.stream_count > 0);
-       consumer_data.stream_count--;
-
-       if (stream->out_fd >= 0) {
-               ret = close(stream->out_fd);
-               if (ret) {
-                       PERROR("close");
-               }
-       }
-
-       /* Check and cleanup relayd */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               uatomic_dec(&relayd->refcount);
-               assert(uatomic_read(&relayd->refcount) >= 0);
-
-               /* Closing streams requires to lock the control socket. */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_send_close_stream(&relayd->control_sock,
-                               stream->relayd_stream_id,
-                               stream->next_net_seq_num - 1);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       DBG("Unable to close stream on the relayd. Continuing");
-                       /*
-                        * Continue here. There is nothing we can do for the relayd.
-                        * Chances are that the relayd has closed the socket so we just
-                        * continue cleaning up.
-                        */
-               }
-
-               /* Both conditions are met, we destroy the relayd. */
-               if (uatomic_read(&relayd->refcount) == 0 &&
-                               uatomic_read(&relayd->destroy_flag)) {
-                       destroy_relayd(relayd);
-               }
-       }
-       rcu_read_unlock();
-
-       if (!uatomic_sub_return(&stream->chan->refcount, 1)
-                       && !uatomic_read(&stream->chan->nb_init_stream_left)) {
-               free_chan = stream->chan;
-       }
-
-end:
-       consumer_data.need_update = 1;
-       pthread_mutex_unlock(&stream->lock);
-       pthread_mutex_unlock(&consumer_data.lock);
+       consumer_stream_destroy(stream, ht);
+}
 
-       if (free_chan) {
-               consumer_del_channel(free_chan);
-       }
+/*
+ * XXX naming of del vs destroy is all mixed up.
+ */
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
+{
+       consumer_stream_destroy(stream, data_ht);
+}
 
-free_stream_rcu:
-       call_rcu(&stream->node.head, free_stream_rcu);
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
+{
+       consumer_stream_destroy(stream, metadata_ht);
 }
 
 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
@@ -552,11 +495,12 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                const char *channel_name,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
+               uint64_t relayd_id,
                uint64_t session_id,
                int cpu,
                int *alloc_ret,
-               enum consumer_channel_type type)
+               enum consumer_channel_type type,
+               unsigned int monitor)
 {
        int ret;
        struct lttng_consumer_stream *stream;
@@ -573,11 +517,15 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->key = stream_key;
        stream->out_fd = -1;
        stream->out_fd_offset = 0;
+       stream->output_written = 0;
        stream->state = state;
        stream->uid = uid;
        stream->gid = gid;
        stream->net_seq_idx = relayd_id;
        stream->session_id = session_id;
+       stream->monitor = monitor;
+       stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
+       stream->index_fd = -1;
        pthread_mutex_init(&stream->lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
@@ -585,6 +533,9 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                stream->metadata_flag = 1;
                /* Metadata is flat out. */
                strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
+               /* Live rendez-vous point. */
+               pthread_cond_init(&stream->metadata_rdv, NULL);
+               pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
        } else {
                /* Format stream name to <channel_name>_<cpu_number> */
                ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
@@ -604,8 +555,10 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        /* Init session id node with the stream session id */
        lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
 
-       DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 " relayd_id %" PRIu64 ", session_id %" PRIu64,
-                       stream->name, stream->key, channel_key, stream->net_seq_idx, stream->session_id);
+       DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
+                       " relayd_id %" PRIu64 ", session_id %" PRIu64,
+                       stream->name, stream->key, channel_key,
+                       stream->net_seq_idx, stream->session_id);
 
        rcu_read_unlock();
        return stream;
@@ -623,11 +576,10 @@ end:
 /*
  * Add a stream to the global list protected by a mutex.
  */
-static int add_stream(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+int consumer_add_data_stream(struct lttng_consumer_stream *stream)
 {
+       struct lttng_ht *ht = data_ht;
        int ret = 0;
-       struct consumer_relayd_sock_pair *relayd;
 
        assert(stream);
        assert(ht);
@@ -635,6 +587,8 @@ static int add_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding consumer stream %" PRIu64, stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
+       pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
        rcu_read_lock();
 
@@ -653,12 +607,6 @@ static int add_stream(struct lttng_consumer_stream *stream,
         */
        lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
 
-       /* Check and cleanup relayd */
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               uatomic_inc(&relayd->refcount);
-       }
-
        /*
         * When nb_init_stream_left reaches 0, we don't need to trigger any action
         * in terms of destroying the associated channel, because the action that
@@ -678,11 +626,18 @@ static int add_stream(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->timer_lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
 }
 
+void consumer_del_data_stream(struct lttng_consumer_stream *stream)
+{
+       consumer_del_stream(stream, data_ht);
+}
+
 /*
  * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
  * be acquired before calling this.
@@ -711,12 +666,12 @@ end:
  * Allocate and return a consumer relayd socket.
  */
 struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
-               int net_seq_idx)
+               uint64_t net_seq_idx)
 {
        struct consumer_relayd_sock_pair *obj = NULL;
 
-       /* Negative net sequence index is a failure */
-       if (net_seq_idx < 0) {
+       /* net sequence index of -1 is a failure */
+       if (net_seq_idx == (uint64_t) -1ULL) {
                goto error;
        }
 
@@ -767,6 +722,68 @@ error:
        return relayd;
 }
 
+/*
+ * Find a relayd and send the stream
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
+               char *path)
+{
+       int ret = 0;
+       struct consumer_relayd_sock_pair *relayd;
+
+       assert(stream);
+       assert(stream->net_seq_idx != -1ULL);
+       assert(path);
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd != NULL) {
+               /* Add stream on the relayd */
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_add_stream(&relayd->control_sock, stream->name,
+                               path, &stream->relayd_stream_id,
+                               stream->chan->tracefile_size, stream->chan->tracefile_count);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       goto end;
+               }
+
+               uatomic_inc(&relayd->refcount);
+               stream->sent_to_relayd = 1;
+       } else {
+               ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
+                               stream->key, stream->net_seq_idx);
+               ret = -1;
+               goto end;
+       }
+
+       DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+                       stream->name, stream->key, stream->net_seq_idx);
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+/*
+ * Find a relayd and close the stream
+ */
+void close_relayd_stream(struct lttng_consumer_stream *stream)
+{
+       struct consumer_relayd_sock_pair *relayd;
+
+       /* The stream is not metadata. Get relayd reference if exists. */
+       rcu_read_lock();
+       relayd = consumer_find_relayd(stream->net_seq_idx);
+       if (relayd) {
+               consumer_stream_relayd_close(stream, relayd);
+       }
+       rcu_read_unlock();
+}
+
 /*
  * Handle stream for relayd transmission if the stream applies for network
  * streaming where the net sequence index is set.
@@ -839,10 +856,13 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                const char *name,
                uid_t uid,
                gid_t gid,
-               int relayd_id,
+               uint64_t relayd_id,
                enum lttng_event_output output,
                uint64_t tracefile_size,
-               uint64_t tracefile_count)
+               uint64_t tracefile_count,
+               uint64_t session_id_per_pid,
+               unsigned int monitor,
+               unsigned int live_timer_interval)
 {
        struct lttng_consumer_channel *channel;
 
@@ -855,12 +875,43 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->key = key;
        channel->refcount = 0;
        channel->session_id = session_id;
+       channel->session_id_per_pid = session_id_per_pid;
        channel->uid = uid;
        channel->gid = gid;
        channel->relayd_id = relayd_id;
-       channel->output = output;
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
+       channel->monitor = monitor;
+       channel->live_timer_interval = live_timer_interval;
+       pthread_mutex_init(&channel->lock, NULL);
+       pthread_mutex_init(&channel->timer_lock, NULL);
+
+       switch (output) {
+       case LTTNG_EVENT_SPLICE:
+               channel->output = CONSUMER_CHANNEL_SPLICE;
+               break;
+       case LTTNG_EVENT_MMAP:
+               channel->output = CONSUMER_CHANNEL_MMAP;
+               break;
+       default:
+               assert(0);
+               free(channel);
+               channel = NULL;
+               goto end;
+       }
+
+       /*
+        * In monitor mode, the streams associated with the channel will be put in
+        * a special list ONLY owned by this channel. So, the refcount is set to 1
+        * here meaning that the channel itself has streams that are referenced.
+        *
+        * On a channel deletion, once the channel is no longer visible, the
+        * refcount is decremented and checked for a zero value to delete it. With
+        * streams in no monitor mode, it will now be safe to destroy the channel.
+        */
+       if (!channel->monitor) {
+               channel->refcount = 1;
+       }
 
        strncpy(channel->pathname, pathname, sizeof(channel->pathname));
        channel->pathname[sizeof(channel->pathname) - 1] = '\0';
@@ -882,6 +933,8 @@ end:
 
 /*
  * Add a channel to the global list protected by a mutex.
+ *
+ * On success 0 is returned else a negative value.
  */
 int consumer_add_channel(struct lttng_consumer_channel *channel,
                struct lttng_consumer_local_data *ctx)
@@ -891,6 +944,8 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
        struct lttng_ht_iter iter;
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
+       pthread_mutex_lock(&channel->timer_lock);
        rcu_read_lock();
 
        lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
@@ -899,7 +954,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
                /* Channel already exist. Ignore the insertion */
                ERR("Consumer add channel key %" PRIu64 " already exists!",
                        channel->key);
-               ret = -1;
+               ret = -EEXIST;
                goto end;
        }
 
@@ -907,10 +962,12 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
 
 end:
        rcu_read_unlock();
+       pthread_mutex_unlock(&channel->timer_lock);
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (!ret && channel->wait_fd != -1 &&
-                       channel->metadata_stream == NULL) {
+                       channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
                notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
        }
        return ret;
@@ -1073,12 +1130,11 @@ void lttng_consumer_cleanup(void)
  */
 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
 {
-       int ret;
+       ssize_t ret;
+
        consumer_quit = 1;
-       do {
-               ret = write(ctx->consumer_should_quit[1], "4", 1);
-       } while (ret < 0 && errno == EINTR);
-       if (ret < 0 || ret != 1) {
+       ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
+       if (ret < 1) {
                PERROR("write consumer quit");
        }
 
@@ -1142,7 +1198,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                        struct lttng_consumer_local_data *ctx),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
-               int (*update_stream)(int stream_key, uint32_t state))
+               int (*update_stream)(uint64_t stream_key, uint32_t state))
 {
        int ret;
        struct lttng_consumer_local_data *ctx;
@@ -1159,6 +1215,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
 
        ctx->consumer_error_socket = -1;
        ctx->consumer_metadata_socket = -1;
+       pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
        /* assign the callbacks */
        ctx->on_buffer_ready = buffer_ready;
        ctx->on_recv_channel = recv_channel;
@@ -1251,15 +1308,13 @@ static int write_relayd_metadata_id(int fd,
                struct lttng_consumer_stream *stream,
                struct consumer_relayd_sock_pair *relayd, unsigned long padding)
 {
-       int ret;
+       ssize_t ret;
        struct lttcomm_relayd_metadata_payload hdr;
 
        hdr.stream_id = htobe64(stream->relayd_stream_id);
        hdr.padding_size = htobe32(padding);
-       do {
-               ret = write(fd, (void *) &hdr, sizeof(hdr));
-       } while (ret < 0 && errno == EINTR);
-       if (ret < 0 || ret != sizeof(hdr)) {
+       ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
+       if (ret < sizeof(hdr)) {
                /*
                 * This error means that the fd's end is closed so ignore the perror
                 * not to clubber the error output since this can happen in a normal
@@ -1281,7 +1336,7 @@ static int write_relayd_metadata_id(int fd,
                        stream->relayd_stream_id, padding);
 
 end:
-       return ret;
+       return (int) ret;
 }
 
 /*
@@ -1298,7 +1353,8 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding)
+               unsigned long padding,
+               struct lttng_packet_index *index)
 {
        unsigned long mmap_offset;
        void *mmap_base;
@@ -1313,9 +1369,10 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        rcu_read_lock();
 
        /* Flag that the current stream if set for network streaming. */
-       if (stream->net_seq_idx != -1) {
+       if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
+                       ret = -EPIPE;
                        goto end;
                }
        }
@@ -1325,28 +1382,31 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        case LTTNG_CONSUMER_KERNEL:
                mmap_base = stream->mmap_base;
                ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+               if (ret != 0) {
+                       PERROR("tracer ctl get_mmap_read_offset");
+                       written = -errno;
+                       goto end;
+               }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
                mmap_base = lttng_ustctl_get_mmap_base(stream);
                if (!mmap_base) {
                        ERR("read mmap get mmap base for stream %s", stream->name);
-                       written = -1;
+                       written = -EPERM;
                        goto end;
                }
                ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
-
+               if (ret != 0) {
+                       PERROR("tracer ctl get_mmap_read_offset");
+                       written = ret;
+                       goto end;
+               }
                break;
        default:
                ERR("Unknown consumer_data type");
                assert(0);
        }
-       if (ret != 0) {
-               errno = -ret;
-               PERROR("tracer ctl get_mmap_read_offset");
-               written = ret;
-               goto end;
-       }
 
        /* Handle stream on the relayd if the output is on the network */
        if (relayd) {
@@ -1401,24 +1461,40 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        ret = utils_rotate_stream_file(stream->chan->pathname,
                                        stream->name, stream->chan->tracefile_size,
                                        stream->chan->tracefile_count, stream->uid, stream->gid,
-                                       stream->out_fd, &(stream->tracefile_count_current));
+                                       stream->out_fd, &(stream->tracefile_count_current),
+                                       &stream->out_fd);
                        if (ret < 0) {
                                ERR("Rotating output file");
                                goto end;
                        }
-                       outfd = stream->out_fd = ret;
+                       outfd = stream->out_fd;
+
+                       if (stream->index_fd >= 0) {
+                               ret = index_create_file(stream->chan->pathname,
+                                               stream->name, stream->uid, stream->gid,
+                                               stream->chan->tracefile_size,
+                                               stream->tracefile_count_current);
+                               if (ret < 0) {
+                                       goto end;
+                               }
+                               stream->index_fd = ret;
+                       }
+
                        /* Reset current size because we just perform a rotation. */
                        stream->tracefile_size_current = 0;
+                       stream->out_fd_offset = 0;
+                       orig_offset = 0;
                }
                stream->tracefile_size_current += len;
+               if (index) {
+                       index->offset = htobe64(stream->out_fd_offset);
+               }
        }
 
        while (len > 0) {
-               do {
-                       ret = write(outfd, mmap_base + mmap_offset, len);
-               } while (ret < 0 && errno == EINTR);
+               ret = lttng_write(outfd, mmap_base + mmap_offset, len);
                DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
-               if (ret < 0) {
+               if (ret < len) {
                        /*
                         * This is possible if the fd is closed on the other side (outfd)
                         * or any write problem. It can be verbose a bit for a normal
@@ -1427,7 +1503,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                         */
                        DBG("Error in file write mmap");
                        if (written == 0) {
-                               written = ret;
+                               written = -errno;
                        }
                        /* Socket operation failed. We consider the relayd dead */
                        if (errno == EPIPE || errno == EINVAL) {
@@ -1451,6 +1527,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                                        SYNC_FILE_RANGE_WRITE);
                        stream->out_fd_offset += ret;
                }
+               stream->output_written += ret;
                written += ret;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
@@ -1484,7 +1561,8 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding)
+               unsigned long padding,
+               struct lttng_packet_index *index)
 {
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
@@ -1512,9 +1590,10 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        rcu_read_lock();
 
        /* Flag that the current stream if set for network streaming. */
-       if (stream->net_seq_idx != -1) {
+       if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
+                       ret = -EPIPE;
                        goto end;
                }
        }
@@ -1583,16 +1662,32 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        ret = utils_rotate_stream_file(stream->chan->pathname,
                                        stream->name, stream->chan->tracefile_size,
                                        stream->chan->tracefile_count, stream->uid, stream->gid,
-                                       stream->out_fd, &(stream->tracefile_count_current));
+                                       stream->out_fd, &(stream->tracefile_count_current),
+                                       &stream->out_fd);
                        if (ret < 0) {
                                ERR("Rotating output file");
                                goto end;
                        }
-                       outfd = stream->out_fd = ret;
+                       outfd = stream->out_fd;
+
+                       if (stream->index_fd >= 0) {
+                               ret = index_create_file(stream->chan->pathname,
+                                               stream->name, stream->uid, stream->gid,
+                                               stream->chan->tracefile_size,
+                                               stream->tracefile_count_current);
+                               if (ret < 0) {
+                                       goto end;
+                               }
+                               stream->index_fd = ret;
+                       }
+
                        /* Reset current size because we just perform a rotation. */
                        stream->tracefile_size_current = 0;
+                       stream->out_fd_offset = 0;
+                       orig_offset = 0;
                }
                stream->tracefile_size_current += len;
+               index->offset = htobe64(stream->out_fd_offset);
        }
 
        while (len > 0) {
@@ -1661,6 +1756,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                        SYNC_FILE_RANGE_WRITE);
                        stream->out_fd_offset += ret_splice;
                }
+               stream->output_written += ret_splice;
                written += ret_splice;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
@@ -1866,6 +1962,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        }
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
 
        switch (consumer_data.type) {
@@ -1876,9 +1973,22 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                                PERROR("munmap metadata stream");
                        }
                }
+               if (stream->wait_fd >= 0) {
+                       ret = close(stream->wait_fd);
+                       if (ret < 0) {
+                               PERROR("close kernel metadata wait_fd");
+                       }
+               }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
+               if (stream->monitor) {
+                       /* close the write-side in close_metadata */
+                       ret = close(stream->ust_metadata_poll_pipe[0]);
+                       if (ret < 0) {
+                               PERROR("Close UST metadata read-side poll pipe");
+                       }
+               }
                lttng_ustconsumer_del_stream(stream);
                break;
        default:
@@ -1932,7 +2042,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                /* Both conditions are met, we destroy the relayd. */
                if (uatomic_read(&relayd->refcount) == 0 &&
                                uatomic_read(&relayd->destroy_flag)) {
-                       destroy_relayd(relayd);
+                       consumer_destroy_relayd(relayd);
                }
        }
        rcu_read_unlock();
@@ -1945,7 +2055,15 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        }
 
 end:
+       /*
+        * Nullify the stream reference so it is not used after deletion. The
+        * channel lock MUST be acquired before being able to check for
+        * a NULL pointer value.
+        */
+       stream->chan->metadata_stream = NULL;
+
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (free_chan) {
@@ -1960,11 +2078,10 @@ free_stream_rcu:
  * Action done with the metadata stream when adding it to the consumer internal
  * data structures to handle it.
  */
-static int add_metadata_stream(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 {
+       struct lttng_ht *ht = metadata_ht;
        int ret = 0;
-       struct consumer_relayd_sock_pair *relayd;
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
 
@@ -1974,6 +2091,8 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
+       pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
 
        /*
@@ -1991,15 +2110,6 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        node = lttng_ht_iter_get_node_u64(&iter);
        assert(!node);
 
-       /* Find relayd and, if one is found, increment refcount. */
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               uatomic_inc(&relayd->refcount);
-       }
-
-       /* Update channel refcount once added without error(s). */
-       uatomic_inc(&stream->chan->refcount);
-
        /*
         * When nb_init_stream_left reaches 0, we don't need to trigger any action
         * in terms of destroying the associated channel, because the action that
@@ -2028,6 +2138,8 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        rcu_read_unlock();
 
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
+       pthread_mutex_unlock(&stream->chan->timer_lock);
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
@@ -2091,7 +2203,7 @@ static void validate_endpoint_status_metadata_stream(
  */
 void *consumer_thread_metadata_poll(void *data)
 {
-       int ret, i, pollfd;
+       int ret, i, pollfd, err = -1;
        uint32_t revents, nb_fd;
        struct lttng_consumer_stream *stream = NULL;
        struct lttng_ht_iter iter;
@@ -2102,6 +2214,10 @@ void *consumer_thread_metadata_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
+
+       health_code_update();
+
        metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!metadata_ht) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2127,14 +2243,19 @@ void *consumer_thread_metadata_poll(void *data)
        DBG("Metadata main loop started");
 
        while (1) {
+               health_code_update();
+
                /* Only the metadata pipe is set */
                if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
 
 restart:
                DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
                DBG("Metadata event catched in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
@@ -2148,14 +2269,11 @@ restart:
 
                /* From here, the event is a metadata wait fd */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Just don't waste time if no returned events for the fd */
-                       if (!revents) {
-                               continue;
-                       }
-
                        if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                                if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
@@ -2172,8 +2290,8 @@ restart:
 
                                        pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                                                        &stream, sizeof(stream));
-                                       if (pipe_len < 0) {
-                                               ERR("read metadata stream, ret: %ld", pipe_len);
+                                       if (pipe_len < sizeof(stream)) {
+                                               PERROR("read metadata stream");
                                                /*
                                                 * Continue here to handle the rest of the streams.
                                                 */
@@ -2190,14 +2308,6 @@ restart:
                                        DBG("Adding metadata stream %d to poll set",
                                                        stream->wait_fd);
 
-                                       ret = add_metadata_stream(stream, metadata_ht);
-                                       if (ret) {
-                                               ERR("Unable to add metadata stream");
-                                               /* Stream was not setup properly. Continuing. */
-                                               consumer_del_metadata_stream(stream, NULL);
-                                               continue;
-                                       }
-
                                        /* Add metadata stream to the global poll events list */
                                        lttng_poll_add(&events, stream->wait_fd,
                                                        LPOLLIN | LPOLLPRI);
@@ -2230,6 +2340,8 @@ restart:
 
                                        /* We just flushed the stream now read it. */
                                        do {
+                                               health_code_update();
+
                                                len = ctx->on_buffer_ready(stream, ctx);
                                                /*
                                                 * We don't check the return value here since if we get
@@ -2251,14 +2363,23 @@ restart:
                                DBG("Metadata available on fd %d", pollfd);
                                assert(stream->wait_fd == pollfd);
 
-                               len = ctx->on_buffer_ready(stream, ctx);
+                               do {
+                                       health_code_update();
+
+                                       len = ctx->on_buffer_ready(stream, ctx);
+                                       /*
+                                        * We don't check the return value here since if we get
+                                        * a negative len, it means an error occured thus we
+                                        * simply remove it from the poll set and free the
+                                        * stream.
+                                        */
+                               } while (len > 0);
+
                                /* It's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean up stream from consumer and free it. */
                                        lttng_poll_del(&events, stream->wait_fd);
                                        consumer_del_metadata_stream(stream, metadata_ht);
-                               } else if (len > 0) {
-                                       stream->data_read = 1;
                                }
                        }
 
@@ -2267,6 +2388,8 @@ restart:
                }
        }
 
+       /* All is OK */
+       err = 0;
 error:
 end:
        DBG("Metadata poll thread exiting");
@@ -2275,6 +2398,11 @@ end:
 end_poll:
        destroy_stream_ht(metadata_ht);
 end_ht:
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
        rcu_unregister_thread();
        return NULL;
 }
@@ -2285,7 +2413,7 @@ end_ht:
  */
 void *consumer_thread_data_poll(void *data)
 {
-       int num_rdy, num_hup, high_prio, ret, i;
+       int num_rdy, num_hup, high_prio, ret, i, err = -1;
        struct pollfd *pollfd = NULL;
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
@@ -2296,15 +2424,25 @@ void *consumer_thread_data_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
+
+       health_code_update();
+
        data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (data_ht == NULL) {
                /* ENOMEM at this point. Better to bail out. */
                goto end;
        }
 
-       local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
+       local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
+       if (local_stream == NULL) {
+               PERROR("local_stream malloc");
+               goto end;
+       }
 
        while (1) {
+               health_code_update();
+
                high_prio = 0;
                num_hup = 0;
 
@@ -2330,7 +2468,7 @@ void *consumer_thread_data_poll(void *data)
 
                        /* allocate for all fds + 1 for the consumer_data_pipe */
                        local_stream = zmalloc((consumer_data.stream_count + 1) *
-                                       sizeof(struct lttng_consumer_stream));
+                                       sizeof(struct lttng_consumer_stream *));
                        if (local_stream == NULL) {
                                PERROR("local_stream malloc");
                                pthread_mutex_unlock(&consumer_data.lock);
@@ -2351,12 +2489,15 @@ void *consumer_thread_data_poll(void *data)
 
                /* No FDs and consumer_quit, consumer_cleanup the thread */
                if (nb_fd == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
                /* poll on the array of fds */
        restart:
                DBG("polling on %d fd", nb_fd + 1);
+               health_poll_entry();
                num_rdy = poll(pollfd, nb_fd + 1, -1);
+               health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
                        /*
@@ -2384,8 +2525,8 @@ void *consumer_thread_data_poll(void *data)
                        DBG("consumer_data_pipe wake up");
                        pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
                                        &new_stream, sizeof(new_stream));
-                       if (pipe_readlen < 0) {
-                               ERR("Consumer data pipe ret %ld", pipe_readlen);
+                       if (pipe_readlen < sizeof(new_stream)) {
+                               PERROR("Consumer data pipe");
                                /* Continue so we can at least handle the current stream(s). */
                                continue;
                        }
@@ -2400,23 +2541,14 @@ void *consumer_thread_data_poll(void *data)
                                continue;
                        }
 
-                       ret = add_stream(new_stream, data_ht);
-                       if (ret) {
-                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
-                                               new_stream->key);
-                               /*
-                                * At this point, if the add_stream fails, it is not in the
-                                * hash table thus passing the NULL value here.
-                                */
-                               consumer_del_stream(new_stream, NULL);
-                       }
-
                        /* Continue to update the local streams and handle prio ones */
                        continue;
                }
 
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        if (local_stream[i] == NULL) {
                                continue;
                        }
@@ -2445,6 +2577,8 @@ void *consumer_thread_data_poll(void *data)
 
                /* Take care of low priority channels. */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        if (local_stream[i] == NULL) {
                                continue;
                        }
@@ -2465,6 +2599,8 @@ void *consumer_thread_data_poll(void *data)
 
                /* Handle hangup and errors */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        if (local_stream[i] == NULL) {
                                continue;
                        }
@@ -2510,6 +2646,8 @@ void *consumer_thread_data_poll(void *data)
                        }
                }
        }
+       /* All is OK */
+       err = 0;
 end:
        DBG("polling thread exiting");
        free(pollfd);
@@ -2527,6 +2665,12 @@ end:
 
        destroy_data_stream_ht(data_ht);
 
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
+
        rcu_unregister_thread();
        return NULL;
 }
@@ -2608,7 +2752,7 @@ static void destroy_channel_ht(struct lttng_ht *ht)
  */
 void *consumer_thread_channel_poll(void *data)
 {
-       int ret, i, pollfd;
+       int ret, i, pollfd, err = -1;
        uint32_t revents, nb_fd;
        struct lttng_consumer_channel *chan = NULL;
        struct lttng_ht_iter iter;
@@ -2619,6 +2763,10 @@ void *consumer_thread_channel_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
+
+       health_code_update();
+
        channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!channel_ht) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2643,14 +2791,19 @@ void *consumer_thread_channel_poll(void *data)
        DBG("Channel main loop started");
 
        while (1) {
+               health_code_update();
+
                /* Only the channel pipe is set */
                if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
 
 restart:
                DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
                DBG("Channel event catched in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
@@ -2664,6 +2817,8 @@ restart:
 
                /* From here, the event is a channel wait fd */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
@@ -2697,24 +2852,53 @@ restart:
 
                                                lttng_ht_node_init_u64(&chan->wait_fd_node,
                                                        chan->wait_fd);
+                                               rcu_read_lock();
                                                lttng_ht_add_unique_u64(channel_ht,
                                                                &chan->wait_fd_node);
+                                               rcu_read_unlock();
                                                /* Add channel to the global poll events list */
                                                lttng_poll_add(&events, chan->wait_fd,
                                                                LPOLLIN | LPOLLPRI);
                                                break;
                                        case CONSUMER_CHANNEL_DEL:
                                        {
+                                               struct lttng_consumer_stream *stream, *stmp;
+
+                                               rcu_read_lock();
                                                chan = consumer_find_channel(key);
                                                if (!chan) {
+                                                       rcu_read_unlock();
                                                        ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
                                                        break;
                                                }
                                                lttng_poll_del(&events, chan->wait_fd);
+                                               iter.iter.node = &chan->wait_fd_node.node;
                                                ret = lttng_ht_del(channel_ht, &iter);
                                                assert(ret == 0);
                                                consumer_close_channel_streams(chan);
 
+                                               switch (consumer_data.type) {
+                                               case LTTNG_CONSUMER_KERNEL:
+                                                       break;
+                                               case LTTNG_CONSUMER32_UST:
+                                               case LTTNG_CONSUMER64_UST:
+                                                       /* Delete streams that might have been left in the stream list. */
+                                                       cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
+                                                                       send_node) {
+                                                               health_code_update();
+
+                                                               cds_list_del(&stream->send_node);
+                                                               lttng_ustconsumer_del_stream(stream);
+                                                               uatomic_sub(&stream->chan->refcount, 1);
+                                                               assert(&chan->refcount);
+                                                               free(stream);
+                                                       }
+                                                       break;
+                                               default:
+                                                       ERR("Unknown consumer_data type");
+                                                       assert(0);
+                                               }
+
                                                /*
                                                 * Release our own refcount. Force channel deletion even if
                                                 * streams were not initialized.
@@ -2722,6 +2906,7 @@ restart:
                                                if (!uatomic_sub_return(&chan->refcount, 1)) {
                                                        consumer_del_channel(chan);
                                                }
+                                               rcu_read_unlock();
                                                goto restart;
                                        }
                                        case CONSUMER_CHANNEL_QUIT:
@@ -2774,12 +2959,19 @@ restart:
                }
        }
 
+       /* All is OK */
+       err = 0;
 end:
        lttng_poll_clean(&events);
 end_poll:
        destroy_channel_ht(channel_ht);
 end_ht:
        DBG("Channel poll thread exiting");
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
        rcu_unregister_thread();
        return NULL;
 }
@@ -2817,7 +3009,7 @@ error:
  */
 void *consumer_thread_sessiond_poll(void *data)
 {
-       int sock = -1, client_socket, ret;
+       int sock = -1, client_socket, ret, err = -1;
        /*
         * structure to poll for incoming data on communication socket avoids
         * making blocking sockets.
@@ -2827,6 +3019,10 @@ void *consumer_thread_sessiond_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
+
+       health_code_update();
+
        DBG("Creating command socket %s", ctx->consumer_command_sock_path);
        unlink(ctx->consumer_command_sock_path);
        client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
@@ -2848,12 +3044,6 @@ void *consumer_thread_sessiond_poll(void *data)
                goto end;
        }
 
-       ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               PERROR("fcntl O_NONBLOCK");
-               goto end;
-       }
-
        /* prepare the FDs to poll : to client socket and the should_quit pipe */
        consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
        consumer_sockpoll[0].events = POLLIN | POLLPRI;
@@ -2871,11 +3061,6 @@ void *consumer_thread_sessiond_poll(void *data)
                WARN("On accept");
                goto end;
        }
-       ret = fcntl(sock, F_SETFL, O_NONBLOCK);
-       if (ret < 0) {
-               PERROR("fcntl O_NONBLOCK");
-               goto end;
-       }
 
        /*
         * Setup metadata socket which is the second socket connection on the
@@ -2898,7 +3083,12 @@ void *consumer_thread_sessiond_poll(void *data)
        consumer_sockpoll[1].events = POLLIN | POLLPRI;
 
        while (1) {
-               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               health_code_update();
+
+               health_poll_entry();
+               ret = lttng_consumer_poll_socket(consumer_sockpoll);
+               health_poll_exit();
+               if (ret < 0) {
                        goto end;
                }
                DBG("Incoming command on sock");
@@ -2913,14 +3103,19 @@ void *consumer_thread_sessiond_poll(void *data)
                         * ERR() here.
                         */
                        DBG("Communication interrupted on command socket");
+                       err = 0;
                        goto end;
                }
                if (consumer_quit) {
                        DBG("consumer_thread_receive_fds received quit from signal");
+                       err = 0;        /* All is OK */
                        goto end;
                }
                DBG("received command on sock");
        }
+       /* All is OK */
+       err = 0;
+
 end:
        DBG("Consumer thread sessiond poll exiting");
 
@@ -2946,6 +3141,8 @@ end:
 
        notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
 
+       notify_health_quit_pipe(health_quit_pipe);
+
        /* Cleaning up possibly open sockets. */
        if (sock >= 0) {
                ret = close(sock);
@@ -2954,12 +3151,18 @@ end:
                }
        }
        if (client_socket >= 0) {
-               ret = close(sock);
+               ret = close(client_socket);
                if (ret < 0) {
                        PERROR("close client_socket sessiond poll");
                }
        }
 
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
+
        rcu_unregister_thread();
        return NULL;
 }
@@ -2970,6 +3173,9 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
        ssize_t ret;
 
        pthread_mutex_lock(&stream->lock);
+       if (stream->metadata_flag) {
+               pthread_mutex_lock(&stream->metadata_rdv_lock);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -2986,6 +3192,10 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                break;
        }
 
+       if (stream->metadata_flag) {
+               pthread_cond_broadcast(&stream->metadata_rdv);
+               pthread_mutex_unlock(&stream->metadata_rdv_lock);
+       }
        pthread_mutex_unlock(&stream->lock);
        return ret;
 }
@@ -3022,60 +3232,80 @@ void lttng_consumer_init(void)
  * This will create a relayd socket pair and add it to the relayd hash table.
  * The caller MUST acquire a RCU read side lock before calling it.
  */
-int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll,
-               struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
+               struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
+               uint64_t relayd_session_id)
 {
        int fd = -1, ret = -1, relayd_created = 0;
-       enum lttng_error_code ret_code = LTTNG_OK;
+       enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
        assert(ctx);
        assert(relayd_sock);
 
-       DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
-
-       /* First send a status message before receiving the fds. */
-       ret = consumer_send_status_msg(sock, ret_code);
-       if (ret < 0) {
-               /* Somehow, the session daemon is not responding anymore. */
-               goto error;
-       }
+       DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
 
        /* Get relayd reference if exists. */
        relayd = consumer_find_relayd(net_seq_idx);
        if (relayd == NULL) {
+               assert(sock_type == LTTNG_STREAM_CONTROL);
                /* Not found. Allocate one. */
                relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
                if (relayd == NULL) {
-                       lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
-                       ret = -1;
+                       ret = -ENOMEM;
+                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
                        goto error;
+               } else {
+                       relayd->sessiond_session_id = sessiond_id;
+                       relayd_created = 1;
                }
-               relayd->sessiond_session_id = (uint64_t) sessiond_id;
-               relayd_created = 1;
+
+               /*
+                * This code path MUST continue to the consumer send status message to
+                * we can notify the session daemon and continue our work without
+                * killing everything.
+                */
+       } else {
+               /*
+                * relayd key should never be found for control socket.
+                */
+               assert(sock_type != LTTNG_STREAM_CONTROL);
+       }
+
+       /* First send a status message before receiving the fds. */
+       ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
+       if (ret < 0) {
+               /* Somehow, the session daemon is not responding anymore. */
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
+               goto error_nosignal;
        }
 
        /* Poll on consumer socket. */
        if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
                ret = -EINTR;
-               goto error;
+               goto error_nosignal;
        }
 
        /* Get relayd socket from session daemon */
        ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
        if (ret != sizeof(fd)) {
-               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
                ret = -1;
                fd = -1;        /* Just in case it gets set with an invalid value. */
-               goto error_close;
-       }
 
-       /* We have the fds without error. Send status back. */
-       ret = consumer_send_status_msg(sock, ret_code);
-       if (ret < 0) {
-               /* Somehow, the session daemon is not responding anymore. */
+               /*
+                * Failing to receive FDs might indicate a major problem such as
+                * reaching a fd limit during the receive where the kernel returns a
+                * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
+                * don't take any chances and stop everything.
+                *
+                * XXX: Feature request #558 will fix that and avoid this possible
+                * issue when reaching the fd limit.
+                */
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+               ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
                goto error;
        }
 
@@ -3085,64 +3315,51 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                /* Copy received lttcomm socket */
                lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
                ret = lttcomm_create_sock(&relayd->control_sock.sock);
-               /* Immediately try to close the created socket if valid. */
-               if (relayd->control_sock.sock.fd >= 0) {
-                       if (close(relayd->control_sock.sock.fd)) {
-                               PERROR("close relayd control socket");
-                       }
-               }
                /* Handle create_sock error. */
                if (ret < 0) {
+                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
                        goto error;
                }
+               /*
+                * Close the socket created internally by
+                * lttcomm_create_sock, so we can replace it by the one
+                * received from sessiond.
+                */
+               if (close(relayd->control_sock.sock.fd)) {
+                       PERROR("close");
+               }
 
                /* Assign new file descriptor */
                relayd->control_sock.sock.fd = fd;
+               fd = -1;        /* For error path */
                /* Assign version values. */
                relayd->control_sock.major = relayd_sock->major;
                relayd->control_sock.minor = relayd_sock->minor;
 
-               /*
-                * Create a session on the relayd and store the returned id. Lock the
-                * control socket mutex if the relayd was NOT created before.
-                */
-               if (!relayd_created) {
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               }
-               ret = relayd_create_session(&relayd->control_sock,
-                               &relayd->relayd_session_id);
-               if (!relayd_created) {
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               }
-               if (ret < 0) {
-                       /*
-                        * Close all sockets of a relayd object. It will be freed if it was
-                        * created at the error code path or else it will be garbage
-                        * collect.
-                        */
-                       (void) relayd_close(&relayd->control_sock);
-                       (void) relayd_close(&relayd->data_sock);
-                       goto error;
-               }
+               relayd->relayd_session_id = relayd_session_id;
 
                break;
        case LTTNG_STREAM_DATA:
                /* Copy received lttcomm socket */
                lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
                ret = lttcomm_create_sock(&relayd->data_sock.sock);
-               /* Immediately try to close the created socket if valid. */
-               if (relayd->data_sock.sock.fd >= 0) {
-                       if (close(relayd->data_sock.sock.fd)) {
-                               PERROR("close relayd data socket");
-                       }
-               }
                /* Handle create_sock error. */
                if (ret < 0) {
+                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
                        goto error;
                }
+               /*
+                * Close the socket created internally by
+                * lttcomm_create_sock, so we can replace it by the one
+                * received from sessiond.
+                */
+               if (close(relayd->data_sock.sock.fd)) {
+                       PERROR("close");
+               }
 
                /* Assign new file descriptor */
                relayd->data_sock.sock.fd = fd;
+               fd = -1;        /* for eventual error paths */
                /* Assign version values. */
                relayd->data_sock.major = relayd_sock->major;
                relayd->data_sock.minor = relayd_sock->minor;
@@ -3150,6 +3367,7 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
        default:
                ERR("Unknown relayd socket type (%d)", sock_type);
                ret = -1;
+               ret_code = LTTCOMM_CONSUMERD_FATAL;
                goto error;
        }
 
@@ -3157,6 +3375,14 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
                        sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
                        relayd->net_seq_idx, fd);
 
+       /* We successfully added the socket. Send status back. */
+       ret = consumer_send_status_msg(sock, ret_code);
+       if (ret < 0) {
+               /* Somehow, the session daemon is not responding anymore. */
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
+               goto error_nosignal;
+       }
+
        /*
         * Add relayd socket pair to consumer data hashtable. If object already
         * exists or on error, the function gracefully returns.
@@ -3167,6 +3393,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
        return 0;
 
 error:
+       if (consumer_send_status_msg(sock, ret_code) < 0) {
+               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
+       }
+
+error_nosignal:
        /* Close received socket if valid. */
        if (fd >= 0) {
                if (close(fd)) {
@@ -3174,7 +3405,6 @@ error:
                }
        }
 
-error_close:
        if (relayd_created) {
                free(relayd);
        }
@@ -3307,6 +3537,15 @@ int consumer_data_pending(uint64_t id)
                 */
                ret = cds_lfht_is_node_deleted(&stream->node.node);
                if (!ret) {
+                       /*
+                        * An empty output file is not valid. We need at least one packet
+                        * generated per stream, even if it contains no event, so it
+                        * contains at least one packet header.
+                        */
+                       if (stream->output_written == 0) {
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_pending;
+                       }
                        /* Check the stream if there is data in the buffers. */
                        ret = data_pending(stream);
                        if (ret == 1) {
@@ -3397,12 +3636,32 @@ int consumer_send_status_channel(int sock,
        assert(sock >= 0);
 
        if (!channel) {
-               msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
+               msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
        } else {
-               msg.ret_code = LTTNG_OK;
+               msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
                msg.key = channel->key;
                msg.stream_count = channel->streams.count;
        }
 
        return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
 }
+
+/*
+ * Using a maximum stream size with the produced and consumed position of a
+ * stream, computes the new consumed position to be as close as possible to the
+ * maximum possible stream size.
+ *
+ * If maximum stream size is lower than the possible buffer size (produced -
+ * consumed), the consumed_pos given is returned untouched else the new value
+ * is returned.
+ */
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+               unsigned long produced_pos, uint64_t max_stream_size)
+{
+       if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
+               /* Offset from the produced position to get the latest buffers. */
+               return produced_pos - max_stream_size;
+       }
+
+       return consumed_pos;
+}
This page took 0.074518 seconds and 5 git commands to generate.