Fix: consumerd: packet sent before channel rotation
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Mon, 20 Jul 2020 19:29:41 +0000 (15:29 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Mon, 20 Jul 2020 19:56:56 +0000 (15:56 -0400)
Issue observed
==============

A clear test occasionally fails with the following output:
  # Test ust streaming rotate-clear
  # Parameters: tracing_active=0, clear_twice=1, rotate_before=0, rotate_after=0, buffer_type=uid
  ok 605 - Create session S0BXcJKWrmAwNSzm with uri:net://localhost and opts:
  PASS: tools/clear/test_ust 605 - Create session S0BXcJKWrmAwNSzm with uri:net://localhost and opts:
  ok 606 - Enable channel chan for session S0BXcJKWrmAwNSzm
  PASS: tools/clear/test_ust 606 - Enable channel chan for session S0BXcJKWrmAwNSzm
  ok 607 - Enable ust event tp:tptest for session S0BXcJKWrmAwNSzm
  PASS: tools/clear/test_ust 607 - Enable ust event tp:tptest for session S0BXcJKWrmAwNSzm
  ok 608 - Start tracing for session S0BXcJKWrmAwNSzm
  PASS: tools/clear/test_ust 608 - Start tracing for session S0BXcJKWrmAwNSzm
  ok 609 - Rotate session S0BXcJKWrmAwNSzm
  PASS: tools/clear/test_ust 609 - Rotate session S0BXcJKWrmAwNSzm
  ok 610 - Stop lttng tracing for session S0BXcJKWrmAwNSzm
  PASS: tools/clear/test_ust 610 - Stop lttng tracing for session S0BXcJKWrmAwNSzm
  ok 611 - Clear session S0BXcJKWrmAwNSzm
  PASS: tools/clear/test_ust 611 - Clear session S0BXcJKWrmAwNSzm
  ok 612 - Clear session S0BXcJKWrmAwNSzm
  PASS: tools/clear/test_ust 612 - Clear session S0BXcJKWrmAwNSzm
  Error: Relayd rotate streams replied error 97
  Error: Relayd rotate stream failed. Cleaning up relayd 33
  Error: Relayd send index failed. Cleaning up relayd 33.
  Error: Rotate channel failed
  Error: Stream 76 relayd ID 33 unknown. Can't write index.
  Error: Stream 74 relayd ID 33 unknown. Can't write index.
  Error: Stream 72 relayd ID 33 unknown. Can't write index.
  ok 613 - Start tracing for session S0BXcJKWrmAwNSzm
  PASS: tools/clear/test_ust 613 - Start tracing for session S0BXcJKWrmAwNSzm
  not ok 614 - Stop lttng tracing for session S0BXcJKWrmAwNSzm
  FAIL: tools/clear/test_ust 614 - Stop lttng tracing for session S0BXcJKWrmAwNSzm
  #   Failed test 'Stop lttng tracing for session S0BXcJKWrmAwNSzm'
  #   in ./tools/clear//../../../utils/utils.sh:stop_lttng_tracing_opt() at line 1311.
  ok 615 - Validate trace for event tp:tptest, 1 events
  PASS: tools/clear/test_ust 615 - Validate trace for event tp:tptest, 1 events
  not ok 616 - Read a total of 1 events, expected 4
  FAIL: tools/clear/test_ust 616 - Read a total of 1 events, expected 4
  #   Failed test 'Read a total of 1 events, expected 4'
  #   in ./tools/clear//../../../utils/utils.sh:validate_trace_count() at line 1764.
  Error: Failed to perform an implicit rotation as part of the destruction of session "S0BXcJKWrmAwNSzm": Unknown error code
  not ok 617 - Destroy session S0BXcJKWrmAwNSzm
  FAIL: tools/clear/test_ust 617 - Destroy session S0BXcJKWrmAwNSzm
  #   Failed test 'Destroy session S0BXcJKWrmAwNSzm'
  #   in ./tools/clear//../../../utils/utils.sh:destroy_lttng_session() at line 1347.
  # Test ust streaming clear-rotate

Looking at the relay daemon log when the problem is reproduced,
we see:
  Error: Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = 1, channel_name = chan_0

Cause
=====

The "rotate channel" consumer command iterates over a channel's streams
to perform a rotation and open a new packet when necessary
(see comments).

In the case where a channel is associated with a relay daemon, the
rotation positions are accumulated to send a single "rotate channel
streams" command to the relay daemon. This is done to reduce the time
needed to complete a rotation when tracing to a relay daemon through an
high-latency network connection.

Unfortunately, this causes packets to be opened before the rotation
command was sent to the relay daemon as the "open packet" command
is performed during the iteration on the streams.

Solution
========

Streams for which a packet should be opened are accumulated into an
array of stream pointers. The "open packet" is performed after a
successful rotation of the streams as a second "pass".

Known drawbacks
===============

None.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I6cb0b92619da73ebf223e1dfee88530e4244b56b

src/common/consumer/consumer.c

index b2c5eb3ddbd1484e5aa2f44f64bc48f17a8dceda..4a8b14634d545b5587517caff54eddbd78cbfa48 100644 (file)
@@ -3946,11 +3946,15 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        const bool is_local_trace = relayd_id == -1ULL;
        struct consumer_relayd_sock_pair *relayd = NULL;
        bool rotating_to_new_chunk = true;
        const bool is_local_trace = relayd_id == -1ULL;
        struct consumer_relayd_sock_pair *relayd = NULL;
        bool rotating_to_new_chunk = true;
+       /* Array of `struct lttng_consumer_stream *` */
+       struct lttng_dynamic_pointer_array streams_packet_to_open;
+       size_t stream_idx;
 
        DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
        lttng_dynamic_array_init(&stream_rotation_positions,
                        sizeof(struct relayd_stream_rotation_position), NULL);
 
        DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
        lttng_dynamic_array_init(&stream_rotation_positions,
                        sizeof(struct relayd_stream_rotation_position), NULL);
+       lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL);
 
        rcu_read_lock();
 
 
        rcu_read_lock();
 
