From a40a503f4218296700d791f1b79671bf3d0f1e22 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Tue, 5 Nov 2019 13:07:44 -0500 Subject: [PATCH] Fix: consumerd: use packet sequence number for rotation position MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Refer to "Fix: relayd: use packet sequence number for rotation position" for context of this change. This commit introduces the changes required in the consumerd. Some notable points related to this commit: - Internally, the rotate_position (per-stream) is now a 64-bit value rather than an unsigned long. - The scheme to rotate a stream is changed to allow using the backward-compatible lttng_consumer_take_snapshot() rather than the newer lttng_consumer_sample_snapshot_positions(), thus allowing backward compatibility of the implicit rotation on destroy with pre-2.10 lttng-modules. - The rotate position used as pivot point for the rotation is based on the packet_seq_num of the last packet that has been sent over the network by consumerd, incremented by the number of packets between the sampled produced_pos and the consumed_pos. In the worst-case scenario where an overwrite mode ring buffer overwrites its contents enough to trigger a 4GB overflow on a 32-bit producer since the last packet was sent (e.g. due to a slow network), the difference between produced_pos and consumed_pos will be lower than what would have been expected. However, because this pivot position is used as a lower bound, being smaller than the real value is fine: the data that would have been misplaced in the wrong trace chunk were actually overwritten, and will therefore never be consumed. - When interacting with pre-2.8 lttng-modules, the packet sequence number is not available. The current approach is to disallow rotations performed on sessions which have kernel tracing active with a pre-2.8 lttng-modules. 
Signed-off-by: Mathieu Desnoyers Change-Id: I8600cb5e2e9c05f3dfba0499a5fc4a3bb85dec24 Signed-off-by: Jérémie Galarneau --- include/lttng/lttng-error.h | 1 + src/bin/lttng-sessiond/cmd.c | 6 + src/bin/lttng-sessiond/kernel.c | 31 ++++ src/bin/lttng-sessiond/kernel.h | 1 + src/common/consumer/consumer.c | 141 ++++++++++--------- src/common/consumer/consumer.h | 15 +- src/common/error.c | 1 + src/common/kernel-consumer/kernel-consumer.c | 1 + 8 files changed, 126 insertions(+), 71 deletions(-) diff --git a/include/lttng/lttng-error.h b/include/lttng/lttng-error.h index c6e1575e7..c032d5a7d 100644 --- a/include/lttng/lttng-error.h +++ b/include/lttng/lttng-error.h @@ -175,6 +175,7 @@ enum lttng_error_code { LTTNG_ERR_INVALID_PROTOCOL = 152, /* a protocol error occurred */ LTTNG_ERR_FILE_CREATION_ERROR = 153, /* failed to create a file */ LTTNG_ERR_TIMER_STOP_ERROR = 154, /* failed to stop timer. */ + LTTNG_ERR_ROTATION_NOT_AVAILABLE_KERNEL = 155, /* Rotation feature not supported by the kernel tracer. */ /* MUST be last element */ LTTNG_ERR_NR, /* Last element */ diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c index e067cb835..02a92be1e 100644 --- a/src/bin/lttng-sessiond/cmd.c +++ b/src/bin/lttng-sessiond/cmd.c @@ -4907,6 +4907,12 @@ int cmd_rotate_session(struct ltt_session *session, goto end; } + /* Unsupported feature in lttng-modules before 2.8 (lack of sequence number). 
*/ + if (session->kernel_session && !kernel_supports_ring_buffer_packet_sequence_number()) { + cmd_ret = LTTNG_ERR_ROTATION_NOT_AVAILABLE_KERNEL; + goto end; + } + if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) { DBG("Refusing to launch a rotation; a rotation is already in progress for session %s", session->name); diff --git a/src/bin/lttng-sessiond/kernel.c b/src/bin/lttng-sessiond/kernel.c index 76162e08c..b5c4a5e12 100644 --- a/src/bin/lttng-sessiond/kernel.c +++ b/src/bin/lttng-sessiond/kernel.c @@ -1425,6 +1425,37 @@ error: return ret; } +/* + * Check for the support of the packet sequence number via abi version number. + * + * Return 1 when supported, 0 when feature is not supported, negative value in case + * of errors. + */ +int kernel_supports_ring_buffer_packet_sequence_number(void) +{ + int ret = 0; // Not supported by default + struct lttng_kernel_tracer_abi_version abi; + + ret = kernctl_tracer_abi_version(kernel_tracer_fd, &abi); + if (ret < 0) { + ERR("Failed to retrieve lttng-modules ABI version"); + goto error; + } + + /* + * Packet sequence number was introduced in 2.8 (later majors qualify too) + */ + if (abi.major > 2 || (abi.major == 2 && abi.minor >= 8)) { + /* Supported */ + ret = 1; + } else { + /* Not supported */ + ret = 0; + } +error: + return ret; +} + /* * Rotate a kernel session. 
* diff --git a/src/bin/lttng-sessiond/kernel.h b/src/bin/lttng-sessiond/kernel.h index 0f1abd1d6..fe14589b7 100644 --- a/src/bin/lttng-sessiond/kernel.h +++ b/src/bin/lttng-sessiond/kernel.h @@ -70,6 +70,7 @@ int init_kernel_workarounds(void); ssize_t kernel_list_tracker_pids(struct ltt_kernel_session *session, int **_pids); int kernel_supports_ring_buffer_snapshot_sample_positions(void); +int kernel_supports_ring_buffer_packet_sequence_number(void); int init_kernel_tracer(void); void cleanup_kernel_tracer(void); bool kernel_tracer_is_initialized(void); diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 00eb35670..d81d0fc7a 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -603,6 +603,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_file = NULL; stream->last_sequence_number = -1ULL; + stream->rotate_position = -1ULL; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); @@ -4002,7 +4003,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, &channel->key, &iter.iter, stream, node_channel_id.node) { - unsigned long consumed_pos; + unsigned long produced_pos = 0, consumed_pos = 0; health_code_update(); @@ -4015,65 +4016,78 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, rotating_to_new_chunk = false; } - ret = lttng_consumer_sample_snapshot_positions(stream); + /* + * Active flush; has no effect if the production position + * is at a packet boundary. 
+ */ + ret = consumer_flush_buffer(stream, 1); if (ret < 0) { - ERR("Failed to sample snapshot position during channel rotation"); + ERR("Failed to flush stream %" PRIu64 " during channel rotation", + stream->key); goto end_unlock_stream; } - ret = lttng_consumer_get_produced_snapshot(stream, - &stream->rotate_position); - if (ret < 0) { - ERR("Failed to sample produced position during channel rotation"); + ret = lttng_consumer_take_snapshot(stream); + if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) { + ERR("Failed to sample snapshot position during channel rotation"); goto end_unlock_stream; } + if (!ret) { + ret = lttng_consumer_get_produced_snapshot(stream, + &produced_pos); + if (ret < 0) { + ERR("Failed to sample produced position during channel rotation"); + goto end_unlock_stream; + } - lttng_consumer_get_consumed_snapshot(stream, - &consumed_pos); - if (consumed_pos == stream->rotate_position) { + ret = lttng_consumer_get_consumed_snapshot(stream, + &consumed_pos); + if (ret < 0) { + ERR("Failed to sample consumed position during channel rotation"); + goto end_unlock_stream; + } + } + /* + * Align produced position on the start-of-packet boundary of the first + * packet going into the next trace chunk. + */ + produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size); + if (consumed_pos == produced_pos) { stream->rotate_ready = true; } - /* - * Active flush; has no effect if the production position - * is at a packet boundary. + * The rotation position is based on the packet_seq_num of the + * packet following the last packet that was consumed for this + * stream, incremented by the offset between produced and + * consumed positions. This rotation position is a lower bound + * (inclusive) at which the next trace chunk starts. Since it + * is a lower bound, it is OK if the packet_seq_num does not + * correspond exactly to the same packet identified by the + * consumed_pos, which can happen in overwrite mode. 
*/ - ret = consumer_flush_buffer(stream, 1); - if (ret < 0) { - ERR("Failed to flush stream %" PRIu64 " during channel rotation", + if (stream->sequence_number_unavailable) { + /* + * Rotation should never be performed on a session which + * interacts with a pre-2.8 lttng-modules, which does + * not implement packet sequence number. + */ + ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable", stream->key); + ret = -1; goto end_unlock_stream; } + stream->rotate_position = stream->last_sequence_number + 1 + + ((produced_pos - consumed_pos) / stream->max_sb_size); if (!is_local_trace) { /* * The relay daemon control protocol expects a rotation * position as "the sequence number of the first packet - * _after_ the current trace chunk. - * - * At the moment when the positions of the buffers are - * sampled, the production position does not necessarily - * sit at a packet boundary. The 'active' flush - * operation above will push the production position to - * the next packet boundary _if_ it is not already - * sitting at such a boundary. - * - * Assuming a current production position that is not - * on the bound of a packet, the 'target' sequence - * number is - * (consumed_pos / subbuffer_size) + 1 - * Note the '+ 1' to ensure the current packet is - * part of the current trace chunk. - * - * However, if the production position is already at - * a packet boundary, the '+ 1' is not necessary as the - * last packet of the current chunk is already - * 'complete'. + * _after_ the current trace chunk". 
*/ const struct relayd_stream_rotation_position position = { .stream_id = stream->relayd_stream_id, - .rotate_at_seq_num = (stream->rotate_position / stream->max_sb_size) + - !!(stream->rotate_position % stream->max_sb_size), + .rotate_at_seq_num = stream->rotate_position, }; ret = lttng_dynamic_array_add_element( @@ -4136,44 +4150,39 @@ end: */ int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream) { - int ret; - unsigned long consumed_pos; - - if (!stream->rotate_position && !stream->rotate_ready) { - ret = 0; - goto end; - } - if (stream->rotate_ready) { - ret = 1; - goto end; + return 1; } /* - * If we don't have the rotate_ready flag, check the consumed position - * to determine if we need to rotate. + * If packet seq num is unavailable, it means we are interacting + * with a pre-2.8 lttng-modules which does not implement the + * sequence number. Rotation should never be used by sessiond in this + * scenario. */ - ret = lttng_consumer_sample_snapshot_positions(stream); - if (ret < 0) { - ERR("Taking snapshot positions"); - goto end; + if (stream->sequence_number_unavailable) { + ERR("Internal error: rotation used on stream %" PRIu64 + " with unavailable sequence number", + stream->key); + return -1; } - ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos); - if (ret < 0) { - ERR("Consumed snapshot position"); - goto end; + if (stream->rotate_position == -1ULL || + stream->last_sequence_number == -1ULL) { + return 0; } - /* Rotate position not reached yet (with check for overflow). */ - if ((long) (consumed_pos - stream->rotate_position) < 0) { - ret = 0; - goto end; + /* + * Rotate position not reached yet. The stream rotate position is + * the position of the next packet belonging to the next trace chunk, + * but consumerd considers rotation ready when reaching the last + * packet of the current chunk, hence the "rotate_position - 1". 
+ */ + if (stream->last_sequence_number >= stream->rotate_position - 1) { + return 1; } - ret = 1; -end: - return ret; + return 0; } /* @@ -4181,7 +4190,7 @@ end: */ void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream) { - stream->rotate_position = 0; + stream->rotate_position = -1ULL; stream->rotate_ready = false; } diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 72c580eb2..17c3ee581 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -304,6 +304,11 @@ struct lttng_consumer_stream { */ bool quiescent; + /* + * True if the sequence number is not available (lttng-modules < 2.8). + */ + bool sequence_number_unavailable; + /* * metadata_timer_lock protects flags waiting_on_metadata and * missed_metadata_flush. @@ -438,12 +443,12 @@ struct lttng_consumer_stream { pthread_mutex_t metadata_rdv_lock; /* - * rotate_position represents the position in the ring-buffer that has to - * be flushed to disk to complete the ongoing rotation. When that position - * is reached, this tracefile can be closed and a new one is created in - * channel_read_only_attributes.path. + * rotate_position is the sequence number of the first packet belonging to + * the next trace chunk (inclusive lower bound). Once the preceding packet + * has been consumed, this tracefile can be closed and a new one is + * created in channel_read_only_attributes.path. */ - unsigned long rotate_position; + uint64_t rotate_position; /* * Read-only copies of channel values. 
We cannot safely access the diff --git a/src/common/error.c b/src/common/error.c index 66fd8642f..cb24584ae 100644 --- a/src/common/error.c +++ b/src/common/error.c @@ -220,6 +220,7 @@ static const char *error_string_array[] = { [ ERROR_INDEX(LTTNG_ERR_INVALID_PROTOCOL) ] = "Protocol error occurred", [ ERROR_INDEX(LTTNG_ERR_FILE_CREATION_ERROR) ] = "Failed to create file", [ ERROR_INDEX(LTTNG_ERR_TIMER_STOP_ERROR) ] = "Failed to stop a timer", + [ ERROR_INDEX(LTTNG_ERR_ROTATION_NOT_AVAILABLE_KERNEL) ] = "Rotation feature not supported by the kernel tracer.", /* Last element */ [ ERROR_INDEX(LTTNG_ERR_NR) ] = "Unknown error code" diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index fdd9ca65b..2cd970444 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1445,6 +1445,7 @@ int update_stream_stats(struct lttng_consumer_stream *stream) if (ret == -ENOTTY) { /* Command not implemented by lttng-modules. */ seq = -1ULL; + stream->sequence_number_unavailable = true; } else { PERROR("kernctl_get_sequence_number"); goto end; -- 2.34.1