Consumer: Implement lttng_consumer_clear_channel
authorJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Fri, 8 Feb 2019 22:05:10 +0000 (17:05 -0500)
committerJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Wed, 10 Apr 2019 20:20:10 +0000 (16:20 -0400)
This function is responsible for performing all actions needed to
clear a given channel.

It only supports clear operation on unmonitored channel
(snapshot mode) for now.

To do so, flush and clear all the channel streams.

We use an active flush (consumer_flush_buffer(..., 1)) since we consider
the producer active at all time. No reason so far to check for the
quiescent state of the channel. This might need to be revisited.

Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
src/common/consumer/consumer.c
src/common/consumer/consumer.h

index cb1414979c619a19f4eadd92a71088b7f83038f8..3e3c304873bad94c005cfa5a611c46ccc4ce716e 100644 (file)
@@ -3767,3 +3767,134 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        }
        return start_pos;
 }
+
+static
+int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
+{
+       int ret = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               ret = kernctl_buffer_flush(stream->wait_fd);
+               if (ret < 0) {
+                       ERR("Failed to flush kernel stream");
+                       goto end;
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_flush_buffer(stream, producer_active);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+end:
+       return ret;
+}
+
+static
+int consumer_clear_buffer(struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               ret = kernctl_buffer_clear(stream->wait_fd);
+               if (ret < 0) {
+                       ERR("Failed to flush kernel stream");
+                       goto end;
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_clear_buffer(stream);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+end:
+       return ret;
+}
+
+static
+int clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+{
+
+       int ret;
+       struct lttng_consumer_stream *stream;
+
+       assert(!channel->monitor);
+       assert(channel->type != CONSUMER_CHANNEL_TYPE_METADATA);
+
+       rcu_read_lock();
+       pthread_mutex_lock(&channel->lock);
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               health_code_update();
+               pthread_mutex_lock(&stream->lock);
+
+               ret = consumer_flush_buffer(stream, 1);
+               if (ret < 0) {
+                       ERR("Failed to flush stream %" PRIu64 " during channel clear",
+                                       stream->key);
+                       ret = LTTCOMM_CONSUMERD_FATAL;
+                       goto error_unlock;
+               }
+
+               ret = consumer_clear_buffer(stream);
+               if (ret < 0) {
+                       ERR("Failed to clear stream %" PRIu64 " during channel clear",
+                                       stream->key);
+                       ret = LTTCOMM_CONSUMERD_FATAL;
+                       goto error_unlock;
+               }
+               pthread_mutex_unlock(&stream->lock);
+       }
+       pthread_mutex_unlock(&channel->lock);
+       rcu_read_unlock();
+       return 0;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&channel->lock);
+       rcu_read_unlock();
+       return ret;
+}
+
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       DBG("Consumer clear channel %" PRIu64, channel->key);
+
+       if (!channel->monitor) {
+               /* Snapshot mode */
+               if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+                       /*
+                        * Nothing to do for the metadata channel/stream.
+                        * Snapshot mechanism already take care of the metadata
+                        * handling/generation.
+                        */
+                       goto end;
+               }
+               ret = clear_unmonitored_channel(channel);
+               if (ret) {
+                       goto error;
+               }
+       } else {
+               /* TODO:
+                * Normal channel and relayd bound clear operation not supported
+                * for now
+                */
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
+end:
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+       return ret;
+}
index 17e5b55a6c1fb14cec255411eb19ef9259bb10b2..c4ea005b1f643c1f22bcee42b6aa4073bcf422f0 100644 (file)
@@ -755,5 +755,6 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
 void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
 int consumer_create_index_file(struct lttng_consumer_stream *stream);
 void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
 
 #endif /* LIB_CONSUMER_H */
This page took 0.029079 seconds and 5 git commands to generate.