Fix: implicit conversion of enum types in consumer
[lttng-tools.git] / src / common / consumer.c
index 57e26dcf86c869982e579e86b73ea3a663c75052..152064078d34b354263a79992e93d965a5b738a4 100644 (file)
 #include <inttypes.h>
 #include <signal.h>
 
+#include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
 #include <common/utils.h>
 #include <common/compat/poll.h>
+#include <common/index/index.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/relayd.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/kernel-consumer/kernel-consumer.h>
 #include <common/relayd/relayd.h>
 #include <common/ust-consumer/ust-consumer.h>
+#include <common/consumer-timer.h>
 
 #include "consumer.h"
 #include "consumer-stream.h"
@@ -91,22 +94,33 @@ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
        (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
 }
 
+static void notify_health_quit_pipe(int *pipe)
+{
+       ssize_t ret;
+
+       ret = lttng_write(pipe[1], "4", 1);
+       if (ret < 1) {
+               PERROR("write consumer health quit");
+       }
+}
+
 static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *chan,
                uint64_t key,
                enum consumer_channel_action action)
 {
        struct consumer_channel_msg msg;
-       int ret;
+       ssize_t ret;
 
        memset(&msg, 0, sizeof(msg));
 
        msg.action = action;
        msg.chan = chan;
        msg.key = key;
-       do {
-               ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
-       } while (ret < 0 && errno == EINTR);
+       ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
+       if (ret < sizeof(msg)) {
+               PERROR("notify_channel_pipe write error");
+       }
 }
 
 void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
