X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.c;h=6c412dd827209c1f8497f32bd7caaa480674d205;hb=516bf0a77e025cfccce2fa400b757e94dc0bf1d8;hp=91e312f2f3536f6014312cacad424f5f7c84a92a;hpb=f93afbf95fcd73ca9a7835ffdf5adbe0787b07c4;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index 91e312f2..6c412dd8 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -86,12 +86,29 @@ const char *lttng_live_stream_state_string(enum lttng_live_stream_state state) } } +void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter, + enum lttng_live_stream_state new_state) +{ + bt_self_component *self_comp = stream_iter->self_comp; + bt_logging_level log_level = stream_iter->log_level; + + BT_COMP_LOGD("Setting live stream iterator state: viewer-stream-id=%" PRIu64 + ", old-state=%s, new-state=%s", + stream_iter->viewer_stream_id, + lttng_live_stream_state_string(stream_iter->state), + lttng_live_stream_state_string(new_state)); + + stream_iter->state = new_state; +} + #define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \ do { \ - BT_COMP_LOGD("Live stream iterator state=%s, last-inact-ts=%" PRId64 \ - ", curr-inact-ts %" PRId64, \ + BT_COMP_LOGD("Live stream iterator state=%s, " \ + "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 ", " \ + "curr-inact-ts=%" PRId64, \ lttng_live_stream_state_string(live_stream_iter->state), \ - live_stream_iter->last_inactivity_ts, \ + live_stream_iter->last_inactivity_ts.is_set, \ + live_stream_iter->last_inactivity_ts.value, \ live_stream_iter->current_inactivity_ts); \ } while (0); @@ -403,11 +420,18 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stre BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF); if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) { - uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts, + uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value, curr_inact_ts = lttng_live_stream->current_inactivity_ts; if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) { + /* + * Because the stream is in the QUIESCENT_NO_DATA + * state, we can assert that the last_inactivity_ts was + * set and can be safely used in the `if` above. + */ + BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set); + ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream); } else { @@ -647,7 +671,7 @@ enum lttng_live_iterator_status emit_inactivity_message( BT_COMP_LOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64 ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64, - stream_iter->ctf_stream_class_id, + stream_iter->ctf_stream_class_id.value, stream_iter->viewer_stream_id, timestamp); msg = bt_message_message_iterator_inactivity_create( @@ -685,9 +709,12 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_st * Check if we already sent an inactivty message downstream for this * `current_inactivity_ts` value. */ - if (lttng_live_stream->current_inactivity_ts == - lttng_live_stream->last_inactivity_ts) { - lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA; + if (lttng_live_stream->last_inactivity_ts.is_set && + lttng_live_stream->current_inactivity_ts == + lttng_live_stream->last_inactivity_ts.value) { + lttng_live_stream_iterator_set_state(lttng_live_stream, + LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA); + ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } @@ -695,8 +722,9 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_st ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message, lttng_live_stream->current_inactivity_ts); - lttng_live_stream->last_inactivity_ts = + lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts; + lttng_live_stream->last_inactivity_ts.is_set = true; end: return ret; } @@ -1038,7 +1066,7 @@ end: goto retry; } - BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s" + BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s, " "stream-name=\"%s\", viewer-stream-id=%" PRIu64, lttng_live_iterator_status_string(live_status), stream_iter->name->str, stream_iter->viewer_stream_id); @@ -1117,6 +1145,134 @@ end: return status; } +static +enum lttng_live_iterator_status handle_late_message( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_stream_iterator *stream_iter, + int64_t late_msg_ts_ns, const bt_message *late_msg) +{ + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + bt_logging_level log_level = lttng_live_msg_iter->log_level; + const bt_clock_class *clock_class; + const bt_stream_class *stream_class; + enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status; + int64_t last_inactivity_ts_ns; + enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status adjust_status; + bt_message *adjusted_message; + + /* + * The timestamp of the current message is before the last message sent + * by this component. We CANNOT send it as is. + * + * The only expected scenario in which that could happen is the + * following, everything else is a bug in this component, relay deamon, + * or CTF parser. + * + * Expected scenario: The CTF message iterator emitted discarded + * packets and discarded events with synthesized beginning and end + * timestamps from the bounds of the last known packet and the newly + * decoded packet header. The CTF message iterator is not aware of + * stream inactivity beacons. Hence, we have to adjust the beginning + * timestamp of those types of messages if a stream signalled its + * inactivity up until _after_ the last known packet's beginning + * timestamp. + * + * Otherwise, the monotonicity guarantee of message timestamps would + * not be preserved. + * + * In short, the only scenario in which it's okay and fixable to + * received a late message is when: + * 1. the late message is a discarded packets or discarded events + * message, + * 2. this stream produced an inactivity message downstream, and + * 3. the timestamp of the late message is within the inactivity + * timespan we sent downstream through the inactivity message. + */ + + BT_COMP_LOGD("Handling late message on live stream iterator: " + "stream-name=\"%s\", viewer-stream-id=%" PRIu64, + stream_iter->name->str, stream_iter->viewer_stream_id); + + if (!stream_iter->last_inactivity_ts.is_set) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: " + "have a late message when no inactivity message " + "was ever sent for that stream."); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + if (!is_discarded_packet_or_event_message(late_msg)) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: " + "have a late message that is not a packet discarded or " + "event discarded message: late-msg-type=%s", + bt_common_message_type_string(bt_message_get_type(late_msg))); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + stream_class = bt_stream_borrow_class_const(stream_iter->stream); + clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class); + + ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(clock_class, + stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns); + if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp,"Error converting last " + "inactivity message timestamp to nanoseconds"); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + if (last_inactivity_ts_ns <= late_msg_ts_ns) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: " + "have a late message that is none included in a stream " + "inactivity timespan: last-inactivity-ts-ns=%" PRIu64 + "late-msg-ts-ns=%" PRIu64, last_inactivity_ts_ns, late_msg_ts_ns); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + /* + * We now know that it's okay for this message to be late, we can now + * adjust its timestamp to ensure monotonicity. + */ + BT_COMP_LOGD("Adjusting the timestamp of late message: late-msg-type=%s, " + "msg-new-ts-ns=%" PRIu64, + bt_common_message_type_string(bt_message_get_type(late_msg)), + stream_iter->last_inactivity_ts.value); + switch (bt_message_get_type(late_msg)) { + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + adjust_status = adjust_discarded_events_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + late_msg, &adjusted_message, + stream_iter->last_inactivity_ts.value); + break; + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + adjust_status = adjust_discarded_packets_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + late_msg, &adjusted_message, + stream_iter->last_inactivity_ts.value); + break; + default: + bt_common_abort(); + } + + if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + stream_iter_status = adjust_status; + goto end; + } + + BT_ASSERT_DBG(adjusted_message); + stream_iter->current_msg = adjusted_message; + stream_iter->current_msg_ts_ns = last_inactivity_ts_ns; + bt_message_put_ref(late_msg); + +end: + return stream_iter_status; +} + static enum lttng_live_iterator_status next_stream_iterator_for_trace( struct lttng_live_msg_iter *lttng_live_msg_iter, @@ -1170,7 +1326,7 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( BT_ASSERT_DBG(msg); - BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s" + BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s, " "stream-name=\"%s\", viewer-stream-id=%" PRIu64, bt_common_message_type_string(bt_message_get_type(msg)), stream_iter->name->str, stream_iter->viewer_stream_id); @@ -1187,94 +1343,29 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( * Check if the message of the current live stream * iterator occurred at the exact same time or after the * last message returned by this component's message - * iterator. If not, we return an error. + * iterator. If not, we need to handle it with care. */ if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) { stream_iter->current_msg = msg; stream_iter->current_msg_ts_ns = curr_msg_ts_ns; } else { - const bt_clock_class *clock_class; - const bt_stream_class *stream_class; - enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status; - int64_t last_inactivity_ts_ns; - - stream_class = bt_stream_borrow_class_const(stream_iter->stream); - clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class); - - ts_ns_status = bt_clock_class_cycles_to_ns_from_origin( - clock_class, - stream_iter->last_inactivity_ts, - &last_inactivity_ts_ns); - if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) { - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; - } - - if (last_inactivity_ts_ns > curr_msg_ts_ns && - is_discarded_packet_or_event_message(msg)) { - /* - * The CTF message iterator emits Discarded - * Packets and Events with synthesized begin and - * end timestamps from the bounds of the last - * known packet and the newly decoded packet - * header. - * - * The CTF message iterator is not aware of - * stream inactivity beacons. Hence, we have - * to adjust the begin timestamp of those types - * of messages if a stream signaled its - * inactivity up until _after_ the last known - * packet's begin timestamp. - * - * Otherwise, the monotonicity guarantee would - * not be preserved. - */ - const enum bt_message_type msg_type = - bt_message_get_type(msg); - enum lttng_live_iterator_status adjust_status = - LTTNG_LIVE_ITERATOR_STATUS_OK; - bt_message *adjusted_message; - - switch (msg_type) { - case BT_MESSAGE_TYPE_DISCARDED_EVENTS: - adjust_status = adjust_discarded_events_message( - lttng_live_msg_iter->self_msg_iter, - stream_iter->stream, - msg, &adjusted_message, - stream_iter->last_inactivity_ts); - break; - case BT_MESSAGE_TYPE_DISCARDED_PACKETS: - adjust_status = adjust_discarded_packets_message( - lttng_live_msg_iter->self_msg_iter, - stream_iter->stream, - msg, &adjusted_message, - stream_iter->last_inactivity_ts); - break; - default: - bt_common_abort(); - } - - if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { - stream_iter_status = adjust_status; - goto end; - } - - BT_ASSERT_DBG(adjusted_message); - stream_iter->current_msg = adjusted_message; - stream_iter->current_msg_ts_ns = - last_inactivity_ts_ns; - } else { - /* - * We received a message in the past. To ensure - * monotonicity, we can't send it forward. - */ + /* + * We received a message from the past. This + * may be fixable but it can also be an error. + */ + stream_iter_status = handle_late_message( + lttng_live_msg_iter, stream_iter, + curr_msg_ts_ns, msg); + if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Message's timestamp is less than " - "lttng-live's message iterator's last " - "returned timestamp: " - "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", " - "last-msg-ts=%" PRId64, - lttng_live_msg_iter, curr_msg_ts_ns, + "Late message could not be handled correctly: " + "lttng-live-msg-iter-addr=%p, " + "stream-name=\"%s\", " + "curr-msg-ts=%" PRId64 + ", last-msg-ts=%" PRId64, + lttng_live_msg_iter, + stream_iter->name->str, + curr_msg_ts_ns, lttng_live_msg_iter->last_msg_ts_ns); stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; goto end;