From b32703d679aa5dd34dbee1ff21e44c3728c01b78 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Mon, 20 Jul 2020 15:29:41 -0400 Subject: [PATCH] Fix: consumerd: packet sent before channel rotation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Issue observed ============== A clear test occasionally fails with the following output: # Test ust streaming rotate-clear # Parameters: tracing_active=0, clear_twice=1, rotate_before=0, rotate_after=0, buffer_type=uid ok 605 - Create session S0BXcJKWrmAwNSzm with uri:net://localhost and opts: PASS: tools/clear/test_ust 605 - Create session S0BXcJKWrmAwNSzm with uri:net://localhost and opts: ok 606 - Enable channel chan for session S0BXcJKWrmAwNSzm PASS: tools/clear/test_ust 606 - Enable channel chan for session S0BXcJKWrmAwNSzm ok 607 - Enable ust event tp:tptest for session S0BXcJKWrmAwNSzm PASS: tools/clear/test_ust 607 - Enable ust event tp:tptest for session S0BXcJKWrmAwNSzm ok 608 - Start tracing for session S0BXcJKWrmAwNSzm PASS: tools/clear/test_ust 608 - Start tracing for session S0BXcJKWrmAwNSzm ok 609 - Rotate session S0BXcJKWrmAwNSzm PASS: tools/clear/test_ust 609 - Rotate session S0BXcJKWrmAwNSzm ok 610 - Stop lttng tracing for session S0BXcJKWrmAwNSzm PASS: tools/clear/test_ust 610 - Stop lttng tracing for session S0BXcJKWrmAwNSzm ok 611 - Clear session S0BXcJKWrmAwNSzm PASS: tools/clear/test_ust 611 - Clear session S0BXcJKWrmAwNSzm ok 612 - Clear session S0BXcJKWrmAwNSzm PASS: tools/clear/test_ust 612 - Clear session S0BXcJKWrmAwNSzm Error: Relayd rotate streams replied error 97 Error: Relayd rotate stream failed. Cleaning up relayd 33 Error: Relayd send index failed. Cleaning up relayd 33. Error: Rotate channel failed Error: Stream 76 relayd ID 33 unknown. Can't write index. Error: Stream 74 relayd ID 33 unknown. Can't write index. Error: Stream 72 relayd ID 33 unknown. Can't write index. 
ok 613 - Start tracing for session S0BXcJKWrmAwNSzm PASS: tools/clear/test_ust 613 - Start tracing for session S0BXcJKWrmAwNSzm not ok 614 - Stop lttng tracing for session S0BXcJKWrmAwNSzm FAIL: tools/clear/test_ust 614 - Stop lttng tracing for session S0BXcJKWrmAwNSzm # Failed test 'Stop lttng tracing for session S0BXcJKWrmAwNSzm' # in ./tools/clear//../../../utils/utils.sh:stop_lttng_tracing_opt() at line 1311. ok 615 - Validate trace for event tp:tptest, 1 events PASS: tools/clear/test_ust 615 - Validate trace for event tp:tptest, 1 events not ok 616 - Read a total of 1 events, expected 4 FAIL: tools/clear/test_ust 616 - Read a total of 1 events, expected 4 # Failed test 'Read a total of 1 events, expected 4' # in ./tools/clear//../../../utils/utils.sh:validate_trace_count() at line 1764. Error: Failed to perform an implicit rotation as part of the destruction of session "S0BXcJKWrmAwNSzm": Unknown error code not ok 617 - Destroy session S0BXcJKWrmAwNSzm FAIL: tools/clear/test_ust 617 - Destroy session S0BXcJKWrmAwNSzm # Failed test 'Destroy session S0BXcJKWrmAwNSzm' # in ./tools/clear//../../../utils/utils.sh:destroy_lttng_session() at line 1347. # Test ust streaming clear-rotate Looking at the relay daemon log when the problem is reproduced, we see: Error: Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = 1, channel_name = chan_0 Cause ===== The "rotate channel" consumer command iterates over a channel's streams to perform a rotation and open a new packet when necessary (see comments). In the case where a channel is associated with a relay daemon, the rotation positions are accumulated to send a single "rotate channel streams" command to the relay daemon. This is done to reduce the time needed to complete a rotation when tracing to a relay daemon through a high-latency network connection. 
Unfortunately, this causes packets to be opened before the rotation command was sent to the relay daemon as the "open packet" command is performed during the iteration on the streams. Solution ======== Streams for which a packet should be opened are accumulated into an array of stream pointers. The "open packet" is performed after a successful rotation of the streams as a second "pass". Known drawbacks =============== None. Signed-off-by: Jérémie Galarneau Change-Id: I6cb0b92619da73ebf223e1dfee88530e4244b56b --- src/common/consumer/consumer.c | 118 ++++++++++++++++++++------------- 1 file changed, 72 insertions(+), 46 deletions(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index b2c5eb3dd..4a8b14634 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3946,11 +3946,15 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, const bool is_local_trace = relayd_id == -1ULL; struct consumer_relayd_sock_pair *relayd = NULL; bool rotating_to_new_chunk = true; + /* Array of `struct lttng_consumer_stream *` */ + struct lttng_dynamic_pointer_array streams_packet_to_open; + size_t stream_idx; DBG("Consumer sample rotate position for channel %" PRIu64, key); lttng_dynamic_array_init(&stream_rotation_positions, sizeof(struct relayd_stream_rotation_position), NULL); + lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL); rcu_read_lock(); @@ -4128,67 +4132,88 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, * packets in this scenario and allows the tracer to * "stamp" the beginning of the new trace chunk at the * earliest possible point. + * + * The packet open is performed after the channel + * rotation to ensure that no attempt to open a packet + * is performed in a stream that has no active trace + * chunk. 
*/ - const enum consumer_stream_open_packet_status status = - consumer_stream_open_packet(stream); - - switch (status) { - case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: - DBG("Opened a packet after a rotation: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); - break; - case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: - /* - * Can't open a packet as there is no space left - * in the buffer. A new packet will be opened - * once one has been consumed. - */ - DBG("No space left to open a packet after a rotation: stream id = %" PRIu64 - ", channel name = %s, session id = %" PRIu64, - stream->key, stream->chan->name, - stream->chan->session_id); - break; - case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: - /* Logged by callee. */ + ret = lttng_dynamic_pointer_array_add_pointer( + &streams_packet_to_open, stream); + if (ret) { + PERROR("Failed to add a stream pointer to array of streams in which to open a packet"); ret = -1; goto end_unlock_stream; - default: - abort(); } } pthread_mutex_unlock(&stream->lock); } stream = NULL; - pthread_mutex_unlock(&channel->lock); - if (is_local_trace) { - ret = 0; - goto end; - } + if (!is_local_trace) { + relayd = consumer_find_relayd(relayd_id); + if (!relayd) { + ERR("Failed to find relayd %" PRIu64, relayd_id); + ret = -1; + goto end_unlock_channel; + } - relayd = consumer_find_relayd(relayd_id); - if (!relayd) { - ERR("Failed to find relayd %" PRIu64, relayd_id); - ret = -1; - goto end; + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_rotate_streams(&relayd->control_sock, stream_count, + rotating_to_new_chunk ? &next_chunk_id : NULL, + (const struct relayd_stream_rotation_position *) + stream_rotation_positions.buffer + .data); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + ERR("Relayd rotate stream failed. 
Cleaning up relayd %" PRIu64, + relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + goto end_unlock_channel; + } } - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_rotate_streams(&relayd->control_sock, stream_count, - rotating_to_new_chunk ? &next_chunk_id : NULL, - (const struct relayd_stream_rotation_position *) - stream_rotation_positions.buffer.data); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64, - relayd->net_seq_idx); - lttng_consumer_cleanup_relayd(relayd); - goto end; + for (stream_idx = 0; + stream_idx < lttng_dynamic_pointer_array_get_count( + &streams_packet_to_open); + stream_idx++) { + enum consumer_stream_open_packet_status status; + + stream = lttng_dynamic_pointer_array_get_pointer( + &streams_packet_to_open, stream_idx); + + pthread_mutex_lock(&stream->lock); + status = consumer_stream_open_packet(stream); + pthread_mutex_unlock(&stream->lock); + switch (status) { + case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED: + DBG("Opened a packet after a rotation: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE: + /* + * Can't open a packet as there is no space left + * in the buffer. A new packet will be opened + * once one has been consumed. + */ + DBG("No space left to open a packet after a rotation: stream id = %" PRIu64 + ", channel name = %s, session id = %" PRIu64, + stream->key, stream->chan->name, + stream->chan->session_id); + break; + case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR: + /* Logged by callee. 
*/ + ret = -1; + goto end_unlock_stream; + default: + abort(); + } } + pthread_mutex_unlock(&channel->lock); ret = 0; goto end; @@ -4199,6 +4224,7 @@ end_unlock_channel: end: rcu_read_unlock(); lttng_dynamic_array_reset(&stream_rotation_positions); + lttng_dynamic_pointer_array_reset(&streams_packet_to_open); return ret; } -- 2.34.1