Fix: implicit conversion of enum types in consumer
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index a68438253c9d694a6d29d7267b1da50548ad4172..1b01d414a0248b0207bbf72aba620a8fa62b93e4 100644 (file)
@@ -29,6 +29,7 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
+#include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/sessiond-comm.h>
@@ -38,6 +39,8 @@
 #include <common/relayd/relayd.h>
 #include <common/utils.h>
 #include <common/consumer-stream.h>
+#include <common/index/index.h>
+#include <common/consumer-timer.h>
 
 #include "kernel-consumer.h"
 
@@ -57,8 +60,8 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
 
        ret = kernctl_snapshot(infd);
        if (ret != 0) {
-               errno = -ret;
                perror("Getting sub-buffer snapshot.");
+               ret = -errno;
        }
 
        return ret;
@@ -77,8 +80,8 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
 
        ret = kernctl_snapshot_get_produced(infd, pos);
        if (ret != 0) {
-               errno = -ret;
                perror("kernctl_snapshot_get_produced");
+               ret = -errno;
        }
 
        return ret;
@@ -97,95 +100,34 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
 
        ret = kernctl_snapshot_get_consumed(infd, pos);
        if (ret != 0) {
-               errno = -ret;
                perror("kernctl_snapshot_get_consumed");
+               ret = -errno;
        }
 
        return ret;
 }
 
-/*
- * Find a relayd and send the stream
- *
- * Returns 0 on success, < 0 on error
- */
-static
-int send_relayd_stream(struct lttng_consumer_stream *stream, char *path)
-{
-       struct consumer_relayd_sock_pair *relayd;
-       int ret = 0;
-       char *stream_path;
-
-       if (path != NULL) {
-               stream_path = path;
-       } else {
-               stream_path = stream->chan->pathname;
-       }
-       /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               /* Add stream on the relayd */
-               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-               ret = relayd_add_stream(&relayd->control_sock,
-                               stream->name, stream_path,
-                               &stream->relayd_stream_id,
-                               stream->chan->tracefile_size,
-                               stream->chan->tracefile_count);
-               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-               if (ret < 0) {
-                       goto end;
-               }
-               uatomic_inc(&relayd->refcount);
-       } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
-               ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
-                               stream->net_seq_idx);
-               ret = -1;
-               goto end;
-       }
-
-end:
-       rcu_read_unlock();
-       return ret;
-}
-
-/*
- * Find a relayd and close the stream
- */
-static
-void close_relayd_stream(struct lttng_consumer_stream *stream)
-{
-       struct consumer_relayd_sock_pair *relayd;
-
-       /* The stream is not metadata. Get relayd reference if exists. */
-       rcu_read_lock();
-       relayd = consumer_find_relayd(stream->net_seq_idx);
-       if (relayd != NULL) {
-               consumer_stream_relayd_close(stream, relayd);
-       }
-       rcu_read_unlock();
-}
-
 /*
  * Take a snapshot of all the stream of a channel
  *
  * Returns 0 on success, < 0 on error
  */
 int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
-               uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
+               uint64_t relayd_id, uint64_t max_stream_size,
+               struct lttng_consumer_local_data *ctx)
 {
        int ret;
        unsigned long consumed_pos, produced_pos;
        struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
 
-       DBG("Kernel consumer snapshot channel %lu", key);
+       DBG("Kernel consumer snapshot channel %" PRIu64, key);
 
        rcu_read_lock();
 
        channel = consumer_find_channel(key);
        if (!channel) {
-               ERR("No channel found for key %lu", key);
+               ERR("No channel found for key %" PRIu64, key);
                ret = -1;
                goto end;
        }
@@ -197,8 +139,10 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                goto end;
        }
 
