From: Francis Deslauriers Date: Wed, 10 Nov 2021 16:48:35 +0000 (-0500) Subject: src.ctf.lttng-live: Extract `handle_late_message()` function for future fix X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=commitdiff_plain;h=285951be2bbbbf5a22c1b012fadf5bd285f2f631 src.ctf.lttng-live: Extract `handle_late_message()` function for future fix An upcoming commit will include a fix regarding the handling of late messages. So, extract this logic into its own function simplifies the that future commit. No change of behaviour intended by this commit. Signed-off-by: Francis Deslauriers Change-Id: Ida4f980ab2574ebc64fb455e22047233c0f25cee Reviewed-on: https://review.lttng.org/c/babeltrace/+/6697 Tested-by: jenkins Reviewed-by: Philippe Proulx --- diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index 1b4deb94..b0833061 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -1134,6 +1134,97 @@ end: return status; } +static +enum lttng_live_iterator_status handle_late_message( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_stream_iterator *stream_iter, + int64_t late_msg_ts_ns, const bt_message *late_msg) +{ + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + bt_logging_level log_level = lttng_live_msg_iter->log_level; + const bt_clock_class *clock_class; + const bt_stream_class *stream_class; + enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status; + int64_t last_inactivity_ts_ns; + enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; + + stream_class = bt_stream_borrow_class_const(stream_iter->stream); + clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class); + + ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(clock_class, + stream_iter->last_inactivity_ts, &last_inactivity_ts_ns); + if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) { + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + if (last_inactivity_ts_ns > late_msg_ts_ns && + is_discarded_packet_or_event_message(late_msg)) { + /* + * The CTF message iterator emits Discarded Packets and Events + * with synthesized begin and end timestamps from the bounds of + * the last known packet and the newly decoded packet header. + * + * The CTF message iterator is not aware of stream inactivity + * beacons. Hence, we have to adjust the begin timestamp of + * those types of messages if a stream signaled its inactivity + * up until _after_ the last known packet's begin timestamp. + * + * Otherwise, the monotonicity guarantee would not be + * preserved. + */ + const enum bt_message_type msg_type = bt_message_get_type( + late_msg); + enum lttng_live_iterator_status adjust_status = + LTTNG_LIVE_ITERATOR_STATUS_OK; + bt_message *adjusted_message; + + switch (msg_type) { + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + adjust_status = adjust_discarded_events_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + late_msg, &adjusted_message, + stream_iter->last_inactivity_ts); + break; + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + adjust_status = adjust_discarded_packets_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + late_msg, &adjusted_message, + stream_iter->last_inactivity_ts); + break; + default: + bt_common_abort(); + } + + if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + stream_iter_status = adjust_status; + goto end; + } + + BT_ASSERT_DBG(adjusted_message); + stream_iter->current_msg = adjusted_message; + stream_iter->current_msg_ts_ns = last_inactivity_ts_ns; + } else { + /* + * We received a message in the past. To ensure + * monotonicity, we can't send it forward. + */ + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Message's timestamp is less than lttng-live's message " + "iterator's last returned timestamp: " + "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", " + "last-msg-ts=%" PRId64, + lttng_live_msg_iter, late_msg_ts_ns, + lttng_live_msg_iter->last_msg_ts_ns); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } +end: + return stream_iter_status; +} + static enum lttng_live_iterator_status next_stream_iterator_for_trace( struct lttng_live_msg_iter *lttng_live_msg_iter, @@ -1210,88 +1301,23 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( stream_iter->current_msg = msg; stream_iter->current_msg_ts_ns = curr_msg_ts_ns; } else { - const bt_clock_class *clock_class; - const bt_stream_class *stream_class; - enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status; - int64_t last_inactivity_ts_ns; - - stream_class = bt_stream_borrow_class_const(stream_iter->stream); - clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class); - - ts_ns_status = bt_clock_class_cycles_to_ns_from_origin( - clock_class, - stream_iter->last_inactivity_ts, - &last_inactivity_ts_ns); - if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) { - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; - } - - if (last_inactivity_ts_ns > curr_msg_ts_ns && - is_discarded_packet_or_event_message(msg)) { - /* - * The CTF message iterator emits Discarded - * Packets and Events with synthesized begin and - * end timestamps from the bounds of the last - * known packet and the newly decoded packet - * header. - * - * The CTF message iterator is not aware of - * stream inactivity beacons. Hence, we have - * to adjust the begin timestamp of those types - * of messages if a stream signaled its - * inactivity up until _after_ the last known - * packet's begin timestamp. - * - * Otherwise, the monotonicity guarantee would - * not be preserved. - */ - const enum bt_message_type msg_type = - bt_message_get_type(msg); - enum lttng_live_iterator_status adjust_status = - LTTNG_LIVE_ITERATOR_STATUS_OK; - bt_message *adjusted_message; - - switch (msg_type) { - case BT_MESSAGE_TYPE_DISCARDED_EVENTS: - adjust_status = adjust_discarded_events_message( - lttng_live_msg_iter->self_msg_iter, - stream_iter->stream, - msg, &adjusted_message, - stream_iter->last_inactivity_ts); - break; - case BT_MESSAGE_TYPE_DISCARDED_PACKETS: - adjust_status = adjust_discarded_packets_message( - lttng_live_msg_iter->self_msg_iter, - stream_iter->stream, - msg, &adjusted_message, - stream_iter->last_inactivity_ts); - break; - default: - bt_common_abort(); - } - - if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - stream_iter_status = adjust_status; - goto end; - } - - BT_ASSERT_DBG(adjusted_message); - stream_iter->current_msg = adjusted_message; - stream_iter->current_msg_ts_ns = - last_inactivity_ts_ns; - } else { - /* - * We received a message in the past. To ensure - * monotonicity, we can't send it forward. - */ + /* + * We received a message from the past. This + * may be fixable but it can also be an error. + */ + stream_iter_status = handle_late_message( + lttng_live_msg_iter, stream_iter, + curr_msg_ts_ns, msg); + if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Message's timestamp is less than " - "lttng-live's message iterator's last " - "returned timestamp: " - "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", " - "last-msg-ts=%" PRId64, - lttng_live_msg_iter, curr_msg_ts_ns, + "Late message could not be handled correctly: " + "lttng-live-msg-iter-addr=%p, " + "stream-name=\"%s\", " + "curr-msg-ts=%" PRId64 + ", last-msg-ts=%" PRId64, + lttng_live_msg_iter, + stream_iter->name->str, + curr_msg_ts_ns, lttng_live_msg_iter->last_msg_ts_ns); stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; goto end;