@@ -121,17 +135,18 @@ static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
                enum consumer_channel_action *action)
 {
        struct consumer_channel_msg msg;
-       int ret;
+       ssize_t ret;
 
-       do {
-               ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
-       } while (ret < 0 && errno == EINTR);
-       if (ret > 0) {
-               *action = msg.action;
-               *chan = msg.chan;
-               *key = msg.key;
+       ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
+       if (ret < sizeof(msg)) {
+               ret = -1;
+               goto error;
        }
-       return ret;
+       *action = msg.action;
+       *chan = msg.chan;
+       *key = msg.key;
+error:
+       return (int) ret;
 }
 
 /*
@@ -304,6 +319,10 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                consumer_stream_destroy(stream, NULL);
        }
 
+       if (channel->live_timer_enabled == 1) {
+               consumer_timer_live_stop(channel);
+       }
+
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                break;
@@ -457,6 +476,19 @@ void consumer_del_stream(struct lttng_consumer_stream *stream,
        consumer_stream_destroy(stream, ht);
 }
 
+/*
+ * XXX naming of del vs destroy is all mixed up.
+ */
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
+{
+       consumer_stream_destroy(stream, data_ht);
+}
+
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
+{
+       consumer_stream_destroy(stream, metadata_ht);
+}
+
 struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                uint64_t stream_key,
                enum lttng_consumer_stream_state state,
@@ -485,12 +517,15 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->key = stream_key;
        stream->out_fd = -1;
        stream->out_fd_offset = 0;
+       stream->output_written = 0;
        stream->state = state;
        stream->uid = uid;
        stream->gid = gid;
        stream->net_seq_idx = relayd_id;
        stream->session_id = session_id;
        stream->monitor = monitor;
+       stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
+       stream->index_fd = -1;
        pthread_mutex_init(&stream->lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
@@ -498,6 +533,9 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                stream->metadata_flag = 1;
                /* Metadata is flat out. */
                strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
+               /* Live rendez-vous point. */
+               pthread_cond_init(&stream->metadata_rdv, NULL);
+               pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
        } else {
                /* Format stream name to <channel_name>_<cpu_number> */
                ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
@@ -538,9 +576,9 @@ end:
 /*
  * Add a stream to the global list protected by a mutex.
  */
-static int add_stream(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+int consumer_add_data_stream(struct lttng_consumer_stream *stream)
 {
+       struct lttng_ht *ht = data_ht;
        int ret = 0;
 
        assert(stream);
@@ -550,6 +588,7 @@ static int add_stream(struct lttng_consumer_stream *stream,
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->chan->lock);
+       pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
        rcu_read_lock();
 
@@ -587,12 +626,18 @@ static int add_stream(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->timer_lock);
        pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
 }
 
+void consumer_del_data_stream(struct lttng_consumer_stream *stream)
+{
+       consumer_del_stream(stream, data_ht);
+}
+
 /*
  * Add relayd socket to global consumer data hashtable. RCU read side lock MUST
  * be acquired before calling this.
@@ -705,6 +750,7 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
                if (ret < 0) {
                        goto end;
                }
+
                uatomic_inc(&relayd->refcount);
                stream->sent_to_relayd = 1;
        } else {
@@ -815,7 +861,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t tracefile_size,
                uint64_t tracefile_count,
                uint64_t session_id_per_pid,
-               unsigned int monitor)
+               unsigned int monitor,
+               unsigned int live_timer_interval)
 {
        struct lttng_consumer_channel *channel;
 
@@ -832,11 +879,26 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->uid = uid;
        channel->gid = gid;
        channel->relayd_id = relayd_id;
-       channel->output = output;
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
        channel->monitor = monitor;
+       channel->live_timer_interval = live_timer_interval;
        pthread_mutex_init(&channel->lock, NULL);
+       pthread_mutex_init(&channel->timer_lock, NULL);
+
+       switch (output) {
+       case LTTNG_EVENT_SPLICE:
+               channel->output = CONSUMER_CHANNEL_SPLICE;
+               break;
+       case LTTNG_EVENT_MMAP:
+               channel->output = CONSUMER_CHANNEL_MMAP;
+               break;
+       default:
+               assert(0);
+               free(channel);
+               channel = NULL;
+               goto end;
+       }
 
        /*
         * In monitor mode, the streams associated with the channel will be put in
@@ -883,6 +945,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&channel->lock);
+       pthread_mutex_lock(&channel->timer_lock);
        rcu_read_lock();
 
        lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
@@ -899,6 +962,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
 
 end:
        rcu_read_unlock();
+       pthread_mutex_unlock(&channel->timer_lock);
        pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
@@ -1066,12 +1130,11 @@ void lttng_consumer_cleanup(void)
  */
 void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
 {
-       int ret;
+       ssize_t ret;
+
        consumer_quit = 1;
-       do {
-               ret = write(ctx->consumer_should_quit[1], "4", 1);
-       } while (ret < 0 && errno == EINTR);
-       if (ret < 0 || ret != 1) {
+       ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
+       if (ret < 1) {
                PERROR("write consumer quit");
        }
 
@@ -1152,6 +1215,7 @@ struct lttng_consumer_local_data *lttng_consumer_create(
 
        ctx->consumer_error_socket = -1;
        ctx->consumer_metadata_socket = -1;
+       pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
        /* assign the callbacks */
        ctx->on_buffer_ready = buffer_ready;
        ctx->on_recv_channel = recv_channel;
@@ -1244,15 +1308,13 @@ static int write_relayd_metadata_id(int fd,
                struct lttng_consumer_stream *stream,
                struct consumer_relayd_sock_pair *relayd, unsigned long padding)
 {
-       int ret;
+       ssize_t ret;
        struct lttcomm_relayd_metadata_payload hdr;
 
        hdr.stream_id = htobe64(stream->relayd_stream_id);
        hdr.padding_size = htobe32(padding);
-       do {
-               ret = write(fd, (void *) &hdr, sizeof(hdr));
-       } while (ret < 0 && errno == EINTR);
-       if (ret < 0 || ret != sizeof(hdr)) {
+       ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
+       if (ret < sizeof(hdr)) {
                /*
                 * This error means that the fd's end is closed so ignore the perror
                 * not to clubber the error output since this can happen in a normal
@@ -1274,7 +1336,7 @@ static int write_relayd_metadata_id(int fd,
                        stream->relayd_stream_id, padding);
 
 end:
-       return ret;
+       return (int) ret;
 }
 
 /*
@@ -1291,7 +1353,8 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding)
+               unsigned long padding,
+               struct lttng_packet_index *index)
 {
        unsigned long mmap_offset;
        void *mmap_base;
@@ -1309,6 +1372,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
+                       ret = -EPIPE;
                        goto end;
                }
        }
@@ -1318,28 +1382,31 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
        case LTTNG_CONSUMER_KERNEL:
                mmap_base = stream->mmap_base;
                ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+               if (ret != 0) {
+                       PERROR("tracer ctl get_mmap_read_offset");
+                       written = -errno;
+                       goto end;
+               }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
                mmap_base = lttng_ustctl_get_mmap_base(stream);
                if (!mmap_base) {
                        ERR("read mmap get mmap base for stream %s", stream->name);
-                       written = -1;
+                       written = -EPERM;
                        goto end;
                }
                ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
-
+               if (ret != 0) {
+                       PERROR("tracer ctl get_mmap_read_offset");
+                       written = ret;
+                       goto end;
+               }
                break;
        default:
                ERR("Unknown consumer_data type");
                assert(0);
        }
-       if (ret != 0) {
-               errno = -ret;
-               PERROR("tracer ctl get_mmap_read_offset");
-               written = ret;
-               goto end;
-       }
 
        /* Handle stream on the relayd if the output is on the network */
        if (relayd) {
@@ -1394,24 +1461,40 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                        ret = utils_rotate_stream_file(stream->chan->pathname,
                                        stream->name, stream->chan->tracefile_size,
                                        stream->chan->tracefile_count, stream->uid, stream->gid,
-                                       stream->out_fd, &(stream->tracefile_count_current));
+                                       stream->out_fd, &(stream->tracefile_count_current),
+                                       &stream->out_fd);
                        if (ret < 0) {
                                ERR("Rotating output file");
                                goto end;
                        }
-                       outfd = stream->out_fd = ret;
+                       outfd = stream->out_fd;
+
+                       if (stream->index_fd >= 0) {
+                               ret = index_create_file(stream->chan->pathname,
+                                               stream->name, stream->uid, stream->gid,
+                                               stream->chan->tracefile_size,
+                                               stream->tracefile_count_current);
+                               if (ret < 0) {
+                                       goto end;
+                               }
+                               stream->index_fd = ret;
+                       }
+
                        /* Reset current size because we just perform a rotation. */
                        stream->tracefile_size_current = 0;
+                       stream->out_fd_offset = 0;
+                       orig_offset = 0;
                }
                stream->tracefile_size_current += len;
+               if (index) {
+                       index->offset = htobe64(stream->out_fd_offset);
+               }
        }
 
        while (len > 0) {
-               do {
-                       ret = write(outfd, mmap_base + mmap_offset, len);
-               } while (ret < 0 && errno == EINTR);
+               ret = lttng_write(outfd, mmap_base + mmap_offset, len);
                DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
-               if (ret < 0) {
+               if (ret < len) {
                        /*
                         * This is possible if the fd is closed on the other side (outfd)
                         * or any write problem. It can be verbose a bit for a normal
@@ -1420,7 +1503,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                         */
                        DBG("Error in file write mmap");
                        if (written == 0) {
-                               written = ret;
+                               written = -errno;
                        }
                        /* Socket operation failed. We consider the relayd dead */
                        if (errno == EPIPE || errno == EINVAL) {
@@ -1444,6 +1527,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                                        SYNC_FILE_RANGE_WRITE);
                        stream->out_fd_offset += ret;
                }
+               stream->output_written += ret;
                written += ret;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
@@ -1477,7 +1561,8 @@ end:
 ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding)
+               unsigned long padding,
+               struct lttng_packet_index *index)
 {
        ssize_t ret = 0, written = 0, ret_splice = 0;
        loff_t offset = 0;
@@ -1508,6 +1593,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
        if (stream->net_seq_idx != (uint64_t) -1ULL) {
                relayd = consumer_find_relayd(stream->net_seq_idx);
                if (relayd == NULL) {
+                       ret = -EPIPE;
                        goto end;
                }
        }
@@ -1576,16 +1662,32 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                        ret = utils_rotate_stream_file(stream->chan->pathname,
                                        stream->name, stream->chan->tracefile_size,
                                        stream->chan->tracefile_count, stream->uid, stream->gid,
-                                       stream->out_fd, &(stream->tracefile_count_current));
+                                       stream->out_fd, &(stream->tracefile_count_current),
+                                       &stream->out_fd);
                        if (ret < 0) {
                                ERR("Rotating output file");
                                goto end;
                        }
-                       outfd = stream->out_fd = ret;
+                       outfd = stream->out_fd;
+
+                       if (stream->index_fd >= 0) {
+                               ret = index_create_file(stream->chan->pathname,
+                                               stream->name, stream->uid, stream->gid,
+                                               stream->chan->tracefile_size,
+                                               stream->tracefile_count_current);
+                               if (ret < 0) {
+                                       goto end;
+                               }
+                               stream->index_fd = ret;
+                       }
+
                        /* Reset current size because we just perform a rotation. */
                        stream->tracefile_size_current = 0;
+                       stream->out_fd_offset = 0;
+                       orig_offset = 0;
                }
                stream->tracefile_size_current += len;
+               index->offset = htobe64(stream->out_fd_offset);
        }
 
        while (len > 0) {
@@ -1654,6 +1756,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(
                                        SYNC_FILE_RANGE_WRITE);
                        stream->out_fd_offset += ret_splice;
                }
+               stream->output_written += ret_splice;
                written += ret_splice;
        }
        lttng_consumer_sync_trace_file(stream, orig_offset);
@@ -1879,6 +1982,13 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
+               if (stream->monitor) {
+                       /* close the write-side in close_metadata */
+                       ret = close(stream->ust_metadata_poll_pipe[0]);
+                       if (ret < 0) {
+                               PERROR("Close UST metadata read-side poll pipe");
+                       }
+               }
                lttng_ustconsumer_del_stream(stream);
                break;
        default:
@@ -1947,8 +2057,8 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
 end:
        /*
         * Nullify the stream reference so it is not used after deletion. The
-        * consumer data lock MUST be acquired before being able to check for a
-        * NULL pointer value.
+        * channel lock MUST be acquired before being able to check for
+        * NULL pointer value.
         */
        stream->chan->metadata_stream = NULL;
 
@@ -1968,9 +2078,9 @@ free_stream_rcu:
  * Action done with the metadata stream when adding it to the consumer internal
  * data structures to handle it.
  */
-static int add_metadata_stream(struct lttng_consumer_stream *stream,
-               struct lttng_ht *ht)
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
 {
+       struct lttng_ht *ht = metadata_ht;
        int ret = 0;
        struct lttng_ht_iter iter;
        struct lttng_ht_node_u64 *node;
@@ -1982,6 +2092,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
 
        pthread_mutex_lock(&consumer_data.lock);
        pthread_mutex_lock(&stream->chan->lock);
+       pthread_mutex_lock(&stream->chan->timer_lock);
        pthread_mutex_lock(&stream->lock);
 
        /*
@@ -2028,6 +2139,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
 
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->lock);
+       pthread_mutex_unlock(&stream->chan->timer_lock);
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
@@ -2091,7 +2203,7 @@ static void validate_endpoint_status_metadata_stream(
  */
 void *consumer_thread_metadata_poll(void *data)
 {
-       int ret, i, pollfd;
+       int ret, i, pollfd, err = -1;
        uint32_t revents, nb_fd;
        struct lttng_consumer_stream *stream = NULL;
        struct lttng_ht_iter iter;
@@ -2102,6 +2214,10 @@ void *consumer_thread_metadata_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
+
+       health_code_update();
+
        metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!metadata_ht) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2127,14 +2243,19 @@ void *consumer_thread_metadata_poll(void *data)
        DBG("Metadata main loop started");
 
        while (1) {
+               health_code_update();
+
                /* Only the metadata pipe is set */
                if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
 
 restart:
                DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
                DBG("Metadata event catched in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
@@ -2148,14 +2269,11 @@ restart:
 
                /* From here, the event is a metadata wait fd */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
-                       /* Just don't waste time if no returned events for the fd */
-                       if (!revents) {
-                               continue;
-                       }
-
                        if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
                                if (revents & (LPOLLERR | LPOLLHUP )) {
                                        DBG("Metadata thread pipe hung up");
@@ -2172,8 +2290,8 @@ restart:
 
                                        pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
                                                        &stream, sizeof(stream));
-                                       if (pipe_len < 0) {
-                                               ERR("read metadata stream, ret: %ld", pipe_len);
+                                       if (pipe_len < sizeof(stream)) {
+                                               PERROR("read metadata stream");
                                                /*
                                                 * Continue here to handle the rest of the streams.
                                                 */
@@ -2190,14 +2308,6 @@ restart:
                                        DBG("Adding metadata stream %d to poll set",
                                                        stream->wait_fd);
 
-                                       ret = add_metadata_stream(stream, metadata_ht);
-                                       if (ret) {
-                                               ERR("Unable to add metadata stream");
-                                               /* Stream was not setup properly. Continuing. */
-                                               consumer_del_metadata_stream(stream, NULL);
-                                               continue;
-                                       }
-
                                        /* Add metadata stream to the global poll events list */
                                        lttng_poll_add(&events, stream->wait_fd,
                                                        LPOLLIN | LPOLLPRI);
@@ -2230,6 +2340,8 @@ restart:
 
                                        /* We just flushed the stream now read it. */
                                        do {
+                                               health_code_update();
+
                                                len = ctx->on_buffer_ready(stream, ctx);
                                                /*
                                                 * We don't check the return value here since if we get
@@ -2251,14 +2363,23 @@ restart:
                                DBG("Metadata available on fd %d", pollfd);
                                assert(stream->wait_fd == pollfd);
 
-                               len = ctx->on_buffer_ready(stream, ctx);
+                               do {
+                                       health_code_update();
+
+                                       len = ctx->on_buffer_ready(stream, ctx);
+                                       /*
+                                        * We don't check the return value here since if we get
+                                        * a negative len, it means an error occured thus we
+                                        * simply remove it from the poll set and free the
+                                        * stream.
+                                        */
+                               } while (len > 0);
+
                                /* It's ok to have an unavailable sub-buffer */
                                if (len < 0 && len != -EAGAIN && len != -ENODATA) {
                                        /* Clean up stream from consumer and free it. */
                                        lttng_poll_del(&events, stream->wait_fd);
                                        consumer_del_metadata_stream(stream, metadata_ht);
-                               } else if (len > 0) {
-                                       stream->data_read = 1;
                                }
                        }
 
@@ -2267,6 +2388,8 @@ restart:
                }
        }
 
+       /* All is OK */
+       err = 0;
 error:
 end:
        DBG("Metadata poll thread exiting");
@@ -2275,6 +2398,11 @@ end:
 end_poll:
        destroy_stream_ht(metadata_ht);
 end_ht:
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
        rcu_unregister_thread();
        return NULL;
 }
@@ -2285,7 +2413,7 @@ end_ht:
  */
 void *consumer_thread_data_poll(void *data)
 {
-       int num_rdy, num_hup, high_prio, ret, i;
+       int num_rdy, num_hup, high_prio, ret, i, err = -1;
        struct pollfd *pollfd = NULL;
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
@@ -2296,6 +2424,10 @@ void *consumer_thread_data_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
+
+       health_code_update();
+
        data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (data_ht == NULL) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2309,6 +2441,8 @@ void *consumer_thread_data_poll(void *data)
        }
 
        while (1) {
+               health_code_update();
+
                high_prio = 0;
                num_hup = 0;
 
@@ -2355,12 +2489,15 @@ void *consumer_thread_data_poll(void *data)
 
                /* No FDs and consumer_quit, consumer_cleanup the thread */
                if (nb_fd == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
                /* poll on the array of fds */
        restart:
                DBG("polling on %d fd", nb_fd + 1);
+               health_poll_entry();
                num_rdy = poll(pollfd, nb_fd + 1, -1);
+               health_poll_exit();
                DBG("poll num_rdy : %d", num_rdy);
                if (num_rdy == -1) {
                        /*
@@ -2388,8 +2525,8 @@ void *consumer_thread_data_poll(void *data)
                        DBG("consumer_data_pipe wake up");
                        pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
                                        &new_stream, sizeof(new_stream));
-                       if (pipe_readlen < 0) {
-                               ERR("Consumer data pipe ret %ld", pipe_readlen);
+                       if (pipe_readlen < sizeof(new_stream)) {
+                               PERROR("Consumer data pipe");
                                /* Continue so we can at least handle the current stream(s). */
                                continue;
                        }
@@ -2404,23 +2541,14 @@ void *consumer_thread_data_poll(void *data)
                                continue;
                        }
 
-                       ret = add_stream(new_stream, data_ht);
-                       if (ret) {
-                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
-                                               new_stream->key);
-                               /*
-                                * At this point, if the add_stream fails, it is not in the
-                                * hash table thus passing the NULL value here.
-                                */
-                               consumer_del_stream(new_stream, NULL);
-                       }
-
                        /* Continue to update the local streams and handle prio ones */
                        continue;
                }
 
                /* Take care of high priority channels first. */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        if (local_stream[i] == NULL) {
                                continue;
                        }
@@ -2449,6 +2577,8 @@ void *consumer_thread_data_poll(void *data)
 
                /* Take care of low priority channels. */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        if (local_stream[i] == NULL) {
                                continue;
                        }
@@ -2469,6 +2599,8 @@ void *consumer_thread_data_poll(void *data)
 
                /* Handle hangup and errors */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        if (local_stream[i] == NULL) {
                                continue;
                        }
@@ -2514,6 +2646,8 @@ void *consumer_thread_data_poll(void *data)
                        }
                }
        }
+       /* All is OK */
+       err = 0;
 end:
        DBG("polling thread exiting");
        free(pollfd);
@@ -2531,6 +2665,12 @@ end:
 
        destroy_data_stream_ht(data_ht);
 
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
+
        rcu_unregister_thread();
        return NULL;
 }