-       cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head,
-                       no_monitor_node) {
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+               health_code_update();
+
                /*
                 * Lock stream because we are about to change its state.
                 */
@@ -211,16 +155,16 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                stream->net_seq_idx = relayd_id;
                channel->relayd_id = relayd_id;
                if (relayd_id != (uint64_t) -1ULL) {
-                       ret = send_relayd_stream(stream, path);
+                       ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
                                ERR("sending stream to relayd");
                                goto end_unlock;
                        }
-                       DBG("Stream %s sent to the relayd", stream->name);
                } else {
                        ret = utils_create_stream_file(path, stream->name,
-                                       stream->chan->tracefile_size, stream->tracefile_count_current,
-                                       stream->uid, stream->gid);
+                                       stream->chan->tracefile_size,
+                                       stream->tracefile_count_current,
+                                       stream->uid, stream->gid, NULL);
                        if (ret < 0) {
                                ERR("utils_create_stream_file");
                                goto end_unlock;
@@ -229,13 +173,14 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        stream->out_fd = ret;
                        stream->tracefile_size_current = 0;
 
-                       DBG("Kernel consumer snapshot stream %s/%s (%lu)", path,
-                                       stream->name, stream->key);
+                       DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
+                                       path, stream->name, stream->key);
                }
 
                ret = kernctl_buffer_flush(stream->wait_fd);
                if (ret < 0) {
-                       ERR("Failed to flush kernel metadata stream");
+                       ERR("Failed to flush kernel stream");
+                       ret = -errno;
                        goto end_unlock;
                }
 
@@ -262,20 +207,33 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                        &stream->max_sb_size);
                        if (ret < 0) {
                                ERR("Getting kernel max_sb_size");
+                               ret = -errno;
                                goto end_unlock;
                        }
                }
 
+               /*
+                * The original value is sent back if max stream size is larger than
+                * the possible size of the snapshot. Also, we asume that the session
+                * daemon should never send a maximum stream size that is lower than
+                * subbuffer size.
+                */
+               consumed_pos = consumer_get_consumed_maxsize(consumed_pos,
+                               produced_pos, max_stream_size);
+
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
 
+                       health_code_update();
+
                        DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
 
                        ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
                        if (ret < 0) {
                                if (errno != EAGAIN) {
                                        PERROR("kernctl_get_subbuf snapshot");
+                                       ret = -errno;
                                        goto end_unlock;
                                }
                                DBG("Kernel consumer get subbuf failed. Skipping it.");
@@ -286,17 +244,19 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_subbuf_size");
+                               ret = -errno;
                                goto error_put_subbuf;
                        }
 
                        ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_get_padded_subbuf_size");
+                               ret = -errno;
                                goto error_put_subbuf;
                        }
 
                        read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
-                                       padded_len - len);
+                                       padded_len - len, NULL);
                        /*
                         * We write the padded len in local tracefiles but the data len
                         * when using a relay. Display the error but continue processing
@@ -317,18 +277,21 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ret = kernctl_put_subbuf(stream->wait_fd);
                        if (ret < 0) {
                                ERR("Snapshot kernctl_put_subbuf");
+                               ret = -errno;
                                goto end_unlock;
                        }
                        consumed_pos += stream->max_sb_size;
                }
 
                if (relayd_id == (uint64_t) -1ULL) {
-                       ret = close(stream->out_fd);
-                       if (ret < 0) {
-                               PERROR("Kernel consumer snapshot close out_fd");
-                               goto end_unlock;
+                       if (stream->out_fd >= 0) {
+                               ret = close(stream->out_fd);
+                               if (ret < 0) {
+                                       PERROR("Kernel consumer snapshot close out_fd");
+                                       goto end_unlock;
+                               }
+                               stream->out_fd = -1;
                        }
-                       stream->out_fd = -1;
                } else {
                        close_relayd_stream(stream);
                        stream->net_seq_idx = (uint64_t) -1ULL;
@@ -343,6 +306,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
 error_put_subbuf:
        ret = kernctl_put_subbuf(stream->wait_fd);
        if (ret < 0) {
+               ret = -errno;
                ERR("Snapshot kernctl_put_subbuf error path");
        }
 end_unlock:
@@ -360,9 +324,12 @@ end:
 int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
                uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
 {
+       int ret, use_relayd = 0;
+       ssize_t ret_read;
        struct lttng_consumer_channel *metadata_channel;
        struct lttng_consumer_stream *metadata_stream;
-       int ret;
+
+       assert(ctx);
 
        DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
                        key, path);
@@ -371,58 +338,73 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
 
        metadata_channel = consumer_find_channel(key);
        if (!metadata_channel) {
-               ERR("Snapshot kernel metadata channel not found for key %lu", key);
+               ERR("Kernel snapshot metadata not found for key %" PRIu64, key);
                ret = -1;
-               goto end;
+               goto error;
        }
 
        metadata_stream = metadata_channel->metadata_stream;
        assert(metadata_stream);
 
+       /* Flag once that we have a valid relayd for the stream. */
        if (relayd_id != (uint64_t) -1ULL) {
-               ret = send_relayd_stream(metadata_stream, path);
+               use_relayd = 1;
+       }
+
+       if (use_relayd) {
+               ret = consumer_send_relayd_stream(metadata_stream, path);
                if (ret < 0) {
-                       ERR("sending stream to relayd");
+                       goto error;
                }
-               DBG("Stream %s sent to the relayd", metadata_stream->name);
        } else {
                ret = utils_create_stream_file(path, metadata_stream->name,
                                metadata_stream->chan->tracefile_size,
                                metadata_stream->tracefile_count_current,
-                               metadata_stream->uid, metadata_stream->gid);
+                               metadata_stream->uid, metadata_stream->gid, NULL);
                if (ret < 0) {
-                       goto end;
+                       goto error;
                }
                metadata_stream->out_fd = ret;
        }
 
