Consumer: Implement lttng_consumer_clear_channel
authorJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Fri, 8 Feb 2019 22:05:10 +0000 (17:05 -0500)
committerJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Mon, 18 Feb 2019 19:25:02 +0000 (14:25 -0500)
This function is responsible for performing all actions needed to
clear a given channel.

It only supports clear operation on unmonitored channel
(snapshot mode) for now.

To do so, flush and clear all the channel streams.

We use an active flush (consumer_flush_buffer(..., 1)) since we consider
the producer active at all time. No reason so far to check for the
quiescent state of the channel. This might need to be revisited.

Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
src/common/consumer/consumer.c
src/common/consumer/consumer.h

index 5772b912731a7abedbc02db455b43807d3fbe2d6..da81f2eedf43eef24251f75dc84185423509c749 100644 (file)
@@ -3894,6 +3894,31 @@ end:
        return ret;
 }
 
+static
+int consumer_clear_buffer(struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               ret = kernctl_buffer_clear(stream->wait_fd);
+               if (ret < 0) {
+                       ERR("Failed to flush kernel stream");
+                       goto end;
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_clear_buffer(stream);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+end:
+       return ret;
+}
 /*
  * Sample the rotate position for all the streams of a channel. If a stream
  * is already at the rotate position (produced == consumed), we flag it as
@@ -4473,3 +4498,82 @@ int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
                return mkdir_local(path, uid, gid);
        }
 }
+
+static
+int clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+{
+
+       int ret;
+       struct lttng_consumer_stream *stream;
+
+       assert(!channel->monitor);
+       assert(channel->type != CONSUMER_CHANNEL_TYPE_METADATA);
+
+       rcu_read_lock();
+       pthread_mutex_lock(&channel->lock);
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               health_code_update();
+               pthread_mutex_lock(&stream->lock);
+
+               ret = consumer_flush_buffer(stream, 1);
+               if (ret < 0) {
+                       ERR("Failed to flush stream %" PRIu64 " during channel clear",
+                                       stream->key);
+                       ret = LTTCOMM_CONSUMERD_FATAL;
+                       goto error_unlock;
+               }
+
+               ret = consumer_clear_buffer(stream);
+               if (ret < 0) {
+                       ERR("Failed to clear stream %" PRIu64 " during channel clear",
+                                       stream->key);
+                       ret = LTTCOMM_CONSUMERD_FATAL;
+                       goto error_unlock;
+               }
+               pthread_mutex_unlock(&stream->lock);
+       }
+       pthread_mutex_unlock(&channel->lock);
+       rcu_read_unlock();
+       return 0;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&channel->lock);
+       rcu_read_unlock();
+       return ret;
+}
+
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       DBG("Consumer clear channel %" PRIu64, channel->key);
+
+       if (!channel->monitor) {
+               /* Snapshot mode */
+               if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+                       /*
+                        * Nothing to do for the metadata channel/stream.
+                        * Snapshot mechanism already take care of the metadata
+                        * handling/generation.
+                        */
+                       goto end;
+               }
+               ret = clear_unmonitored_channel(channel);
+               if (ret) {
+                       goto error;
+               }
+       } else {
+               /* TODO:
+                * Normal channel and relayd bound clear operation not supported
+                * for now
+                */
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
+end:
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+       return ret;
+}
index a90b3a385e5377ff7fcb3053d63fa0bbb4c689fe..0195e688404f36b3b7c22e71b116068e0a11e2a5 100644 (file)
@@ -853,5 +853,6 @@ void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stre
 int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
                uint64_t relayd_id);
 void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
 
 #endif /* LIB_CONSUMER_H */
This page took 0.029053 seconds and 5 git commands to generate.