pthread_mutex_lock(&stream->lock);
if (!stream->quiescent) {
- ustctl_flush_buffer(stream->ustream, 0);
+ lttng_ustconsumer_flush_buffer(stream, 0);
stream->quiescent = true;
}
pthread_mutex_unlock(&stream->lock);
* Else, if quiescent, it has already been done by the prior stop.
*/
if (!stream->quiescent) {
- ustctl_flush_buffer(stream->ustream, 0);
+ lttng_ustconsumer_flush_buffer(stream, 0);
}
ret = lttng_ustconsumer_take_snapshot(stream);
* Sample the rotate position of all the streams in
* this channel.
*/
- ret = lttng_consumer_rotate_channel(channel, key,
+ ret = lttng_consumer_rotate_sample_channel(channel, key,
msg.u.rotate_channel.pathname,
msg.u.rotate_channel.relayd_id,
msg.u.rotate_channel.metadata,
if (pending < 0) {
/*
- * An error occured while running the command;
+ * An error occurred while running the command;
* don't send the 'pending' flag as the sessiond
* will not read it.
*/
if (pending < 0) {
/*
- * An error occured while running the command;
+ * An error occurred while running the command;
* don't send the 'pending' flag as the sessiond
* will not read it.
*/
}
break;
}
+ case LTTNG_CONSUMER_CLEAR_CHANNEL:
+ {
+ struct lttng_consumer_channel *channel;
+ uint64_t key = msg.u.clear_channel.key;
+
+ channel = consumer_find_channel(key);
+ if (!channel) {
+ DBG("Channel %" PRIu64 " not found", key);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ } else {
+ ret = lttng_consumer_clear_channel(channel);
+ if (ret) {
+ ERR("Clear channel failed");
+ ret_code = ret;
+ }
+
+ health_code_update();
+ }
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+ break;
+ }
default:
break;
}
return ustctl_get_mmap_base(stream->ustream);
}
-void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream,
- int producer_active)
-{
- assert(stream);
- assert(stream->ustream);
-
- ustctl_flush_buffer(stream->ustream, producer_active);
-}
-
/*
* Take a snapshot for a specific stream.
*
ustctl_flush_buffer(stream->ustream, producer);
}
+void lttng_ustconsumer_clear_buffer(struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ ustctl_clear_buffer(stream->ustream);
+}
+
int lttng_ustconsumer_get_current_timestamp(
struct lttng_consumer_stream *stream, uint64_t *ts)
{
pthread_mutex_lock(&stream->lock);
if (!stream->quiescent) {
- ustctl_flush_buffer(stream->ustream, 0);
+ lttng_ustconsumer_flush_buffer(stream, 0);
stream->quiescent = true;
}
pthread_mutex_unlock(&stream->lock);
retry = 1;
}
- ustctl_flush_buffer(metadata->ustream, 1);
+ lttng_ustconsumer_flush_buffer(metadata, 1);
ret = ustctl_snapshot(metadata->ustream);
if (ret < 0) {
if (errno != EAGAIN) {
if (ret <= 0) {
goto error;
}
- ustctl_flush_buffer(stream->ustream, 1);
+ lttng_ustconsumer_flush_buffer(stream, 1);
goto retry;
}