-       ret = 0;
-       while (ret >= 0) {
-               ret = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
-               if (ret < 0) {
-                       if (ret != -EPERM) {
-                               ERR("Kernel snapshot reading subbuffer");
-                               goto end;
+       do {
+               health_code_update();
+
+               ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
+               if (ret_read < 0) {
+                       if (ret_read != -EAGAIN) {
+                               ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
+                                               ret_read);
+                               goto error;
                        }
-                       /* "ret" is negative at this point so we will exit the loop. */
+                       /* ret_read is negative at this point so we will exit the loop. */
                        continue;
                }
-       }
+       } while (ret_read >= 0);
 
-       if (relayd_id == (uint64_t) -1ULL) {
-               ret = close(metadata_stream->out_fd);
-               if (ret < 0) {
-                       PERROR("Kernel consumer snapshot close out_fd");
-                       goto end;
-               }
-               metadata_stream->out_fd = -1;
-       } else {
+       if (use_relayd) {
                close_relayd_stream(metadata_stream);
                metadata_stream->net_seq_idx = (uint64_t) -1ULL;
+       } else {
+               if (metadata_stream->out_fd >= 0) {
+                       ret = close(metadata_stream->out_fd);
+                       if (ret < 0) {
+                               PERROR("Kernel consumer snapshot metadata close out_fd");
+                               /*
+                                * Don't go on error here since the snapshot was successful at this
+                                * point but somehow the close failed.
+                                */
+                       }
+                       metadata_stream->out_fd = -1;
+               }
        }
 
        ret = 0;
-end:
+
+       cds_list_del(&metadata_stream->send_node);
+       consumer_stream_destroy(metadata_stream, NULL);
+       metadata_channel->metadata_stream = NULL;
+error:
        rcu_read_unlock();
        return ret;
 }
@@ -436,17 +418,22 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
        ssize_t ret;
-       enum lttng_error_code ret_code = LTTNG_OK;
+       enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttcomm_consumer_msg msg;
 
+       health_code_update();
+
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
        if (ret != sizeof(msg)) {
-               lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
                if (ret > 0) {
+                       lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
                        ret = -1;
                }
                return ret;
        }
+
+       health_code_update();
+
        if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
                /*
                 * Notify the session daemon that the command is completed.
@@ -459,6 +446,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                return -ENOENT;
        }
 
+       health_code_update();
+
        /* relayd needs RCU read-side protection */
        rcu_read_lock();
 
@@ -468,7 +457,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* Session daemon status message are handled in the following call. */
                ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
                                msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
-                               &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
+                               &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
+                                msg.u.relayd_sock.relayd_session_id);
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_ADD_CHANNEL:
@@ -476,12 +466,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *new_channel;
                int ret_recv;
 
+               health_code_update();
+
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
+
+               health_code_update();
+
                DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                msg.u.channel.session_id, msg.u.channel.pathname,
@@ -489,7 +484,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.channel.relayd_id, msg.u.channel.output,
                                msg.u.channel.tracefile_size,
                                msg.u.channel.tracefile_count, 0,
-                               msg.u.channel.monitor);
+                               msg.u.channel.monitor,
+                               msg.u.channel.live_timer_interval);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
@@ -518,6 +514,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                };
 