@@ -2612,7 +2752,7 @@ static void destroy_channel_ht(struct lttng_ht *ht)
  */
 void *consumer_thread_channel_poll(void *data)
 {
-       int ret, i, pollfd;
+       int ret, i, pollfd, err = -1;
        uint32_t revents, nb_fd;
        struct lttng_consumer_channel *chan = NULL;
        struct lttng_ht_iter iter;
@@ -2623,6 +2763,10 @@ void *consumer_thread_channel_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
+
+       health_code_update();
+
        channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!channel_ht) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2647,14 +2791,19 @@ void *consumer_thread_channel_poll(void *data)
        DBG("Channel main loop started");
 
        while (1) {
+               health_code_update();
+
                /* Only the channel pipe is set */
                if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
 
 restart:
                DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+               health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
                DBG("Channel event catched in thread");
                if (ret < 0) {
                        if (errno == EINTR) {
@@ -2668,6 +2817,8 @@ restart:
 
                /* From here, the event is a channel wait fd */
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
@@ -2734,6 +2885,8 @@ restart:
                                                        /* Delete streams that might have been left in the stream list. */
                                                        cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
                                                                        send_node) {
+                                                               health_code_update();
+
                                                                cds_list_del(&stream->send_node);
                                                                lttng_ustconsumer_del_stream(stream);
                                                                uatomic_sub(&stream->chan->refcount, 1);
@@ -2806,12 +2959,19 @@ restart:
                }
        }
 
+       /* All is OK */
+       err = 0;
 end:
        lttng_poll_clean(&events);
 end_poll:
        destroy_channel_ht(channel_ht);
 end_ht:
        DBG("Channel poll thread exiting");
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
        rcu_unregister_thread();
        return NULL;
 }
