Background
==========
When a stream hangs up on the `src.ctf.lttng-live` component, we make
sure we send a stream end message to ensure we honor The Contract which
states that any stream beginning must eventually be followed by its
stream end counterpart. We do this by calling
`ctf_msg_iter_get_next_message()` one last time to emit any missing
messages.
Using the upcoming lttng clear feature in conjunction with a per-pid
session makes it highly likely that a live stream hangs up on the
`src.ctf.lttng-live` component between the moment we learn about it and
the moment we first ask for its live index.
In such event, the live stream iterator and its `ctf_msg_it` are both
created but the corresponding stream is uninitialized.
When the component realized that a live stream has hung up, it calls
`ctf_msg_iter_get_next_message()` to respect The Contract but then
errors out here:
CAUSED BY [lttng-live: 'source.ctf.lttng-live'] (msg-iter.c:2474)
Cannot create stream end message because stream is NULL:
msg-it-addr=0x555fba864010
The `stream` field is null because we never got the chance to received
any index for this stream.
Issue
=====
It's possible for a `ctf_msg` state machine to pass by the
`STATE_EMIT_MSG_STREAM_END` state without having passed by the
`STATE_EMIT_MSG_STREAM_BEGINNING` state.
Solution
========
Keep track of the fact that we sent a stream beginning message
downstream and that we need to send its respective stream end message.
If no message were send for a particular stream, we can omit sending a
stream end message.
Signed-off-by: Francis Deslauriers <francis.deslauriers@efficios.com>
Change-Id: If7f52f43162e7263785713c01c226907fe475d94
Reviewed-on: https://review.lttng.org/c/babeltrace/+/2719
CI-Build: Simon Marchi <simon.marchi@efficios.com>
Tested-by: jenkins <jenkins@lttng.org>
Reviewed-by: Simon Marchi <simon.marchi@efficios.com>
STATE_EMIT_MSG_PACKET_END_MULTI,
STATE_EMIT_MSG_PACKET_END_SINGLE,
STATE_EMIT_QUEUED_MSG_PACKET_END,
STATE_EMIT_MSG_PACKET_END_MULTI,
STATE_EMIT_MSG_PACKET_END_SINGLE,
STATE_EMIT_QUEUED_MSG_PACKET_END,
+ STATE_CHECK_EMIT_MSG_STREAM_END,
STATE_EMIT_MSG_STREAM_END,
STATE_DONE,
};
STATE_EMIT_MSG_STREAM_END,
STATE_DONE,
};
*/
bool emit_stream_beginning_message;
*/
bool emit_stream_beginning_message;
+ /*
+ * True if we need to emit a stream end message at the end of the
+ * current stream. A live stream may never receive any data and thus
+ * never send a stream beginning message which removes the need to emit
+ * a stream end message.
+ */
+ bool emit_stream_end_message;
+
/* Database of current dynamic scopes */
struct {
bt_field *stream_packet_context;
/* Database of current dynamic scopes */
struct {
bt_field *stream_packet_context;
return "EMIT_MSG_PACKET_END_SINGLE";
case STATE_EMIT_QUEUED_MSG_PACKET_END:
return "EMIT_QUEUED_MSG_PACKET_END";
return "EMIT_MSG_PACKET_END_SINGLE";
case STATE_EMIT_QUEUED_MSG_PACKET_END:
return "EMIT_QUEUED_MSG_PACKET_END";
+ case STATE_CHECK_EMIT_MSG_STREAM_END:
+ return "CHECK_EMIT_MSG_STREAM_END";
case STATE_EMIT_MSG_STREAM_END:
return "EMIT_MSG_STREAM_END";
case STATE_DONE:
case STATE_EMIT_MSG_STREAM_END:
return "EMIT_MSG_STREAM_END";
case STATE_DONE:
medium_status = msg_it->medium.medops.switch_packet(msg_it->medium.data);
if (medium_status == CTF_MSG_ITER_MEDIUM_STATUS_EOF) {
/* No more packets. */
medium_status = msg_it->medium.medops.switch_packet(msg_it->medium.data);
if (medium_status == CTF_MSG_ITER_MEDIUM_STATUS_EOF) {
/* No more packets. */
- msg_it->state = STATE_EMIT_MSG_STREAM_END;
+ msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END;
status = CTF_MSG_ITER_STATUS_OK;
goto end;
} else if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
status = CTF_MSG_ITER_STATUS_OK;
goto end;
} else if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
break;
case CTF_MSG_ITER_STATUS_EOF:
status = CTF_MSG_ITER_STATUS_OK;
break;
case CTF_MSG_ITER_STATUS_EOF:
status = CTF_MSG_ITER_STATUS_OK;
- msg_it->state = STATE_EMIT_MSG_STREAM_END;
+ msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END;
goto end;
default:
goto end;
goto end;
default:
goto end;
return CTF_MSG_ITER_STATUS_OK;
}
return CTF_MSG_ITER_STATUS_OK;
}
+static inline
+enum state check_emit_msg_stream_end(struct ctf_msg_iter *msg_it)
+{
+ enum state next_state;
+
+ if (msg_it->emit_stream_end_message) {
+ next_state = STATE_EMIT_MSG_STREAM_END;
+ } else {
+ next_state = STATE_DONE;
+ }
+
+ return next_state;
+}
+
static inline
enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it)
{
static inline
enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it)
{
case STATE_EMIT_QUEUED_MSG_PACKET_END:
msg_it->state = STATE_EMIT_MSG_PACKET_END_SINGLE;
break;
case STATE_EMIT_QUEUED_MSG_PACKET_END:
msg_it->state = STATE_EMIT_MSG_PACKET_END_SINGLE;
break;
+ case STATE_CHECK_EMIT_MSG_STREAM_END:
+ msg_it->state = check_emit_msg_stream_end(msg_it);
+ break;
case STATE_EMIT_MSG_STREAM_END:
msg_it->state = STATE_DONE;
break;
case STATE_EMIT_MSG_STREAM_END:
msg_it->state = STATE_DONE;
break;
msg_it->prev_packet_snapshots.beginning_clock = UINT64_C(-1);
msg_it->prev_packet_snapshots.end_clock = UINT64_C(-1);
msg_it->emit_stream_beginning_message = true;
msg_it->prev_packet_snapshots.beginning_clock = UINT64_C(-1);
msg_it->prev_packet_snapshots.end_clock = UINT64_C(-1);
msg_it->emit_stream_beginning_message = true;
+ msg_it->emit_stream_end_message = false;
/* create_msg_stream_beginning() logs errors */
*message = create_msg_stream_beginning(msg_it);
msg_it->emit_stream_beginning_message = false;
/* create_msg_stream_beginning() logs errors */
*message = create_msg_stream_beginning(msg_it);
msg_it->emit_stream_beginning_message = false;
+ msg_it->emit_stream_end_message = true;
if (!*message) {
status = CTF_MSG_ITER_STATUS_ERROR;
if (!*message) {
status = CTF_MSG_ITER_STATUS_ERROR;
case STATE_EMIT_MSG_STREAM_END:
/* create_msg_stream_end() logs errors */
*message = create_msg_stream_end(msg_it);
case STATE_EMIT_MSG_STREAM_END:
/* create_msg_stream_end() logs errors */
*message = create_msg_stream_end(msg_it);
+ msg_it->emit_stream_end_message = false;
if (!*message) {
status = CTF_MSG_ITER_STATUS_ERROR;
if (!*message) {
status = CTF_MSG_ITER_STATUS_ERROR;
"Error getting the next message from CTF message iterator");
live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
goto end;
"Error getting the next message from CTF message iterator");
live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
goto end;
+ } else if (status == CTF_MSG_ITER_STATUS_EOF) {
+ BT_COMP_LOGI("Reached the end of the live stream iterator.");
+ live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
+ goto end;
}
BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
}
BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);