Fix: relayd: use packet sequence number for rotation position
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 5 Nov 2019 15:40:58 +0000 (10:40 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 19 Dec 2019 22:08:44 +0000 (17:08 -0500)
The "network" sequence number (net_seq_num) is a 64-bit sequence number
tagging each packet sent over the network. The net_seq_num increments
monotonically (+1) for each packet sent from consumer daemon to relay
daemon, on a per-stream basis. It is tagged by the consumer daemon when
sending a trace packet to the relay daemon.

The LTTng kernel and user-space ring buffer "consumed position"
(consumed_pos) and "produced position" (produced_pos) are free-running
counters counting the number of bytes consumed and produced so far by
each stream. Because those counters are updated atomically, they are
limited to a size of 32-bit on 32-bit architectures.

The "packet" sequence number (packet_seq_num) is a sequence number
found in the packet header starting from LTTng 2.8. It is a 64-bit
sequence number assigned by the lttng-modules and lttng-ust ring
buffers. It increments monotonically (+1) for each packet produced
within a given ring buffer (stream).

Using produced_pos as rotation position and comparing it to the
net_seq_num has a few issues:

1) It breaks on 32-bit producers after generating more than 4GB of
   data per stream, due to overflow. The net_seq_num is a 64-bit
   counter, which does not overflow, but the produced_pos overflows
   after 4GB on 32-bit architectures. This can lead to never-completing
   rotations.

2) It breaks scenarios where ring buffers are configured in
   overwrite mode, and streaming to a relay daemon. Indeed, when
   the ring buffer moves the consumed_pos ahead, actually overwriting
   data within the ring buffer, it introduces an offset between the
   produced_pos and the net_seq_num. Therefore, if producers are
   generating a low- (or no-) throughput in some streams, the
   rotation may never complete, even on 64-bit architectures.

The solution proposed for this issue is to use the packet_seq_num as
rotation position rather than the net_seq_num. It takes care of
the two problematic scenarios, since the counter is always 64-bit
(even on 32-bit architectures), and because the counter is managed
by the producer, which therefore tracks progress of the ring buffer
overwrites.

This commit introduces changes required at the relayd side. A
separate commit introduces the changes required in the consumerd.

In relayd, one major restriction is the fact that the packet_seq_num
is not sent over the data socket, only through the control socket
receiving the indexes.

Therefore, in order to figure out the pivot position for the data
socket for a given stream, the associated index first needs to be
received. At that point, the corresponding net_seq_num is known,
which provides the pivot position for the data stream. Given that
the data and index sockets provide no ordering guarantees with
respect to their arrival, we handle the fact that data might have
been saved to disk in the wrong (previous) trace chunk by moving
it to the next trace chunk when the pivot position is known.

In order to allow "jumps" in the sequence numbers produced by
overwrite mode buffers, try_rotate_stream_index(), which previously
asserted that each sequence number was received in sequence, now
uses the packet_seq_num pivot position as a lower (inclusive) bound.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Change-Id: I755329e313f0980655a164b7bdb57e4f3d8e944a
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/index.c
src/bin/lttng-relayd/main.c
src/bin/lttng-relayd/stream.c
src/bin/lttng-relayd/stream.h
src/common/relayd/relayd.h

index 3cae94e8e57525d37cdddcd3526c010c6105d5cd..bdbd11330f638a05223c81d256da6ef2da646dfd 100644 (file)
@@ -433,6 +433,11 @@ int relay_index_set_control_data(struct relay_index *index,
        if (minor_version >= 8) {
                index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
                index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
+       } else {
+               uint64_t unset_value = -1ULL;
+
+               index->index_data.stream_instance_id = htobe64(unset_value);
+               index->index_data.packet_seq_num = htobe64(unset_value);
        }
 
        return relay_index_set_data(index, &index_data);
index 169832002a445fe522af0ef5e769490dd2ab710b..9581d03eb074e1351ac0774034fb31a205c75a9b 100644 (file)
@@ -2188,6 +2188,9 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
                index_info.stream_instance_id =
                                be64toh(index_info.stream_instance_id);
                index_info.packet_seq_num = be64toh(index_info.packet_seq_num);
+       } else {
+               index_info.stream_instance_id = -1ULL;
+               index_info.packet_seq_num = -1ULL;
        }
 
        stream = stream_get_by_id(index_info.relay_stream_id);