@@ -2849,7 +3009,7 @@ error:
  */
 void *consumer_thread_sessiond_poll(void *data)
 {
-       int sock = -1, client_socket, ret;
+       int sock = -1, client_socket, ret, err = -1;
        /*
         * structure to poll for incoming data on communication socket avoids
         * making blocking sockets.
@@ -2859,6 +3019,10 @@ void *consumer_thread_sessiond_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
+
+       health_code_update();
+
        DBG("Creating command socket %s", ctx->consumer_command_sock_path);
        unlink(ctx->consumer_command_sock_path);
        client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
@@ -2919,7 +3083,12 @@ void *consumer_thread_sessiond_poll(void *data)
        consumer_sockpoll[1].events = POLLIN | POLLPRI;
 
        while (1) {
-               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               health_code_update();
+
+               health_poll_entry();
+               ret = lttng_consumer_poll_socket(consumer_sockpoll);
+               health_poll_exit();
+               if (ret < 0) {
                        goto end;
                }
                DBG("Incoming command on sock");
@@ -2934,14 +3103,19 @@ void *consumer_thread_sessiond_poll(void *data)
                         * ERR() here.
                         */
                        DBG("Communication interrupted on command socket");
+                       err = 0;
                        goto end;
                }
                if (consumer_quit) {
                        DBG("consumer_thread_receive_fds received quit from signal");
+                       err = 0;        /* All is OK */
                        goto end;
                }
                DBG("received command on sock");
        }