@@ -4128,67 +4132,88 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                         * packets in this scenario and allows the tracer to
                         * "stamp" the beginning of the new trace chunk at the
                         * earliest possible point.
                         * packets in this scenario and allows the tracer to
                         * "stamp" the beginning of the new trace chunk at the
                         * earliest possible point.
+                        *
+                        * The packet open is performed after the channel
+                        * rotation to ensure that no attempt to open a packet
+                        * is performed in a stream that has no active trace
+                        * chunk.
                         */
                         */
-                       const enum consumer_stream_open_packet_status status =
-                               consumer_stream_open_packet(stream);
-
-                       switch (status) {
-                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
-                               DBG("Opened a packet after a rotation: stream id = %" PRIu64
-                                               ", channel name = %s, session id = %" PRIu64,
-                                               stream->key, stream->chan->name,
-                                               stream->chan->session_id);
-                               break;
-                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
-                               /*
-                                * Can't open a packet as there is no space left
-                                * in the buffer. A new packet will be opened
-                                * once one has been consumed.
-                                */
-                               DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
-                                               ", channel name = %s, session id = %" PRIu64,
-                                               stream->key, stream->chan->name,
-                                               stream->chan->session_id);
-                               break;
-                       case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
-                               /* Logged by callee. */
+                       ret = lttng_dynamic_pointer_array_add_pointer(
+                                       &streams_packet_to_open, stream);
+                       if (ret) {
+                               PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
                                ret = -1;
                                goto end_unlock_stream;
                                ret = -1;
                                goto end_unlock_stream;
-                       default:
-                               abort();
                        }
                }
 
                pthread_mutex_unlock(&stream->lock);
        }
        stream = NULL;
                        }
                }
 
                pthread_mutex_unlock(&stream->lock);
        }
        stream = NULL;
-       pthread_mutex_unlock(&channel->lock);
 
 
-       if (is_local_trace) {
-               ret = 0;
-               goto end;
-       }
+       if (!is_local_trace) {
+               relayd = consumer_find_relayd(relayd_id);
+               if (!relayd) {
+                       ERR("Failed to find relayd %" PRIu64, relayd_id);
+                       ret = -1;
+                       goto end_unlock_channel;
+               }
 
 
-       relayd = consumer_find_relayd(relayd_id);
-       if (!relayd) {
-               ERR("Failed to find relayd %" PRIu64, relayd_id);
-               ret = -1;
-               goto end;
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+                               rotating_to_new_chunk ? &next_chunk_id : NULL,
+                               (const struct relayd_stream_rotation_position *)
+                                               stream_rotation_positions.buffer
+                                                               .data);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+                                       relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
+                       goto end_unlock_channel;
+               }
        }
 
        }
 
-       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-       ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
-                       rotating_to_new_chunk ? &next_chunk_id : NULL,
-                       (const struct relayd_stream_rotation_position *)
-                                       stream_rotation_positions.buffer.data);
-       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-       if (ret < 0) {
-               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
-                               relayd->net_seq_idx);
-               lttng_consumer_cleanup_relayd(relayd);
-               goto end;
+       for (stream_idx = 0;
+                       stream_idx < lttng_dynamic_pointer_array_get_count(
+                               &streams_packet_to_open);
+                       stream_idx++) {
+               enum consumer_stream_open_packet_status status;
+
+               stream = lttng_dynamic_pointer_array_get_pointer(
+                               &streams_packet_to_open, stream_idx);
+
+               pthread_mutex_lock(&stream->lock);
+               status = consumer_stream_open_packet(stream);
+               pthread_mutex_unlock(&stream->lock);
+               switch (status) {
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet after a rotation: stream id = %" PRIu64
+                           ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+                       /*
+                        * Can't open a packet as there is no space left
+                        * in the buffer. A new packet will be opened
+                        * once one has been consumed.
+                        */
+                       DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
+                           ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       break;
+               case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+                       /* Logged by callee. */
+                       ret = -1;
+                       goto end_unlock_stream;
+               default:
+                       abort();
+               }
        }
 
        }
 
+       pthread_mutex_unlock(&channel->lock);
        ret = 0;
        goto end;
 
        ret = 0;
        goto end;
 
@@ -4199,6 +4224,7 @@ end_unlock_channel:
 end:
        rcu_read_unlock();
        lttng_dynamic_array_reset(&stream_rotation_positions);
 end:
        rcu_read_unlock();
        lttng_dynamic_array_reset(&stream_rotation_positions);
+       lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
        return ret;
 }
 
        return ret;
 }
 
This page took 0.030705 seconds and 5 git commands to generate.