+               health_code_update();
+
                if (ctx->on_recv_channel != NULL) {
                        ret_recv = ctx->on_recv_channel(new_channel);
                        if (ret_recv == 0) {
@@ -528,6 +526,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                } else {
                        ret = consumer_add_channel(new_channel, ctx);
                }
+               if (CONSUMER_CHANNEL_TYPE_DATA) {
+                       consumer_timer_live_start(new_channel,
+                                       msg.u.channel.live_timer_interval);
+               }
+
+               health_code_update();
 
                /* If we received an error in add_channel, we need to report it. */
                if (ret < 0) {
@@ -562,28 +566,33 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
                }
 
+               health_code_update();
+
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
-                       /*
-                        * Somehow, the session daemon is not responding
-                        * anymore.
-                        */
+                       /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
-               if (ret_code != LTTNG_OK) {
-                       /*
-                        * Channel was not found.
-                        */
+
+               health_code_update();
+
+               if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
+                       /* Channel was not found. */
                        goto end_nosignal;
                }
 
-               /* block */
-               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               /* Blocking call */
+               health_poll_entry();
+               ret = lttng_consumer_poll_socket(consumer_sockpoll);
+               health_poll_exit();
+               if (ret < 0) {
                        rcu_read_unlock();
                        return -EINTR;
                }
 
+               health_code_update();
+
                /* Get stream file descriptor from socket */
                ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
                if (ret != sizeof(fd)) {
@@ -592,6 +601,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        return ret;
                }
 
+               health_code_update();
+
                /*
                 * Send status code to session daemon only if the recv works. If the
                 * above recv() failed, the session daemon is notified through the
@@ -603,6 +614,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
+               health_code_update();
+
                new_stream = consumer_allocate_stream(channel->key,
                                fd,
                                LTTNG_CONSUMER_ACTIVE_STREAM,
@@ -613,7 +626,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                channel->session_id,
                                msg.u.stream.cpu,
                                &alloc_ret,
-                               channel->type);
+                               channel->type,
+                               channel->monitor);
                if (new_stream == NULL) {
                        switch (alloc_ret) {
                        case -ENOMEM:
@@ -624,6 +638,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                        goto end_nosignal;
                }
+
                new_stream->chan = channel;
                new_stream->wait_fd = fd;
                switch (channel->output) {
@@ -658,14 +673,18 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 */
                new_stream->hangup_flush_done = 0;
 
+               health_code_update();
+
                if (ctx->on_recv_stream) {
                        ret = ctx->on_recv_stream(new_stream);
                        if (ret < 0) {
-                               consumer_del_stream(new_stream, NULL);
+                               consumer_stream_free(new_stream);
                                goto end_nosignal;
                        }
                }
 
+               health_code_update();
+
                if (new_stream->metadata_flag) {
                        channel->metadata_stream = new_stream;
                }
@@ -675,30 +694,56 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        DBG("Kernel consumer add stream %s in no monitor mode with "
                                        "relayd id %" PRIu64, new_stream->name,
                                        new_stream->net_seq_idx);
-                       cds_list_add(&new_stream->no_monitor_node,
-                                       &channel->stream_no_monitor_list.head);
+                       cds_list_add(&new_stream->send_node, &channel->streams.head);
                        break;
                }
 
-               ret = send_relayd_stream(new_stream, NULL);
-               if (ret < 0) {
-                       consumer_del_stream(new_stream, NULL);
-                       goto end_nosignal;
+               /* Send stream to relayd if the stream has an ID. */
+               if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
+                       ret = consumer_send_relayd_stream(new_stream,
+                                       new_stream->chan->pathname);
+                       if (ret < 0) {
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                }
 
                /* Get the right pipe where the stream will be sent. */
                if (new_stream->metadata_flag) {
+                       ret = consumer_add_metadata_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing",
+                                               new_stream->key);
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                        stream_pipe = ctx->consumer_metadata_pipe;
                } else {
+                       ret = consumer_add_data_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add stream %" PRIu64 " failed. Continuing",
+                                               new_stream->key);
+                               consumer_stream_free(new_stream);
+                               goto end_nosignal;
+                       }
                        stream_pipe = ctx->consumer_data_pipe;
                }
 
+               /* Vitible to other threads */
+               new_stream->globally_visible = 1;
+
+               health_code_update();
+
                ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
                if (ret < 0) {
                        ERR("Consumer write %s stream to pipe %d",
                                        new_stream->metadata_flag ? "metadata" : "data",
                                        lttng_pipe_get_writefd(stream_pipe));
-                       consumer_del_stream(new_stream, NULL);
+                       if (new_stream->metadata_flag) {
+                               consumer_del_stream_for_metadata(new_stream);
+                       } else {
+                               consumer_del_stream_for_data(new_stream);
+                       }
                        goto end_nosignal;
                }
 
@@ -739,6 +784,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        consumer_flag_relayd_for_destroy(relayd);
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -756,6 +803,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                ret = consumer_data_pending(id);
 
