Test for new metadata at each packet
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 315af2eb86e540900ad08496b6b0cff83fef4804..d02e8502d99914c0acf5652d9835bd71f20b540e 100644 (file)
@@ -39,6 +39,7 @@
 #include <common/utils.h>
 #include <common/consumer-stream.h>
 #include <common/index/index.h>
+#include <common/consumer-timer.h>
 
 #include "kernel-consumer.h"
 
@@ -441,7 +442,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                /* Session daemon status message are handled in the following call. */
                ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
                                msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
-                               &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
+                               &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
+                                msg.u.relayd_sock.relayd_session_id);
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_ADD_CHANNEL:
@@ -502,6 +504,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                } else {
                        ret = consumer_add_channel(new_channel, ctx);
                }
+               if (CONSUMER_CHANNEL_TYPE_DATA) {
+                       consumer_timer_live_start(new_channel,
+                                       msg.u.channel.live_timer_interval);
+               }
 
                /* If we received an error in add_channel, we need to report it. */
                if (ret < 0) {
@@ -899,6 +905,42 @@ static int get_index_values(struct lttng_packet_index *index, int infd)
 error:
        return ret;
 }
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
+{
+       int ret;
+
+       assert(metadata);
+
+       ret = kernctl_buffer_flush(metadata->wait_fd);
+       if (ret < 0) {
+               ERR("Failed to flush kernel stream");
+               goto end;
+       }
+
+       ret = kernctl_snapshot(metadata->wait_fd);
+       if (ret < 0) {
+               if (errno != EAGAIN) {
+                       ERR("Sync metadata, taking kernel snapshot failed.");
+                       goto end;
+               }
+               DBG("Sync metadata, no new kernel metadata");
+               /* No new metadata, exit. */
+               ret = ENODATA;
+               goto end;
+       }
+
+end:
+       return ret;
+}
 
 /*
  * Consume data on a file descriptor and write it on a trace file.
@@ -1029,6 +1071,16 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto end;
        }
 
+       if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+               /*
+                * In live, block until all the metadata is sent.
+                */
+               err = consumer_stream_sync_metadata(ctx, stream->session_id);
+               if (err < 0) {
+                       goto end;
+               }
+       }
+
        err = consumer_stream_write_index(stream, &index);
        if (err < 0) {
                goto end;
This page took 0.026256 seconds and 5 git commands to generate.