+       /* All is OK */
+       err = 0;
+
 end:
        DBG("Consumer thread sessiond poll exiting");
 
@@ -2967,6 +3141,8 @@ end:
 
        notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
 
+       notify_health_quit_pipe(health_quit_pipe);
+
        /* Cleaning up possibly open sockets. */
        if (sock >= 0) {
                ret = close(sock);
@@ -2981,6 +3157,12 @@ end:
                }
        }
 
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
+
        rcu_unregister_thread();
        return NULL;
 }
@@ -2991,6 +3173,9 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
        ssize_t ret;
 
        pthread_mutex_lock(&stream->lock);
+       if (stream->metadata_flag) {
+               pthread_mutex_lock(&stream->metadata_rdv_lock);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -3007,6 +3192,10 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                break;
        }
 
+       if (stream->metadata_flag) {
+               pthread_cond_broadcast(&stream->metadata_rdv);
+               pthread_mutex_unlock(&stream->metadata_rdv_lock);
+       }
        pthread_mutex_unlock(&stream->lock);
        return ret;
 }
@@ -3046,10 +3235,11 @@ void lttng_consumer_init(void)
 int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
                struct pollfd *consumer_sockpoll,
-               struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id)
+               struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
+               uint64_t relayd_session_id)
 {
        int fd = -1, ret = -1, relayd_created = 0;
-       enum lttng_error_code ret_code = LTTNG_OK;
+       enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
        assert(ctx);
@@ -3085,7 +3275,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
        }
 
        /* First send a status message before receiving the fds. */