+               health_code_update();
+
                /* Send back returned value to session daemon */
                ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
                if (ret < 0) {
@@ -782,13 +831,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                } else {
                        ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key,
                                        msg.u.snapshot_channel.pathname,
-                                       msg.u.snapshot_channel.relayd_id, ctx);
+                                       msg.u.snapshot_channel.relayd_id,
+                                       msg.u.snapshot_channel.max_stream_size,
+                                       ctx);
                        if (ret < 0) {
                                ERR("Snapshot channel failed");
                                ret_code = LTTNG_ERR_KERN_CHAN_FAIL;
                        }
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -807,12 +860,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto end_nosignal;
                }
 
+               health_code_update();
+
                /*
                 * This command should ONLY be issued for channel with streams set in
                 * no monitor mode.
@@ -840,6 +897,7 @@ end_nosignal:
         * Return 1 to indicate success since the 0 value can be a socket
         * shutdown during the recv() or send() call.
         */
+       health_code_update();
        return 1;
 
 error_fatal:
@@ -848,6 +906,97 @@ error_fatal:
        return -1;
 }
 
+/*
+ * Populate index values of a kernel stream. Values are set in big endian order.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int get_index_values(struct lttng_packet_index *index, int infd)
+{
+       int ret;
+
+       ret = kernctl_get_timestamp_begin(infd, &index->timestamp_begin);
+       if (ret < 0) {
+               PERROR("kernctl_get_timestamp_begin");
+               goto error;
+       }
+       index->timestamp_begin = htobe64(index->timestamp_begin);
+
+       ret = kernctl_get_timestamp_end(infd, &index->timestamp_end);
+       if (ret < 0) {
+               PERROR("kernctl_get_timestamp_end");
+               goto error;
+       }
+       index->timestamp_end = htobe64(index->timestamp_end);
+
+       ret = kernctl_get_events_discarded(infd, &index->events_discarded);
+       if (ret < 0) {
+               PERROR("kernctl_get_events_discarded");
+               goto error;
+       }
+       index->events_discarded = htobe64(index->events_discarded);
+
+       ret = kernctl_get_content_size(infd, &index->content_size);
+       if (ret < 0) {
+               PERROR("kernctl_get_content_size");
+               goto error;
+       }
+       index->content_size = htobe64(index->content_size);
+
+       ret = kernctl_get_packet_size(infd, &index->packet_size);
+       if (ret < 0) {
+               PERROR("kernctl_get_packet_size");
+               goto error;
+       }
+       index->packet_size = htobe64(index->packet_size);
+
+       ret = kernctl_get_stream_id(infd, &index->stream_id);
+       if (ret < 0) {
+               PERROR("kernctl_get_stream_id");
+               goto error;
+       }
+       index->stream_id = htobe64(index->stream_id);
+
+error:
+       return ret;
+}
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
+{
+       int ret;
+
+       assert(metadata);
+
+       ret = kernctl_buffer_flush(metadata->wait_fd);
+       if (ret < 0) {
+               ERR("Failed to flush kernel stream");
+               goto end;
+       }
+
+       ret = kernctl_snapshot(metadata->wait_fd);
+       if (ret < 0) {
+               if (errno != EAGAIN) {
+                       ERR("Sync metadata, taking kernel snapshot failed.");
+                       goto end;
+               }
+               DBG("Sync metadata, no new kernel metadata");
+               /* No new metadata, exit. */
+               ret = ENODATA;
+               goto end;
+       }
+
+end:
+       return ret;
+}
+
 /*
  * Consume data on a file descriptor and write it on a trace file.
  */
@@ -855,15 +1004,16 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err;
+       int err, write_index = 1;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
+       struct lttng_packet_index index;
 
        DBG("In read_subbuffer (infd : %d)", infd);
+
        /* Get the next subbuffer */
        err = kernctl_get_next_subbuf(infd);
        if (err != 0) {
-               ret = err;
                /*
                 * This is a debug message even for single-threaded consumer,
                 * because poll() have more relaxed criterions than get subbuf,
@@ -872,18 +1022,27 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                 */
                DBG("Reserving sub buffer failed (everything is normal, "
                                "it is due to concurrency)");
+               ret = -errno;
                goto end;
        }
 
        /* Get the full subbuffer size including padding */
        err = kernctl_get_padded_subbuf_size(infd, &len);
        if (err != 0) {
-               errno = -err;
                perror("Getting sub-buffer len failed.");
-               ret = err;
+               ret = -errno;
                goto end;
        }
 
