From 81164b6b222b4bafe1e3cec57c50429ab6dab30f Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Thu, 10 May 2018 17:14:39 -0400 Subject: [PATCH] Initialize relay_stream chunk_id to its session's current trace archive id MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Initializing the relayd's streams with a stream_chunk_id allows the relayd to differentiate between a stream created before the first rotation (at chunk id == 0) vs. a stream that has been created after the last (or pending) rotation. Before this fix, the relayd can fail to identify that a rotation has been completed. This is caused by the fact that a stream's chunk id is initialized to 0 and updated by the RELAYD_ROTATE_STREAM command to the id of the chunk that is currently being rotated. The 'stream->current_chunk_id.value < chunk_id' check performed by the RELAYD_ROTATE_PENDING will cause rotations to never complete for streams that are created between the launch of a rotation and the check for its completion. For example, when the relayd is checking whether the rotation id '3' is completed, it may see streams with the default value of their chunk id set to '0' and determine that a rotation is still pending. Signed-off-by: Jérémie Galarneau --- src/bin/lttng-relayd/main.c | 9 ++++++--- src/bin/lttng-relayd/stream.c | 4 +++- src/bin/lttng-relayd/stream.h | 12 ++++++++++-- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index a04303e4f..44fb2e27c 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1198,6 +1198,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, uint64_t stream_handle = -1ULL; char *path_name = NULL, *channel_name = NULL; uint64_t tracefile_size = 0, tracefile_count = 0; + struct relay_stream_chunk_id stream_chunk_id = { 0 }; if (!session || !conn->version_check_done) { ERR("Trying to add a stream before version check"); @@ -1237,7 +1238,8 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr, /* We pass ownership of path_name and channel_name. */ stream = stream_create(trace, stream_handle, path_name, - channel_name, tracefile_size, tracefile_count); + channel_name, tracefile_size, tracefile_count, + &stream_chunk_id); path_name = NULL; channel_name = NULL; @@ -2483,7 +2485,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr goto end_stream_unlock; } - stream->chunk_id = stream_info.new_chunk_id; + assert(stream->current_chunk_id.is_set); + stream->current_chunk_id.value = stream_info.new_chunk_id; if (stream->is_metadata) { /* @@ -2839,7 +2842,7 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr, rotate_pending = true; DBG("Stream %" PRIu64 " is still rotating", stream->stream_handle); - } else if (stream->chunk_id < chunk_id) { + } else if (stream->current_chunk_id.value < chunk_id) { /* * Stream closed on the consumer but still active on the * relay. diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 709b3b1f5..326ec4bf2 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -68,7 +68,8 @@ end: struct relay_stream *stream_create(struct ctf_trace *trace, uint64_t stream_handle, char *path_name, char *channel_name, uint64_t tracefile_size, - uint64_t tracefile_count) + uint64_t tracefile_count, + const struct relay_stream_chunk_id *chunk_id) { int ret; struct relay_stream *stream = NULL; @@ -94,6 +95,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace, urcu_ref_init(&stream->ref); ctf_trace_get(trace); stream->trace = trace; + stream->current_chunk_id = *chunk_id; stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!stream->indexes_ht) { diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h index 47ae2e8f9..fb3e1ed68 100644 --- a/src/bin/lttng-relayd/stream.h +++ b/src/bin/lttng-relayd/stream.h @@ -31,6 +31,11 @@ #include "stream-fd.h" #include "tracefile-array.h" +struct relay_stream_chunk_id { + bool is_set; + uint64_t value; +}; + /* * Represents a stream in the relay */ @@ -150,14 +155,17 @@ struct relay_stream { * atomically with rotate_at_seq_num. * * Always access with stream lock held. + * + * This attribute is not set if the stream is created by a pre-2.11 + * consumer. */ - uint64_t chunk_id; + struct relay_stream_chunk_id current_chunk_id; }; struct relay_stream *stream_create(struct ctf_trace *trace, uint64_t stream_handle, char *path_name, char *channel_name, uint64_t tracefile_size, - uint64_t tracefile_count); + uint64_t tracefile_count, const struct relay_stream_chunk_id *chunk_id); struct relay_stream *stream_get_by_id(uint64_t stream_id); bool stream_get(struct relay_stream *stream); -- 2.34.1