index 755fb6734072113af804e59f6e1e81089adb6b1f..c0aeb17190d187e12155e8a162ef89204647df2b 100644 (file)
@@ -368,18 +368,23 @@ static int try_rotate_stream_data(struct relay_stream *stream)
        }
 
        if (stream->prev_data_seq == -1ULL ||
-                       stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
+                       stream->ongoing_rotation.value.prev_data_net_seq == -1ULL ||
+                       stream->prev_data_seq <
+                       stream->ongoing_rotation.value.prev_data_net_seq) {
                /*
                 * The next packet that will be written is not part of the next
                 * chunk yet.
                 */
-               DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
+               DBG("Stream %" PRIu64 " data not yet ready for rotation "
+                               "(rotate_at_index_packet_seq_num = %" PRIu64
+                               ", rotate_at_prev_data_net_seq = %" PRIu64
                                ", prev_data_seq = %" PRIu64 ")",
                                stream->stream_handle,
-                               stream->ongoing_rotation.value.seq_num,
+                               stream->ongoing_rotation.value.packet_seq_num,
+                               stream->ongoing_rotation.value.prev_data_net_seq,
                                stream->prev_data_seq);
                goto end;
-       } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
+       } else if (stream->prev_data_seq > stream->ongoing_rotation.value.prev_data_net_seq) {
                /*
                 * prev_data_seq is checked here since indexes and rotation
                 * commands are serialized with respect to each other.
@@ -477,23 +482,38 @@ static int try_rotate_stream_index(struct relay_stream *stream)
                goto end;
        }
 
-       if (stream->prev_index_seq == -1ULL ||
-                       stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
-               DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+       if (!stream->received_packet_seq_num.is_set ||
+                       LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 <
+                       stream->ongoing_rotation.value.packet_seq_num) {
+               DBG("Stream %" PRIu64 " index not yet ready for rotation "
+                               "(rotate_at_packet_seq_num = %" PRIu64
+                               ", received_packet_seq_num = "
+                               "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
                                stream->stream_handle,
-                               stream->ongoing_rotation.value.seq_num,
-                               stream->prev_index_seq);
+                               stream->ongoing_rotation.value.packet_seq_num,
+                               stream->received_packet_seq_num.value,
+                               stream->received_packet_seq_num.is_set);
                goto end;
        } else {
-               /* The next index belongs to the new trace chunk; rotate. */
-               assert(stream->prev_index_seq + 1 ==
-                               stream->ongoing_rotation.value.seq_num);
+               /*
+                * The next index belongs to the new trace chunk; rotate.
+                * In overwrite mode, the packet seq num may jump over the
+                * rotation position.
+                */
+               assert(LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 >=
+                               stream->ongoing_rotation.value.packet_seq_num);
                DBG("Rotating stream %" PRIu64 " index file",
                                stream->stream_handle);
                ret = create_index_file(stream,
                                stream->ongoing_rotation.value.next_trace_chunk);
                stream->ongoing_rotation.value.index_rotated = true;
 
+               /*
+                * Set the rotation pivot position for the data, now that we have the
+                * net_seq_num matching the packet_seq_num index pivot position.
+                */
+               stream->ongoing_rotation.value.prev_data_net_seq =
+                       stream->prev_index_seq;
                if (stream->ongoing_rotation.value.data_rotated &&
                                stream->ongoing_rotation.value.index_rotated) {
                        /* Rotation completed; reset its state. */
@@ -798,7 +818,10 @@ int stream_set_pending_rotation(struct relay_stream *stream,
 {
        int ret = 0;
        const struct relay_stream_rotation rotation = {
-               .seq_num = rotation_sequence_number,
+               .data_rotated = false,
+               .index_rotated = false,
+               .packet_seq_num = rotation_sequence_number,
+               .prev_data_net_seq = -1ULL,
                .next_trace_chunk = next_trace_chunk,
        };
 
@@ -816,7 +839,8 @@ int stream_set_pending_rotation(struct relay_stream *stream,
        }
        LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
 
-       DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
+       DBG("Setting pending rotation: stream_id = %" PRIu64
+                       ", rotate_at_packet_seq_num = %" PRIu64,
                        stream->stream_handle, rotation_sequence_number);
        if (stream->is_metadata) {
                /*
@@ -825,12 +849,12 @@ int stream_set_pending_rotation(struct relay_stream *stream,
                stream->ongoing_rotation.value.index_rotated = true;
                ret = stream_rotate_data_file(stream);
        } else {
-               ret = try_rotate_stream_data(stream);
+               ret = try_rotate_stream_index(stream);
                if (ret < 0) {
                        goto end;
                }
 
-               ret = try_rotate_stream_index(stream);
+               ret = try_rotate_stream_data(stream);
                if (ret < 0) {
                        goto end;
                }
@@ -1132,6 +1156,8 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
                tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
+               LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+                       be64toh(index->index_data.packet_seq_num));
                *flushed = true;
        } else if (ret > 0) {
                index->total_size = total_size;
@@ -1228,11 +1254,17 @@ int stream_add_index(struct relay_stream *stream,
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
                stream->prev_index_seq = index_info->net_seq_num;
+               LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+                               index_info->packet_seq_num);
 
                ret = try_rotate_stream_index(stream);
                if (ret < 0) {
                        goto end;
                }
+               ret = try_rotate_stream_data(stream);
+               if (ret < 0) {
+                       goto end;
+               }
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
index b8d14ecfb8cb0d07b40553456bc0fae86a7d5f4e..02948fa8e4c3439906dca9df5710c0d5f992f644 100644 (file)
@@ -44,10 +44,15 @@ struct relay_stream_rotation {
        bool data_rotated;
        bool index_rotated;
        /*
-        * Sequence number of the first packet of the new trace chunk to which
-        * the stream is rotating.
+        * Packet sequence number of the first packet of the new trace chunk to
+        * which the stream is rotating.
         */
-       uint64_t seq_num;
+       uint64_t packet_seq_num;
+       /*
+        * Monotonically increasing previous network sequence number of first
+        * data packet of the new trace chunk to which the stream is rotating.
+        */
+       uint64_t prev_data_net_seq;
        struct lttng_trace_chunk *next_trace_chunk;
 };
 
@@ -111,6 +116,12 @@ struct relay_stream {
         */
        uint64_t index_received_seqcount;
 
+       /*
+        * Packet sequence number of the last received packet index.
+        * Only populated when interacting with CTF_INDEX 1.1+.
+        */
+       LTTNG_OPTIONAL(uint64_t) received_packet_seq_num;
+
        /*
         * Tracefile array is an index of the stream trace files,
         * indexed by position. It allows keeping track of the oldest
index 5c5368391bd55c26347bdbf1f2b0f4b793aed03c..61829f7bc7806112d9cefaf84a64002bbe049b1b 100644 (file)
@@ -29,7 +29,7 @@
 struct relayd_stream_rotation_position {
        uint64_t stream_id;
        /*
-        * Sequence number of the first packet belonging to the new
+        * Packet sequence number of the first packet belonging to the new
         * "destination" trace chunk to which the stream is rotating.
         *
         * Ignored for metadata streams.
This page took 0.032535 seconds and 5 git commands to generate.