CUSTOM: consumer: ust: relayd: perform 2 stage relay_add_stream
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index b78295c917c216f3be18fdf95ec0c7af652a8496..c9ce0d1b295b03ef641a640b85009d60ba7de099 100644 (file)
@@ -537,7 +537,6 @@ static int send_sessiond_channel(int sock,
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttng_consumer_stream *stream;
-       uint64_t relayd_id = -1ULL;
 
        assert(channel);
        assert(ctx);
@@ -546,25 +545,16 @@ static int send_sessiond_channel(int sock,
        DBG("UST consumer sending channel %s to sessiond", channel->name);
 
        if (channel->relayd_id != (uint64_t) -1ULL) {
-               cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-
-                       health_code_update();
-
-                       /* Try to send the stream to the relayd if one is available. */
-                       ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
-                       if (ret < 0) {
-                               /*
-                                * Flag that the relayd was the problem here probably due to a
-                                * communicaton error on the socket.
-                                */
-                               if (relayd_error) {
-                                       *relayd_error = 1;
-                               }
-                               ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
-                       }
-                       if (relayd_id == -1ULL) {
-                               relayd_id = stream->relayd_id;
+               ret = consumer_send_relayd_channel_bulk(channel);
+               if (ret < 0) {
+                       /*
+                        * Flag that the relayd was the problem here probably due to a
+                        * communicaton error on the socket.
+                        */
+                       if (relayd_error) {
+                               *relayd_error = 1;
                        }
+                       ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                }
        }
 
@@ -1651,6 +1641,51 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                assert(cds_list_empty(&channel->streams.head));
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_CHANNEL_STOP_LIVE_TIMER:
+       {
+               uint64_t key = msg.u.get_channel.key;
+               struct lttng_consumer_channel *channel;
+
+               channel = consumer_find_channel(key);
+               if (!channel) {
+                       ERR("UST consumer get channel key %" PRIu64 " not found", key);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+                       goto end_msg_sessiond;
+               }
+
+               health_code_update();
+
+               if (channel->live_timer_enabled == 1) {
+                       consumer_timer_live_stop(channel);
+               }
+
+               health_code_update();
+
+               goto end_msg_sessiond;
+       }
+       case LTTNG_CONSUMER_CHANNEL_START_LIVE_TIMER:
+       {
+               uint64_t key = msg.u.get_channel.key;
+               struct lttng_consumer_channel *channel;
+
+               channel = consumer_find_channel(key);
+               if (!channel) {
+                       ERR("UST consumer get channel key %" PRIu64 " not found", key);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+                       goto end_msg_sessiond;
+               }
+
+               health_code_update();
+
+               if (channel->live_timer_enabled == 0) {
+                       consumer_timer_live_start(channel, channel->live_timer_interval);
+               }
+
+               health_code_update();
+
+               goto end_msg_sessiond;
+       }
+
        case LTTNG_CONSUMER_DESTROY_CHANNEL:
        {
                uint64_t key = msg.u.destroy_channel.key;
@@ -2135,13 +2170,12 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
 }
 
 static
-void metadata_stream_reset_cache(struct lttng_consumer_stream *stream)
+void metadata_stream_reset_cache_consumed_position(
+               struct lttng_consumer_stream *stream)
 {
        DBG("Reset metadata cache of session %" PRIu64,
                        stream->chan->session_id);
        stream->ust_metadata_pushed = 0;
-       stream->metadata_version = stream->chan->metadata_cache->version;
-       stream->reset_metadata_flag = 1;
 }
 
 /*
@@ -2157,10 +2191,41 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        int ret;
 
        pthread_mutex_lock(&stream->chan->metadata_cache->lock);
-       if (stream->chan->metadata_cache->max_offset
-                       == stream->ust_metadata_pushed) {
-               ret = 0;
-               goto end;
+       if (stream->chan->metadata_cache->max_offset ==
+           stream->ust_metadata_pushed) {
+               /*
+                * In the context of a user space metadata channel, a
+                * change in version can be detected in two ways:
+                *   1) During the pre-consume of the `read_subbuffer` loop,
+                *   2) When populating the metadata ring buffer (i.e. here).
+                *
+                * This function is invoked when there is no metadata
+                * available in the ring-buffer. If all data was consumed
+                * up to the size of the metadata cache, there is no metadata
+                * to insert in the ring-buffer.
+                *
+                * However, the metadata version could still have changed (a
+                * regeneration without any new data will yield the same cache
+                * size).
+                *
+                * The cache's version is checked for a version change and the
+                * consumed position is reset if one occurred.
+                *
+                * This check is only necessary for the user space domain as
+                * it has to manage the cache explicitly. If this reset was not
+                * performed, no metadata would be consumed (and no reset would
+                * occur as part of the pre-consume) until the metadata size
+                * exceeded the cache size.
+                */
+               if (stream->metadata_version !=
+                               stream->chan->metadata_cache->version) {
+                       metadata_stream_reset_cache_consumed_position(stream);
+                       consumer_stream_metadata_set_version(stream,
+                                       stream->chan->metadata_cache->version);
+               } else {
+                       ret = 0;
+                       goto end;
+               }
        }
 
        write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
@@ -2363,7 +2428,7 @@ static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
                goto end;
        }
 
-       subbuf->info.metadata.version = stream->chan->metadata_cache->version;
+       subbuf->info.metadata.version = stream->metadata_version;
 
 end:
        return ret;
@@ -2607,7 +2672,7 @@ static int lttng_ustconsumer_set_stream_ops(
                stream->read_subbuffer_ops.extract_subbuffer_info =
                                extract_metadata_subbuffer_info;
                stream->read_subbuffer_ops.reset_metadata =
-                               metadata_stream_reset_cache;
+                               metadata_stream_reset_cache_consumed_position;
                if (stream->chan->is_live) {
                        stream->read_subbuffer_ops.on_sleep = signal_metadata;
                        ret = consumer_stream_enable_metadata_bucketization(
This page took 0.027463 seconds and 5 git commands to generate.