Command metadata regenerate
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 5686fbd09fe58966cebf3f3be70bf8ec9acd339e..fe7445b7f133818353fb96b1df10ed0171db88bf 100644 (file)
@@ -1197,8 +1197,8 @@ error:
  * complete.
  */
 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
-               uint64_t len, struct lttng_consumer_channel *channel,
-               int timer, int wait)
+               uint64_t len, uint64_t version,
+               struct lttng_consumer_channel *channel, int timer, int wait)
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        char *metadata_str;
@@ -1225,7 +1225,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
        health_code_update();
 
        pthread_mutex_lock(&channel->metadata_cache->lock);
-       ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
+       ret = consumer_metadata_cache_write(channel, offset, len, version,
+                       metadata_str);
        if (ret < 0) {
                /* Unable to handle metadata. Notify session daemon. */
                ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
@@ -1587,6 +1588,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                uint64_t len = msg.u.push_metadata.len;
                uint64_t key = msg.u.push_metadata.key;
                uint64_t offset = msg.u.push_metadata.target_offset;
+               uint64_t version = msg.u.push_metadata.version;
                struct lttng_consumer_channel *channel;
 
                DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key,
@@ -1637,7 +1639,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                health_code_update();
 
                ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
-                               len, channel, 0, 1);
+                               len, version, channel, 0, 1);
                if (ret < 0) {
                        /* error receiving from sessiond */
                        goto error_fatal;
@@ -2097,6 +2099,38 @@ error:
        return ret;
 }
 
+static
+void metadata_stream_reset_cache(struct lttng_consumer_stream *stream,
+               struct consumer_metadata_cache *cache)
+{
+       DBG("Metadata stream update to version %" PRIu64,
+                       cache->version);
+       stream->ust_metadata_pushed = 0;
+       stream->metadata_version = cache->version;
+       stream->reset_metadata_flag = 1;
+}
+
+/*
+ * Check if the version of the metadata stream and metadata cache match.
+ * If the cache got updated, reset the metadata stream.
+ * The stream lock and metadata cache lock MUST be held.
+ * Return 0 on success, a negative value on error.
+ */
+static
+int metadata_stream_check_version(struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+       struct consumer_metadata_cache *cache = stream->chan->metadata_cache;
+
+       if (cache->version == stream->metadata_version) {
+               goto end;
+       }
+       metadata_stream_reset_cache(stream, cache);
+
+end:
+       return ret;
+}
+
 /*
  * Write up to one packet from the metadata cache to the channel.
  *
@@ -2110,6 +2144,10 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        int ret;
 
        pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+       ret = metadata_stream_check_version(stream);
+       if (ret < 0) {
+               goto end;
+       }
        if (stream->chan->metadata_cache->max_offset
                        == stream->ust_metadata_pushed) {
                ret = 0;
@@ -2693,7 +2731,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        struct lttcomm_metadata_request_msg request;
        struct lttcomm_consumer_msg msg;
        enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
-       uint64_t len, key, offset;
+       uint64_t len, key, offset, version;
        int ret;
 
        assert(channel);
@@ -2773,6 +2811,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        len = msg.u.push_metadata.len;
        key = msg.u.push_metadata.key;
        offset = msg.u.push_metadata.target_offset;
+       version = msg.u.push_metadata.version;
 
        assert(key == channel->key);
        if (len == 0) {
@@ -2795,7 +2834,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        health_code_update();
 
        ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
-                       key, offset, len, channel, timer, wait);
+                       key, offset, len, version, channel, timer, wait);
        if (ret >= 0) {
                /*
                 * Only send the status msg if the sessiond is alive meaning a positive
This page took 0.026019 seconds and 5 git commands to generate.