+static
+int consumer_clear_buffer(struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+ unsigned long consumed_pos_before, consumed_pos_after;
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Taking snapshot positions");
+ goto end;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_before);
+ if (ret < 0) {
+ ERR("Consumed snapshot position");
+ goto end;
+ }
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ ret = kernctl_buffer_clear(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end;
+ }
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ lttng_ustconsumer_clear_buffer(stream);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Taking snapshot positions");
+ goto end;
+ }
+ ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos_after);
+ if (ret < 0) {
+ ERR("Consumed snapshot position");
+ goto end;
+ }
+ DBG("clear: before: %lu after: %lu", consumed_pos_before, consumed_pos_after);
+end:
+ return ret;
+}
+
+static
+int consumer_clear_stream(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ ret = consumer_flush_buffer(stream, 1);
+ if (ret < 0) {
+ ERR("Failed to flush stream %" PRIu64 " during channel clear",
+ stream->key);
+ ret = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+
+ ret = consumer_clear_buffer(stream);
+ if (ret < 0) {
+ ERR("Failed to clear stream %" PRIu64 " during channel clear",
+ stream->key);
+ ret = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+
+ ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+ return ret;
+}
+
+static
+int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel)
+{
+ int ret;
+ struct lttng_consumer_stream *stream;
+
+ rcu_read_lock();
+ pthread_mutex_lock(&channel->lock);
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ health_code_update();
+ pthread_mutex_lock(&stream->lock);
+ ret = consumer_clear_stream(stream);
+ if (ret) {
+ goto error_unlock;
+ }
+ pthread_mutex_unlock(&stream->lock);
+ }
+ pthread_mutex_unlock(&channel->lock);
+ rcu_read_unlock();
+ return 0;
+
+error_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&channel->lock);
+ rcu_read_unlock();
+ if (ret) {
+ goto error;
+ }
+ ret = LTTCOMM_CONSUMERD_SUCCESS;
+error:
+ return ret;
+}
+