Fix: implicit conversion of enum types in consumer
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 09ccda329526b69cf1e5aa147ed32f4dc8a67e81..1b01d414a0248b0207bbf72aba620a8fa62b93e4 100644 (file)
@@ -29,6 +29,7 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
+#include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
 #include <common/kernel-ctl/kernel-ctl.h>
 #include <common/sessiond-comm/sessiond-comm.h>
@@ -39,6 +40,7 @@
 #include <common/utils.h>
 #include <common/consumer-stream.h>
 #include <common/index/index.h>
+#include <common/consumer-timer.h>
 
 #include "kernel-consumer.h"
 
@@ -138,6 +140,9 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
        }
 
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+               health_code_update();
+
                /*
                 * Lock stream because we are about to change its state.
                 */
@@ -220,6 +225,8 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                        ssize_t read_len;
                        unsigned long len, padded_len;
 
+                       health_code_update();
+
                        DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
 
                        ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
@@ -361,6 +368,8 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
        }
 
        do {
+               health_code_update();
+
                ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
                if (ret_read < 0) {
                        if (ret_read != -EAGAIN) {
@@ -409,9 +418,11 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll)
 {
        ssize_t ret;
-       enum lttng_error_code ret_code = LTTNG_OK;
+       enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttcomm_consumer_msg msg;
 
+       health_code_update();
+
        ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
        if (ret != sizeof(msg)) {
                if (ret > 0) {
@@ -420,6 +431,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
                return ret;
        }
+
+       health_code_update();
+
        if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
                /*
                 * Notify the session daemon that the command is completed.
@@ -432,6 +446,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                return -ENOENT;
        }
 
+       health_code_update();
+
        /* relayd needs RCU read-side protection */
        rcu_read_lock();
 
@@ -441,7 +457,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* Session daemon status message are handled in the following call. */
                ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
                                msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
-                               &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
+                               &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
+                                msg.u.relayd_sock.relayd_session_id);
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_ADD_CHANNEL:
@@ -449,12 +466,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *new_channel;
                int ret_recv;
 
+               health_code_update();
+
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
+
+               health_code_update();
+
                DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
                new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
                                msg.u.channel.session_id, msg.u.channel.pathname,
@@ -462,7 +484,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.channel.relayd_id, msg.u.channel.output,
                                msg.u.channel.tracefile_size,
                                msg.u.channel.tracefile_count, 0,
-                               msg.u.channel.monitor);
+                               msg.u.channel.monitor,
+                               msg.u.channel.live_timer_interval);
                if (new_channel == NULL) {
                        lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
                        goto end_nosignal;
@@ -491,6 +514,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                };
 
+               health_code_update();
+
                if (ctx->on_recv_channel != NULL) {
                        ret_recv = ctx->on_recv_channel(new_channel);
                        if (ret_recv == 0) {
@@ -501,6 +526,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                } else {
                        ret = consumer_add_channel(new_channel, ctx);
                }
+               if (CONSUMER_CHANNEL_TYPE_DATA) {
+                       consumer_timer_live_start(new_channel,
+                                       msg.u.channel.live_timer_interval);
+               }
+
+               health_code_update();
 
                /* If we received an error in add_channel, we need to report it. */
                if (ret < 0) {
@@ -535,23 +566,33 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
                }
 
+               health_code_update();
+
                /* First send a status message before receiving the fds. */
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto error_fatal;
                }
-               if (ret_code != LTTNG_OK) {
+
+               health_code_update();
+
+               if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
                        /* Channel was not found. */
                        goto end_nosignal;
                }
 
                /* Blocking call */
-               if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+               health_poll_entry();
+               ret = lttng_consumer_poll_socket(consumer_sockpoll);
+               health_poll_exit();
+               if (ret < 0) {
                        rcu_read_unlock();
                        return -EINTR;
                }
 
+               health_code_update();
+
                /* Get stream file descriptor from socket */
                ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
                if (ret != sizeof(fd)) {
@@ -560,6 +601,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        return ret;
                }
 
+               health_code_update();
+
                /*
                 * Send status code to session daemon only if the recv works. If the
                 * above recv() failed, the session daemon is notified through the
@@ -571,6 +614,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
+               health_code_update();
+
                new_stream = consumer_allocate_stream(channel->key,
                                fd,
                                LTTNG_CONSUMER_ACTIVE_STREAM,
@@ -628,6 +673,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                 */
                new_stream->hangup_flush_done = 0;
 
+               health_code_update();
+
                if (ctx->on_recv_stream) {
                        ret = ctx->on_recv_stream(new_stream);
                        if (ret < 0) {
@@ -636,6 +683,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
 
+               health_code_update();
+
                if (new_stream->metadata_flag) {
                        channel->metadata_stream = new_stream;
                }
@@ -683,6 +732,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* Vitible to other threads */
                new_stream->globally_visible = 1;
 
+               health_code_update();
+
                ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
                if (ret < 0) {
                        ERR("Consumer write %s stream to pipe %d",
@@ -733,6 +784,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        consumer_flag_relayd_for_destroy(relayd);
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -750,6 +803,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                ret = consumer_data_pending(id);
 
+               health_code_update();
+
                /* Send back returned value to session daemon */
                ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
                if (ret < 0) {
@@ -785,6 +840,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
@@ -803,12 +860,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND;
                }
 
+               health_code_update();
+
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
                        /* Somehow, the session daemon is not responding anymore. */
                        goto end_nosignal;
                }
 
+               health_code_update();
+
                /*
                 * This command should ONLY be issued for channel with streams set in
                 * no monitor mode.
@@ -836,6 +897,7 @@ end_nosignal:
         * Return 1 to indicate success since the 0 value can be a socket
         * shutdown during the recv() or send() call.
         */
+       health_code_update();
        return 1;
 
 error_fatal:
@@ -898,6 +960,42 @@ static int get_index_values(struct lttng_packet_index *index, int infd)
 error:
        return ret;
 }
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
+{
+       int ret;
+
+       assert(metadata);
+
+       ret = kernctl_buffer_flush(metadata->wait_fd);
+       if (ret < 0) {
+               ERR("Failed to flush kernel stream");
+               goto end;
+       }
+
+       ret = kernctl_snapshot(metadata->wait_fd);
+       if (ret < 0) {
+               if (errno != EAGAIN) {
+                       ERR("Sync metadata, taking kernel snapshot failed.");
+                       goto end;
+               }
+               DBG("Sync metadata, no new kernel metadata");
+               /* No new metadata, exit. */
+               ret = ENODATA;
+               goto end;
+       }
+
+end:
+       return ret;
+}
 
 /*
  * Consume data on a file descriptor and write it on a trace file.
@@ -906,18 +1004,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 0;
+       int err, write_index = 1;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
        struct lttng_packet_index index;
 
        DBG("In read_subbuffer (infd : %d)", infd);
 
-       /* Indicate that for this stream we have to write the index. */
-       if (stream->index_fd >= 0) {
-               write_index = 1;
-       }
-
        /* Get the next subbuffer */
        err = kernctl_get_next_subbuf(infd);
        if (err != 0) {
@@ -941,11 +1034,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto end;
        }
 
-       if (!stream->metadata_flag && write_index) {
+       if (!stream->metadata_flag) {
                ret = get_index_values(&index, infd);
                if (ret < 0) {
                        goto end;
                }
+       } else {
+               write_index = 0;
        }
 
        switch (stream->chan->output) {
@@ -1027,14 +1122,25 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        }
 
        /* Write index if needed. */
-       if (write_index) {
-               err = index_write(stream->index_fd, &index, sizeof(index));
+       if (!write_index) {
+               goto end;
+       }
+
+       if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+               /*
+                * In live, block until all the metadata is sent.
+                */
+               err = consumer_stream_sync_metadata(ctx, stream->session_id);
                if (err < 0) {
-                       ret = -1;
                        goto end;
                }
        }
 
+       err = consumer_stream_write_index(stream, &index);
+       if (err < 0) {
+               goto end;
+       }
+
 end:
        return ret;
 }
This page took 0.030469 seconds and 5 git commands to generate.