channel->tracefile_size;
}
-struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+struct lttng_consumer_stream *consumer_allocate_stream(
+ struct lttng_consumer_channel *channel,
+ uint64_t channel_key,
uint64_t stream_key,
const char *channel_name,
uint64_t relayd_id,
}
rcu_read_lock();
+ stream->chan = channel;
stream->key = stream_key;
stream->trace_chunk = trace_chunk;
stream->out_fd = -1;
uint64_t session_id_per_pid,
unsigned int monitor,
unsigned int live_timer_interval,
+ bool is_in_live_session,
const char *root_shm_path,
const char *shm_path)
{
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
channel->live_timer_interval = live_timer_interval;
+ channel->is_live = is_in_live_session;
pthread_mutex_init(&channel->lock, NULL);
pthread_mutex_init(&channel->timer_lock, NULL);
struct lttng_consumer_local_data *ctx)
{
ssize_t ret;
+ int rotation_ret;
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
pthread_mutex_lock(&stream->metadata_rdv_lock);
}
+ /*
+ * If the stream was flagged to be ready for rotation before we extract
+ * the next packet, rotate it now.
+ */
+ if (stream->rotate_ready) {
+ DBG("Rotate stream before consuming data");
+ ret = lttng_consumer_rotate_stream(ctx, stream);
+ if (ret < 0) {
+ ERR("Stream rotation error before consuming data");
+ goto end;
+ }
+ }
+
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
ret = lttng_kconsumer_read_subbuffer(stream, ctx);
break;
}
+ if (ret < 0) {
+ goto end;
+ }
+
+ /*
+ * After extracting the packet, we check if the stream is now ready to
+ * be rotated and perform the action immediately.
+ *
+ * Don't overwrite `ret` as callers expect the number of bytes
+ * consumed to be returned on success.
+ */
+ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+ if (rotation_ret == 1) {
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+ if (rotation_ret < 0) {
+ ret = rotation_ret;
+ ERR("Stream rotation error after consuming data");
+ goto end;
+ }
+ } else if (rotation_ret < 0) {
+ ret = rotation_ret;
+ ERR("Failed to check if stream was ready to rotate after consuming data");
+ goto end;
+ }
+
+end:
if (stream->metadata_flag) {
pthread_cond_broadcast(&stream->metadata_rdv);
pthread_mutex_unlock(&stream->metadata_rdv_lock);
}
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
-
return ret;
}