+       if (!stream->metadata_flag) {
+               ret = get_index_values(&index, infd);
+               if (ret < 0) {
+                       goto end;
+               }
+       } else {
+               write_index = 0;
+       }
+
        switch (stream->chan->output) {
        case CONSUMER_CHANNEL_SPLICE:
                /*
@@ -896,7 +1055,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
                /* splice the subbuffer to the tracefile */
                ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size,
-                               padding);
+                               padding, &index);
                /*
                 * XXX: Splice does not support network streaming so the return value
                 * is simply checked against subbuf_size and not like the mmap() op.
@@ -908,15 +1067,15 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                         */
                        ERR("Error splicing to tracefile (ret: %zd != len: %lu)",
                                        ret, subbuf_size);
+                       write_index = 0;
                }
                break;
        case CONSUMER_CHANNEL_MMAP:
                /* Get subbuffer size without padding */
                err = kernctl_get_subbuf_size(infd, &subbuf_size);
                if (err != 0) {
-                       errno = -err;
                        perror("Getting sub-buffer len failed.");
-                       ret = err;
+                       ret = -errno;
                        goto end;
                }
 
@@ -927,7 +1086,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
                /* write the subbuffer to the tracefile */
                ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size,
-                               padding);
+                               padding, &index);
                /*
                 * The mmap operation should write subbuf_size amount of data when
                 * network streaming or the full padding (len) size when we are _not_
@@ -942,24 +1101,43 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                        ERR("Error writing to tracefile "
                                        "(ret: %zd != len: %lu != subbuf_size: %lu)",
                                        ret, len, subbuf_size);
+                       write_index = 0;
                }
                break;
        default:
                ERR("Unknown output method");
-               ret = -1;
+               ret = -EPERM;
        }
 
        err = kernctl_put_next_subbuf(infd);
        if (err != 0) {
-               errno = -err;
                if (errno == EFAULT) {
                        perror("Error in unreserving sub buffer\n");
                } else if (errno == EIO) {
                        /* Should never happen with newer LTTng versions */
                        perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
                }
+               ret = -errno;
+               goto end;
+       }
 
-               ret = -err;
+       /* Write index if needed. */
+       if (!write_index) {
+               goto end;
+       }
+
+       if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+               /*
+                * In live, block until all the metadata is sent.
+                */
+               err = consumer_stream_sync_metadata(ctx, stream->session_id);
+               if (err < 0) {
+                       goto end;
+               }
+       }
+
+       err = consumer_stream_write_index(stream, &index);
+       if (err < 0) {
                goto end;
        }
 
@@ -980,12 +1158,23 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
                ret = utils_create_stream_file(stream->chan->pathname, stream->name,
                                stream->chan->tracefile_size, stream->tracefile_count_current,
-                               stream->uid, stream->gid);
+                               stream->uid, stream->gid, NULL);
                if (ret < 0) {
                        goto error;
                }
                stream->out_fd = ret;
                stream->tracefile_size_current = 0;
+
+               if (!stream->metadata_flag) {
+                       ret = index_create_file(stream->chan->pathname,
+                                       stream->name, stream->uid, stream->gid,
+                                       stream->chan->tracefile_size,
+                                       stream->tracefile_count_current);
+                       if (ret < 0) {
+                               goto error;
+                       }
+                       stream->index_fd = ret;
+               }
        }
 
        if (stream->output == LTTNG_EVENT_MMAP) {
@@ -994,8 +1183,8 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
 
                ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
                if (ret != 0) {
-                       errno = -ret;
                        PERROR("kernctl_get_mmap_len");
+                       ret = -errno;
                        goto error_close_fd;
                }
                stream->mmap_len = (size_t) mmap_len;
@@ -1013,11 +1202,12 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
        return 0;
 
 error_close_fd:
-       {
+       if (stream->out_fd >= 0) {
                int err;
 
                err = close(stream->out_fd);
                assert(!err);
+               stream->out_fd = -1;
        }
 error:
        return ret;
@@ -1037,6 +1227,11 @@ int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
 
        assert(stream);
 
+       if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
+               ret = 0;
+               goto end;
+       }
+
        ret = kernctl_get_next_subbuf(stream->wait_fd);
        if (ret == 0) {
                /* There is still data so let's put back this subbuffer. */
This page took 0.037074 seconds and 5 git commands to generate.