Consumer: Implement lttng_consumer_clear_channel
[lttng-tools.git] / src / common / consumer / consumer.c
index 5772b912731a7abedbc02db455b43807d3fbe2d6..da81f2eedf43eef24251f75dc84185423509c749 100644 (file)
@@ -3894,6 +3894,31 @@ end:
        return ret;
 }
 
+static
+int consumer_clear_buffer(struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+
+       switch (consumer_data.type) {
+       case LTTNG_CONSUMER_KERNEL:
+               ret = kernctl_buffer_clear(stream->wait_fd);
+               if (ret < 0) {
+                       ERR("Failed to flush kernel stream");
+                       goto end;
+               }
+               break;
+       case LTTNG_CONSUMER32_UST:
+       case LTTNG_CONSUMER64_UST:
+               lttng_ustconsumer_clear_buffer(stream);
+               break;
+       default:
+               ERR("Unknown consumer_data type");
+               abort();
+       }
+
+end:
+       return ret;
+}
 /*
  * Sample the rotate position for all the streams of a channel. If a stream
  * is already at the rotate position (produced == consumed), we flag it as
@@ -4473,3 +4498,82 @@ int lttng_consumer_mkdir(const char *path, uid_t uid, gid_t gid,
                return mkdir_local(path, uid, gid);
        }
 }
+
+static
+int clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+{
+
+       int ret;
+       struct lttng_consumer_stream *stream;
+
+       assert(!channel->monitor);
+       assert(channel->type != CONSUMER_CHANNEL_TYPE_METADATA);
+
+       rcu_read_lock();
+       pthread_mutex_lock(&channel->lock);
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               health_code_update();
+               pthread_mutex_lock(&stream->lock);
+
+               ret = consumer_flush_buffer(stream, 1);
+               if (ret < 0) {
+                       ERR("Failed to flush stream %" PRIu64 " during channel clear",
+                                       stream->key);
+                       ret = LTTCOMM_CONSUMERD_FATAL;
+                       goto error_unlock;
+               }
+
+               ret = consumer_clear_buffer(stream);
+               if (ret < 0) {
+                       ERR("Failed to clear stream %" PRIu64 " during channel clear",
+                                       stream->key);
+                       ret = LTTCOMM_CONSUMERD_FATAL;
+                       goto error_unlock;
+               }
+               pthread_mutex_unlock(&stream->lock);
+       }
+       pthread_mutex_unlock(&channel->lock);
+       rcu_read_unlock();
+       return 0;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&channel->lock);
+       rcu_read_unlock();
+       return ret;
+}
+
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
+{
+       int ret;
+
+       DBG("Consumer clear channel %" PRIu64, channel->key);
+
+       if (!channel->monitor) {
+               /* Snapshot mode */
+               if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
+                       /*
+                        * Nothing to do for the metadata channel/stream.
+                        * Snapshot mechanism already take care of the metadata
+                        * handling/generation.
+                        */
+                       goto end;
+               }
+               ret = clear_unmonitored_channel(channel);
+               if (ret) {
+                       goto error;
+               }
+       } else {
+               /* TODO:
+                * Normal channel and relayd bound clear operation not supported
+                * for now
+                */
+               ret = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
+end:
+       ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+       return ret;
+}
This page took 0.026773 seconds and 5 git commands to generate.