X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.c;h=6c412dd827209c1f8497f32bd7caaa480674d205;hp=2eafb2a6be2d5dedbb9c7f0ea3410ab770774c5f;hb=82338bfd24b995ca7b5224f54f1a245d906d517e;hpb=a043c3621976126dfaaaf8ffa2ee7fd5d98ea33b diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index 2eafb2a6..6c412dd8 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -104,8 +104,8 @@ void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *str #define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \ do { \ BT_COMP_LOGD("Live stream iterator state=%s, " \ - "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 "), " \ - "curr-inact-ts %" PRId64, \ + "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 ", " \ + "curr-inact-ts=%" PRId64, \ lttng_live_stream_state_string(live_stream_iter->state), \ live_stream_iter->last_inactivity_ts.is_set, \ live_stream_iter->last_inactivity_ts.value, \ @@ -671,7 +671,7 @@ enum lttng_live_iterator_status emit_inactivity_message( BT_COMP_LOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64 ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64, - stream_iter->ctf_stream_class_id, + stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp); msg = bt_message_message_iterator_inactivity_create( @@ -1066,7 +1066,7 @@ end: goto retry; } - BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s" + BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s, " "stream-name=\"%s\", viewer-stream-id=%" PRIu64, lttng_live_iterator_status_string(live_status), stream_iter->name->str, stream_iter->viewer_stream_id); @@ -1158,6 +1158,58 @@ enum lttng_live_iterator_status handle_late_message( enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status; int64_t last_inactivity_ts_ns; enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status adjust_status; + bt_message *adjusted_message; + + /* + * The timestamp of the current message is before the last message sent + * by this component. We CANNOT send it as is. + * + * The only expected scenario in which that could happen is the + * following, everything else is a bug in this component, relay deamon, + * or CTF parser. + * + * Expected scenario: The CTF message iterator emitted discarded + * packets and discarded events with synthesized beginning and end + * timestamps from the bounds of the last known packet and the newly + * decoded packet header. The CTF message iterator is not aware of + * stream inactivity beacons. Hence, we have to adjust the beginning + * timestamp of those types of messages if a stream signalled its + * inactivity up until _after_ the last known packet's beginning + * timestamp. + * + * Otherwise, the monotonicity guarantee of message timestamps would + * not be preserved. + * + * In short, the only scenario in which it's okay and fixable to + * received a late message is when: + * 1. the late message is a discarded packets or discarded events + * message, + * 2. this stream produced an inactivity message downstream, and + * 3. the timestamp of the late message is within the inactivity + * timespan we sent downstream through the inactivity message. + */ + + BT_COMP_LOGD("Handling late message on live stream iterator: " + "stream-name=\"%s\", viewer-stream-id=%" PRIu64, + stream_iter->name->str, stream_iter->viewer_stream_id); + + if (!stream_iter->last_inactivity_ts.is_set) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: " + "have a late message when no inactivity message " + "was ever sent for that stream."); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + if (!is_discarded_packet_or_event_message(late_msg)) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: " + "have a late message that is not a packet discarded or " + "event discarded message: late-msg-type=%s", + bt_common_message_type_string(bt_message_get_type(late_msg))); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } stream_class = bt_stream_borrow_class_const(stream_iter->stream); clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class); @@ -1165,73 +1217,58 @@ enum lttng_live_iterator_status handle_late_message( ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(clock_class, stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns); if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp,"Error converting last " + "inactivity message timestamp to nanoseconds"); stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; goto end; } - if (last_inactivity_ts_ns > late_msg_ts_ns && - is_discarded_packet_or_event_message(late_msg)) { - /* - * The CTF message iterator emits Discarded Packets and Events - * with synthesized begin and end timestamps from the bounds of - * the last known packet and the newly decoded packet header. - * - * The CTF message iterator is not aware of stream inactivity - * beacons. Hence, we have to adjust the begin timestamp of - * those types of messages if a stream signaled its inactivity - * up until _after_ the last known packet's begin timestamp. - * - * Otherwise, the monotonicity guarantee would not be - * preserved. - */ - const enum bt_message_type msg_type = bt_message_get_type( - late_msg); - enum lttng_live_iterator_status adjust_status = - LTTNG_LIVE_ITERATOR_STATUS_OK; - bt_message *adjusted_message; - - switch (msg_type) { - case BT_MESSAGE_TYPE_DISCARDED_EVENTS: - adjust_status = adjust_discarded_events_message( - lttng_live_msg_iter->self_msg_iter, - stream_iter->stream, - late_msg, &adjusted_message, - stream_iter->last_inactivity_ts.value); - break; - case BT_MESSAGE_TYPE_DISCARDED_PACKETS: - adjust_status = adjust_discarded_packets_message( - lttng_live_msg_iter->self_msg_iter, - stream_iter->stream, - late_msg, &adjusted_message, - stream_iter->last_inactivity_ts.value); - break; - default: - bt_common_abort(); - } + if (last_inactivity_ts_ns <= late_msg_ts_ns) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: " + "have a late message that is none included in a stream " + "inactivity timespan: last-inactivity-ts-ns=%" PRIu64 + "late-msg-ts-ns=%" PRIu64, last_inactivity_ts_ns, late_msg_ts_ns); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } - if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - stream_iter_status = adjust_status; - goto end; - } + /* + * We now know that it's okay for this message to be late, we can now + * adjust its timestamp to ensure monotonicity. + */ + BT_COMP_LOGD("Adjusting the timestamp of late message: late-msg-type=%s, " + "msg-new-ts-ns=%" PRIu64, + bt_common_message_type_string(bt_message_get_type(late_msg)), + stream_iter->last_inactivity_ts.value); + switch (bt_message_get_type(late_msg)) { + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + adjust_status = adjust_discarded_events_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + late_msg, &adjusted_message, + stream_iter->last_inactivity_ts.value); + break; + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + adjust_status = adjust_discarded_packets_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + late_msg, &adjusted_message, + stream_iter->last_inactivity_ts.value); + break; + default: + bt_common_abort(); + } - BT_ASSERT_DBG(adjusted_message); - stream_iter->current_msg = adjusted_message; - stream_iter->current_msg_ts_ns = last_inactivity_ts_ns; - } else { - /* - * We received a message in the past. To ensure - * monotonicity, we can't send it forward. - */ - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Message's timestamp is less than lttng-live's message " - "iterator's last returned timestamp: " - "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", " - "last-msg-ts=%" PRId64, - lttng_live_msg_iter, late_msg_ts_ns, - lttng_live_msg_iter->last_msg_ts_ns); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + stream_iter_status = adjust_status; goto end; } + + BT_ASSERT_DBG(adjusted_message); + stream_iter->current_msg = adjusted_message; + stream_iter->current_msg_ts_ns = last_inactivity_ts_ns; + bt_message_put_ref(late_msg); + end: return stream_iter_status; } @@ -1289,7 +1326,7 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( BT_ASSERT_DBG(msg); - BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s" + BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s, " "stream-name=\"%s\", viewer-stream-id=%" PRIu64, bt_common_message_type_string(bt_message_get_type(msg)), stream_iter->name->str, stream_iter->viewer_stream_id); @@ -1306,7 +1343,7 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( * Check if the message of the current live stream * iterator occurred at the exact same time or after the * last message returned by this component's message - * iterator. If not, we return an error. + * iterator. If not, we need to handle it with care. */ if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) { stream_iter->current_msg = msg;