Test for new metadata at each packet
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 192217b4e0eea6557e02e25d4df648bc12c81987..2d3d89c7c66e4349aa9e5141a5229ade6ba32451 100644 (file)
@@ -800,7 +800,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
         * Ask the sessiond if we have new metadata waiting and update the
         * consumer metadata cache.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
        if (ret < 0) {
                goto error;
        }
@@ -835,18 +835,13 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
                metadata_stream->tracefile_size_current = 0;
        }
 
-       pthread_mutex_lock(&metadata_channel->metadata_cache->lock);
-
        do {
                ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
                if (ret < 0) {
-                       goto error_unlock;
+                       goto error_stream;
                }
        } while (ret > 0);
 
-error_unlock:
-       pthread_mutex_unlock(&metadata_channel->metadata_cache->lock);
-
 error_stream:
        /*
         * Clean up the stream completly because the next snapshot will use a new
@@ -1024,7 +1019,7 @@ error:
  */
 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                uint64_t len, struct lttng_consumer_channel *channel,
-               int timer)
+               int timer, int wait)
 {
        int ret, ret_code = LTTNG_OK;
        char *metadata_str;
@@ -1061,6 +1056,9 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
        }
        pthread_mutex_unlock(&channel->metadata_cache->lock);
 
+       if (!wait) {
+               goto end_free;
+       }
        while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
                DBG("Waiting for metadata to be flushed");
                usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
@@ -1253,10 +1251,11 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        }
                        consumer_timer_switch_start(channel, attr.switch_timer_interval);
                        attr.switch_timer_interval = 0;
+               } else {
+                       consumer_timer_live_start(channel,
+                                       msg.u.ask_channel.live_timer_interval);
                }
 
-               consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval);
-
                /*
                 * Add the channel to the internal state AFTER all streams were created
                 * and successfully sent to session daemon. This way, all streams must
@@ -1410,7 +1409,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
 
                ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
-                               len, channel, 0);
+                               len, channel, 0, 1);
                if (ret < 0) {
                        /* error receiving from sessiond */
                        goto error_fatal;
@@ -1665,7 +1664,112 @@ error:
        return ret;
 }
 
+/*
+ * Write up to one packet from the metadata cache to the channel.
+ *
+ * Returns the number of bytes pushed in the cache, or a negative value
+ * on error.
+ */
+static
+int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
+{
+       ssize_t write_len;
+       int ret;
+
+       pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+       if (stream->chan->metadata_cache->contiguous
+                       == stream->ust_metadata_pushed) {
+               ret = 0;
+               goto end;
+       }
+
+       write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
+                       &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
+                       stream->chan->metadata_cache->contiguous
+                       - stream->ust_metadata_pushed);
+       assert(write_len != 0);
+       if (write_len < 0) {
+               ERR("Writing one metadata packet");
+               ret = -1;
+               goto end;
+       }
+       stream->ust_metadata_pushed += write_len;
+
+       assert(stream->chan->metadata_cache->contiguous >=
+                       stream->ust_metadata_pushed);
+       ret = write_len;
+
+end:
+       pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+       return ret;
+}
+
 
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *metadata)
+{
+       int ret;
+       int retry = 0;
+
+       assert(ctx);
+       assert(metadata);
+
+       /*
+        * Request metadata from the sessiond, but don't wait for the flush
+        * because we locked the metadata thread.
+        */
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
+       if (ret < 0) {
+               goto end;
+       }
+
+       ret = commit_one_metadata_packet(metadata);
+       if (ret <= 0) {
+               goto end;
+       } else if (ret > 0) {
+               retry = 1;
+       }
+
+       ustctl_flush_buffer(metadata->ustream, 1);
+       ret = ustctl_snapshot(metadata->ustream);
+       if (ret < 0) {
+               if (errno != EAGAIN) {
+                       ERR("Sync metadata, taking UST snapshot");
+                       goto end;
+               }
+               DBG("No new metadata when syncing them.");
+               /* No new metadata, exit. */
+               ret = ENODATA;
+               goto end;
+       }
+
+       /*
+        * After this flush, we still need to extract metadata.
+        */
+       if (retry) {
+               ret = EAGAIN;
+       }
+
+end:
+       return ret;
+}
+
+/*
+ * Read subbuffer from the given stream.
+ *
+ * Stream lock MUST be acquired.
+ *
+ * Return 0 on success else a negative value.
+ */
 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
@@ -1708,25 +1812,10 @@ retry:
                 * already been read.
                 */
                if (stream->metadata_flag) {
-                       ssize_t write_len;
-
-                       if (stream->chan->metadata_cache->contiguous
-                                       == stream->ust_metadata_pushed) {
-                               ret = 0;
+                       ret = commit_one_metadata_packet(stream);
+                       if (ret <= 0) {
                                goto end;
                        }
-
-                       write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
-                                       &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
-                                       stream->chan->metadata_cache->contiguous
-                                               - stream->ust_metadata_pushed);
-                       assert(write_len != 0);
-                       if (write_len < 0) {
-                               ERR("Writing one metadata packet");
-                               ret = -1;
-                               goto end;
-                       }
-                       stream->ust_metadata_pushed += write_len;
                        ustctl_flush_buffer(stream->ustream, 1);
                        goto retry;
                }
@@ -1795,6 +1884,16 @@ retry:
                goto end;
        }
 
+       if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+               /*
+                * In live, block until all the metadata is sent.
+                */
+               err = consumer_stream_sync_metadata(ctx, stream->session_id);
+               if (err < 0) {
+                       goto end;
+               }
+       }
+
        assert(!stream->metadata_flag);
        err = consumer_stream_write_index(stream, &index);
        if (err < 0) {
@@ -1965,7 +2064,7 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
  * introduces deadlocks.
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_channel *channel, int timer)
+               struct lttng_consumer_channel *channel, int timer, int wait)
 {
        struct lttcomm_metadata_request_msg request;
        struct lttcomm_consumer_msg msg;
@@ -2059,7 +2158,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        }
 
        ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
-                       key, offset, len, channel, timer);
+                       key, offset, len, channel, timer, wait);
        if (ret_code >= 0) {
                /*
                 * Only send the status msg if the sessiond is alive meaning a positive
This page took 0.027684 seconds and 5 git commands to generate.