Fix: src.ctf.lttng-live: emitting stream end msg with no stream
authorFrancis Deslauriers <francis.deslauriers@efficios.com>
Thu, 19 Dec 2019 21:39:45 +0000 (16:39 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Mon, 20 Jan 2020 20:15:24 +0000 (15:15 -0500)
Background
==========
When a stream hangs up on the `src.ctf.lttng-live` component, we make
sure we send a stream end message to ensure we honor The Contract which
states that any stream beginning must eventually be followed by its
stream end counterpart. We do this by calling
`ctf_msg_iter_get_next_message()` one last time to emit any missing
messages.

Using the upcoming lttng clear feature in conjunction with a per-pid
session makes it highly likely that a live stream hangs up on the
`src.ctf.lttng-live` component between the moment we learn about it and
the moment we first ask for its live index.

In such event, the live stream iterator and its `ctf_msg_it` are both
created but the corresponding stream is uninitialized.

When the component realized that a live stream has hung up, it calls
`ctf_msg_iter_get_next_message()` to respect The Contract but then
errors out here:
  CAUSED BY [lttng-live: 'source.ctf.lttng-live'] (msg-iter.c:2474)
    Cannot create stream end message because stream is NULL:
    msg-it-addr=0x555fba864010

The `stream` field is null because we never got the chance to received
any index for this stream.

Issue
=====
It's possible for a `ctf_msg` state machine to pass by the
`STATE_EMIT_MSG_STREAM_END` state without having passed by the
`STATE_EMIT_MSG_STREAM_BEGINNING` state.

Solution
========
Keep track of the fact that we sent a stream beginning message
downstream and that we need to send its respective stream end message.
If no message were send for a particular stream, we can omit sending a
stream end message.

Signed-off-by: Francis Deslauriers <francis.deslauriers@efficios.com>
Change-Id: If7f52f43162e7263785713c01c226907fe475d94
Reviewed-on: https://review.lttng.org/c/babeltrace/+/2719
CI-Build: Simon Marchi <simon.marchi@efficios.com>
Tested-by: jenkins <jenkins@lttng.org>
Reviewed-by: Simon Marchi <simon.marchi@efficios.com>
src/plugins/ctf/common/msg-iter/msg-iter.c
src/plugins/ctf/lttng-live/lttng-live.c

index fc617f3d366ccee0f8a9b8815a23b3222c432602..a49462b85977810e9c3883a8ac4aff6a3d986b15 100644 (file)
@@ -108,6 +108,7 @@ enum state {
        STATE_EMIT_MSG_PACKET_END_MULTI,
        STATE_EMIT_MSG_PACKET_END_SINGLE,
        STATE_EMIT_QUEUED_MSG_PACKET_END,
+       STATE_CHECK_EMIT_MSG_STREAM_END,
        STATE_EMIT_MSG_STREAM_END,
        STATE_DONE,
 };
@@ -181,6 +182,14 @@ struct ctf_msg_iter {
         */
        bool emit_stream_beginning_message;
 
+       /*
+        * True if we need to emit a stream end message at the end of the
+        * current stream. A live stream may never receive any data and thus
+        * never send a stream beginning message which removes the need to emit
+        * a stream end message.
+        */
+       bool emit_stream_end_message;
+
        /* Database of current dynamic scopes */
        struct {
                bt_field *stream_packet_context;
@@ -318,6 +327,8 @@ const char *state_string(enum state state)
                return "EMIT_MSG_PACKET_END_SINGLE";
        case STATE_EMIT_QUEUED_MSG_PACKET_END:
                return "EMIT_QUEUED_MSG_PACKET_END";
+       case STATE_CHECK_EMIT_MSG_STREAM_END:
+               return "CHECK_EMIT_MSG_STREAM_END";
        case STATE_EMIT_MSG_STREAM_END:
                return "EMIT_MSG_STREAM_END";
        case STATE_DONE:
@@ -725,7 +736,7 @@ enum ctf_msg_iter_status switch_packet_state(struct ctf_msg_iter *msg_it)
                medium_status = msg_it->medium.medops.switch_packet(msg_it->medium.data);
                if (medium_status == CTF_MSG_ITER_MEDIUM_STATUS_EOF) {
                        /* No more packets. */
-                       msg_it->state = STATE_EMIT_MSG_STREAM_END;
+                       msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END;
                        status = CTF_MSG_ITER_STATUS_OK;
                        goto end;
                } else if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
@@ -803,7 +814,7 @@ enum ctf_msg_iter_status read_packet_header_begin_state(
                break;
        case CTF_MSG_ITER_STATUS_EOF:
                status = CTF_MSG_ITER_STATUS_OK;
-               msg_it->state = STATE_EMIT_MSG_STREAM_END;
+               msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END;
                goto end;
        default:
                goto end;
@@ -1657,6 +1668,20 @@ end:
        return CTF_MSG_ITER_STATUS_OK;
 }
 
+static inline
+enum state check_emit_msg_stream_end(struct ctf_msg_iter *msg_it)
+{
+       enum state next_state;
+
+       if (msg_it->emit_stream_end_message) {
+               next_state = STATE_EMIT_MSG_STREAM_END;
+       } else {
+               next_state = STATE_DONE;
+       }
+
+       return next_state;
+}
+
 static inline
 enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it)
 {
@@ -1755,6 +1780,9 @@ enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it)
        case STATE_EMIT_QUEUED_MSG_PACKET_END:
                msg_it->state = STATE_EMIT_MSG_PACKET_END_SINGLE;
                break;
+       case STATE_CHECK_EMIT_MSG_STREAM_END:
+               msg_it->state = check_emit_msg_stream_end(msg_it);
+               break;
        case STATE_EMIT_MSG_STREAM_END:
                msg_it->state = STATE_DONE;
                break;
@@ -1817,6 +1845,7 @@ void ctf_msg_iter_reset(struct ctf_msg_iter *msg_it)
        msg_it->prev_packet_snapshots.beginning_clock = UINT64_C(-1);
        msg_it->prev_packet_snapshots.end_clock = UINT64_C(-1);
        msg_it->emit_stream_beginning_message = true;
+       msg_it->emit_stream_end_message = false;
 }
 
 static
@@ -2947,6 +2976,7 @@ enum ctf_msg_iter_status ctf_msg_iter_get_next_message(
                        /* create_msg_stream_beginning() logs errors */
                        *message = create_msg_stream_beginning(msg_it);
                        msg_it->emit_stream_beginning_message = false;
+                       msg_it->emit_stream_end_message = true;
 
                        if (!*message) {
                                status = CTF_MSG_ITER_STATUS_ERROR;
@@ -2956,6 +2986,7 @@ enum ctf_msg_iter_status ctf_msg_iter_get_next_message(
                case STATE_EMIT_MSG_STREAM_END:
                        /* create_msg_stream_end() logs errors */
                        *message = create_msg_stream_end(msg_it);
+                       msg_it->emit_stream_end_message = false;
 
                        if (!*message) {
                                status = CTF_MSG_ITER_STATUS_ERROR;
index 35e1037f6569c613cfd744c819a4a1ffedc1040b..bef21b09ce38823a30456b06b06860bdfaa92c0e 100644 (file)
@@ -873,6 +873,10 @@ enum lttng_live_iterator_status lttng_live_iterator_close_stream(
                        "Error getting the next message from CTF message iterator");
                live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
                goto end;
+       } else if (status == CTF_MSG_ITER_STATUS_EOF) {
+               BT_COMP_LOGI("Reached the end of the live stream iterator.");
+               live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
+               goto end;
        }
 
        BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
This page took 0.027817 seconds and 4 git commands to generate.