-       ret = consumer_send_status_msg(sock, LTTNG_OK);
+       ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
        if (ret < 0) {
                /* Somehow, the session daemon is not responding anymore. */
                lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
@@ -3146,29 +3336,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                relayd->control_sock.major = relayd_sock->major;
                relayd->control_sock.minor = relayd_sock->minor;
 
-               /*
-                * Create a session on the relayd and store the returned id. Lock the
-                * control socket mutex if the relayd was NOT created before.
-                */
-               if (!relayd_created) {
-                       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               }
-               ret = relayd_create_session(&relayd->control_sock,
-                               &relayd->relayd_session_id);
-               if (!relayd_created) {
-                       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               }
-               if (ret < 0) {
-                       /*
-                        * Close all sockets of a relayd object. It will be freed if it was
-                        * created at the error code path or else it will be garbage
-                        * collect.
-                        */
-                       (void) relayd_close(&relayd->control_sock);
-                       (void) relayd_close(&relayd->data_sock);
-                       ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
-                       goto error;
-               }
+               relayd->relayd_session_id = relayd_session_id;
 
                break;
        case LTTNG_STREAM_DATA:
@@ -3369,6 +3537,15 @@ int consumer_data_pending(uint64_t id)
                 */
                ret = cds_lfht_is_node_deleted(&stream->node.node);
                if (!ret) {
+                       /*
+                        * An empty output file is not valid. We need at least one packet
+                        * generated per stream, even if it contains no event, so it
+                        * contains at least one packet header.
+                        */
+                       if (stream->output_written == 0) {
+                               pthread_mutex_unlock(&stream->lock);
+                               goto data_pending;
+                       }
                        /* Check the stream if there is data in the buffers. */
                        ret = data_pending(stream);
                        if (ret == 1) {
@@ -3459,9 +3636,9 @@ int consumer_send_status_channel(int sock,
        assert(sock >= 0);
 
        if (!channel) {
-               msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
+               msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
        } else {
-               msg.ret_code = LTTNG_OK;
+               msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
                msg.key = channel->key;
                msg.stream_count = channel->streams.count;
        }
This page took 0.037523 seconds and 5 git commands to generate.