X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.c;h=6c412dd827209c1f8497f32bd7caaa480674d205;hb=516bf0a77e025cfccce2fa400b757e94dc0bf1d8;hp=b8f3c8c278c36c31cd232351147e40ee243e6816;hpb=cad707e2548d7c9ea8501f12aec7ed28eaaf7975;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index b8f3c8c2..6c412dd8 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -1,31 +1,11 @@ /* - * lttng-live.c - * - * Babeltrace CTF LTTng-live Client Component + * SPDX-License-Identifier: MIT * * Copyright 2019 Francis Deslauriers * Copyright 2016 Jérémie Galarneau * Copyright 2016 Mathieu Desnoyers * - * Author: Jérémie Galarneau - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Babeltrace CTF LTTng-live Client Component */ #define BT_COMP_LOG_SELF_COMP self_comp @@ -106,12 +86,29 @@ const char *lttng_live_stream_state_string(enum lttng_live_stream_state state) } } +void lttng_live_stream_iterator_set_state(struct lttng_live_stream_iterator *stream_iter, + enum lttng_live_stream_state new_state) +{ + bt_self_component *self_comp = stream_iter->self_comp; + bt_logging_level log_level = stream_iter->log_level; + + BT_COMP_LOGD("Setting live stream iterator state: viewer-stream-id=%" PRIu64 + ", old-state=%s, new-state=%s", + stream_iter->viewer_stream_id, + lttng_live_stream_state_string(stream_iter->state), + lttng_live_stream_state_string(new_state)); + + stream_iter->state = new_state; +} + #define LTTNG_LIVE_LOGD_STREAM_ITER(live_stream_iter) \ do { \ - BT_COMP_LOGD("Live stream iterator state=%s, last-inact-ts=%" PRId64 \ - ", curr-inact-ts %" PRId64, \ + BT_COMP_LOGD("Live stream iterator state=%s, " \ + "last-inact-ts-is-set=%d, last-inact-ts-value=%" PRId64 ", " \ + "curr-inact-ts=%" PRId64, \ lttng_live_stream_state_string(live_stream_iter->state), \ - live_stream_iter->last_inactivity_ts, \ + live_stream_iter->last_inactivity_ts.is_set, \ + live_stream_iter->last_inactivity_ts.value, \ live_stream_iter->current_inactivity_ts); \ } while (0); @@ -377,12 +374,12 @@ enum lttng_live_iterator_status lttng_live_iterator_next_check_stream_state( } /* - * For active no data stream, fetch next data. It can be either: - * - quiescent: need to put it in the prio heap at quiescent end - * timestamp, - * - have data: need to wire up first event into the prio heap, - * - have no data on this stream at this point: need to retry (AGAIN) or - * return EOF. + * For active no data stream, fetch next index. As a result of that it can + * become either: + * - quiescent: won't have events for a bit, + * - have data: need to get that data and produce the event, + * - have no data on this stream at this point: need to retry (AGAIN) or return + * EOF. */ static enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stream( @@ -397,29 +394,44 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stre if (lttng_live_stream->trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) { + BT_COMP_LOGD("Need to get an update for the metadata stream before proceeding further with this stream: " + "stream-name=\"%s\"", lttng_live_stream->name->str); ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } + if (lttng_live_stream->trace->session->new_streams_needed) { + BT_COMP_LOGD("Need to get an update of all streams before proceeding further with this stream: " + "stream-name=\"%s\"", lttng_live_stream->name->str); ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } + if (lttng_live_stream->state != LTTNG_LIVE_STREAM_ACTIVE_NO_DATA && lttng_live_stream->state != LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA) { goto end; } ret = lttng_live_get_next_index(lttng_live_msg_iter, lttng_live_stream, - &index); + &index); if (ret != LTTNG_LIVE_ITERATOR_STATUS_OK) { goto end; } + BT_ASSERT_DBG(lttng_live_stream->state != LTTNG_LIVE_STREAM_EOF); + if (lttng_live_stream->state == LTTNG_LIVE_STREAM_QUIESCENT) { - uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts, + uint64_t last_inact_ts = lttng_live_stream->last_inactivity_ts.value, curr_inact_ts = lttng_live_stream->current_inactivity_ts; if (orig_state == LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA && last_inact_ts == curr_inact_ts) { + /* + * Because the stream is in the QUIESCENT_NO_DATA + * state, we can assert that the last_inactivity_ts was + * set and can be safely used in the `if` above. + */ + BT_ASSERT(lttng_live_stream->last_inactivity_ts.is_set); + ret = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; LTTNG_LIVE_LOGD_STREAM_ITER(lttng_live_stream); } else { @@ -427,9 +439,19 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_no_data_stre } goto end; } + lttng_live_stream->base_offset = index.offset; lttng_live_stream->offset = index.offset; lttng_live_stream->len = index.packet_size / CHAR_BIT; + + BT_COMP_LOGD("Setting live stream reading info: stream-name=\"%s\", " + "viewer-stream-id=%" PRIu64 ", stream-base-offset=%" PRIu64 + ", stream-offset=%" PRIu64 ", stream-len=%" PRIu64, + lttng_live_stream->name->str, + lttng_live_stream->viewer_stream_id, + lttng_live_stream->base_offset, + lttng_live_stream->offset, lttng_live_stream->len); + end: if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK) { ret = lttng_live_iterator_next_check_stream_state(lttng_live_stream); @@ -454,11 +476,9 @@ enum lttng_live_iterator_status lttng_live_get_session( enum lttng_live_iterator_status status; uint64_t trace_idx; - BT_COMP_LOGD("Updating all streams for session: " - "session-id=%"PRIu64", session-name=\"%s\"", - session->id, session->session_name->str); - if (!session->attached) { + BT_COMP_LOGD("Attach to session: session-id=%" PRIu64, + session->id); enum lttng_live_viewer_status attach_status = lttng_live_session_attach(session, lttng_live_msg_iter->self_msg_iter); @@ -481,12 +501,17 @@ enum lttng_live_iterator_status lttng_live_get_session( } } + BT_COMP_LOGD("Updating all streams and metadata for session: " + "session-id=%" PRIu64 ", session-name=\"%s\"", + session->id, session->session_name->str); + status = lttng_live_session_get_new_streams(session, lttng_live_msg_iter->self_msg_iter); if (status != LTTNG_LIVE_ITERATOR_STATUS_OK && status != LTTNG_LIVE_ITERATOR_STATUS_END) { goto end; } + trace_idx = 0; while (trace_idx < session->traces->len) { struct lttng_live_trace *trace = @@ -510,6 +535,11 @@ enum lttng_live_iterator_status lttng_live_get_session( goto end; } } + + /* + * Now that we have the metadata we can initialize the downstream + * iterator. + */ status = lttng_live_lazy_msg_init(session, lttng_live_msg_iter->self_msg_iter); @@ -521,16 +551,23 @@ static void lttng_live_force_new_streams_and_metadata(struct lttng_live_msg_iter *lttng_live_msg_iter) { uint64_t session_idx, trace_idx; + bt_logging_level log_level = lttng_live_msg_iter->log_level; + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) { struct lttng_live_session *session = g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); + BT_COMP_LOGD("Force marking session as needing new streams: " + "session-id=%" PRIu64, session->id); session->new_streams_needed = true; for (trace_idx = 0; trace_idx < session->traces->len; trace_idx++) { struct lttng_live_trace *trace = g_ptr_array_index(session->traces, trace_idx); + BT_COMP_LOGD("Force marking trace metadata state as needing an update: " + "session-id=%" PRIu64 ", trace-id=%" PRIu64, + session->id, trace->id); BT_ASSERT(trace->metadata_stream_state != LTTNG_LIVE_METADATA_STREAM_STATE_CLOSED); @@ -554,7 +591,7 @@ lttng_live_iterator_handle_new_streams_and_metadata( enum session_not_found_action sess_not_found_act = lttng_live_msg_iter->lttng_live_comp->params.sess_not_found_act; - BT_COMP_LOGD("Update data and metadata of all sessions" + BT_COMP_LOGD("Update data and metadata of all sessions: " "live-msg-iter-addr=%p", lttng_live_msg_iter); /* * In a remotely distant future, we could add a "new @@ -632,6 +669,11 @@ enum lttng_live_iterator_status emit_inactivity_message( BT_ASSERT(stream_iter->trace->clock_class); + BT_COMP_LOGD("Emitting inactivity message for stream: ctf-stream-id=%" PRIu64 + ", viewer-stream-id=%" PRIu64 ", timestamp=%" PRIu64, + stream_iter->ctf_stream_class_id.value, + stream_iter->viewer_stream_id, timestamp); + msg = bt_message_message_iterator_inactivity_create( lttng_live_msg_iter->self_msg_iter, stream_iter->trace->clock_class, timestamp); @@ -663,9 +705,16 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_st return LTTNG_LIVE_ITERATOR_STATUS_OK; } - if (lttng_live_stream->current_inactivity_ts == - lttng_live_stream->last_inactivity_ts) { - lttng_live_stream->state = LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA; + /* + * Check if we already sent an inactivty message downstream for this + * `current_inactivity_ts` value. + */ + if (lttng_live_stream->last_inactivity_ts.is_set && + lttng_live_stream->current_inactivity_ts == + lttng_live_stream->last_inactivity_ts.value) { + lttng_live_stream_iterator_set_state(lttng_live_stream, + LTTNG_LIVE_STREAM_QUIESCENT_NO_DATA); + ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } @@ -673,8 +722,9 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_quiescent_st ret = emit_inactivity_message(lttng_live_msg_iter, lttng_live_stream, message, lttng_live_stream->current_inactivity_ts); - lttng_live_stream->last_inactivity_ts = + lttng_live_stream->last_inactivity_ts.value = lttng_live_stream->current_inactivity_ts; + lttng_live_stream->last_inactivity_ts.is_set = true; end: return ret; } @@ -740,7 +790,7 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, msg); break; case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: - clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( + clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const( msg); break; default: @@ -795,6 +845,8 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_ g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); if (session->new_streams_needed) { + BT_COMP_LOGD("Need an update for streams: " + "session-id=%" PRIu64, session->id); ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } @@ -803,6 +855,9 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_ struct lttng_live_trace *trace = g_ptr_array_index(session->traces, trace_idx); if (trace->metadata_stream_state == LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED) { + BT_COMP_LOGD("Need an update for metadata stream: " + "session-id=%" PRIu64 ", trace-id=%" PRIu64, + session->id, trace->id); ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; goto end; } @@ -835,8 +890,6 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_ */ ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; break; - case CTF_MSG_ITER_STATUS_INVAL: - /* No argument provided by the user, so don't return INVAL. */ case CTF_MSG_ITER_STATUS_ERROR: default: ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; @@ -862,6 +915,11 @@ enum lttng_live_iterator_status lttng_live_iterator_close_stream( LTTNG_LIVE_ITERATOR_STATUS_OK; bt_logging_level log_level = lttng_live_msg_iter->log_level; bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + + BT_COMP_LOGD("Closing live stream iterator: stream-name=\"%s\", " + "viewer-stream-id=%" PRIu64, stream_iter->name->str, + stream_iter->viewer_stream_id); + /* * The viewer has hung up on us so we are closing the stream. The * `ctf_msg_iter` should simply realize that it needs to close the @@ -875,6 +933,10 @@ enum lttng_live_iterator_status lttng_live_iterator_close_stream( "Error getting the next message from CTF message iterator"); live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; goto end; + } else if (status == CTF_MSG_ITER_STATUS_EOF) { + BT_COMP_LOGI("Reached the end of the live stream iterator."); + live_status = LTTNG_LIVE_ITERATOR_STATUS_END; + goto end; } BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK); @@ -941,8 +1003,9 @@ enum lttng_live_iterator_status lttng_live_iterator_next_msg_on_stream( bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum lttng_live_iterator_status live_status; - BT_COMP_LOGD("Finding the next message for stream iterator: " - "stream-name=\"%s\"", stream_iter->name->str); + BT_COMP_LOGD("Advancing live stream iterator until next message if possible: " + "stream-name=\"%s\", viewer-stream-id=%" PRIu64, + stream_iter->name->str, stream_iter->viewer_stream_id); if (stream_iter->has_stream_hung_up) { /* @@ -957,14 +1020,19 @@ enum lttng_live_iterator_status lttng_live_iterator_next_msg_on_stream( retry: LTTNG_LIVE_LOGD_STREAM_ITER(stream_iter); + + /* + * Make sure we have the most recent metadata and possibly some new + * streams. + */ live_status = lttng_live_iterator_handle_new_streams_and_metadata( lttng_live_msg_iter); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { goto end; } + live_status = lttng_live_iterator_next_handle_one_no_data_stream( lttng_live_msg_iter, stream_iter); - if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) { /* @@ -976,6 +1044,7 @@ retry: } goto end; } + live_status = lttng_live_iterator_next_handle_one_quiescent_stream( lttng_live_msg_iter, stream_iter, curr_msg); if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { @@ -993,12 +1062,217 @@ retry: end: if (live_status == LTTNG_LIVE_ITERATOR_STATUS_CONTINUE) { + BT_COMP_LOGD("Ask the relay daemon for an updated view of the data and metadata streams"); goto retry; } + BT_COMP_LOGD("Returning from advancing live stream iterator: status=%s, " + "stream-name=\"%s\", viewer-stream-id=%" PRIu64, + lttng_live_iterator_status_string(live_status), + stream_iter->name->str, stream_iter->viewer_stream_id); + return live_status; } +static +bool is_discarded_packet_or_event_message(const bt_message *msg) +{ + const enum bt_message_type msg_type = bt_message_get_type(msg); + + return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS || + msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS; +} + +static +enum lttng_live_iterator_status adjust_discarded_packets_message( + bt_self_message_iterator *iter, + const bt_stream *stream, + const bt_message *msg_in, bt_message **msg_out, + uint64_t new_begin_ts) +{ + enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_property_availability availability; + const bt_clock_snapshot *clock_snapshot; + uint64_t end_ts; + uint64_t count; + + clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in); + end_ts = bt_clock_snapshot_get_value(clock_snapshot); + + availability = bt_message_discarded_packets_get_count(msg_in, &count); + BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); + + *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots( + iter, stream, new_begin_ts, end_ts); + if (!*msg_out) { + status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; + goto end; + } + + bt_message_discarded_packets_set_count(*msg_out, count); +end: + return status; +} + +static +enum lttng_live_iterator_status adjust_discarded_events_message( + bt_self_message_iterator *iter, + const bt_stream *stream, + const bt_message *msg_in, bt_message **msg_out, + uint64_t new_begin_ts) +{ + enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_property_availability availability; + const bt_clock_snapshot *clock_snapshot; + uint64_t end_ts; + uint64_t count; + + clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in); + end_ts = bt_clock_snapshot_get_value(clock_snapshot); + + availability = bt_message_discarded_events_get_count(msg_in, &count); + BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); + + *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots( + iter, stream, new_begin_ts, end_ts); + if (!*msg_out) { + status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; + goto end; + } + + bt_message_discarded_events_set_count(*msg_out, count); +end: + return status; +} + +static +enum lttng_live_iterator_status handle_late_message( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_stream_iterator *stream_iter, + int64_t late_msg_ts_ns, const bt_message *late_msg) +{ + bt_self_component *self_comp = lttng_live_msg_iter->self_comp; + bt_logging_level log_level = lttng_live_msg_iter->log_level; + const bt_clock_class *clock_class; + const bt_stream_class *stream_class; + enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status; + int64_t last_inactivity_ts_ns; + enum lttng_live_iterator_status stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status adjust_status; + bt_message *adjusted_message; + + /* + * The timestamp of the current message is before the last message sent + * by this component. We CANNOT send it as is. + * + * The only expected scenario in which that could happen is the + * following, everything else is a bug in this component, relay deamon, + * or CTF parser. + * + * Expected scenario: The CTF message iterator emitted discarded + * packets and discarded events with synthesized beginning and end + * timestamps from the bounds of the last known packet and the newly + * decoded packet header. The CTF message iterator is not aware of + * stream inactivity beacons. Hence, we have to adjust the beginning + * timestamp of those types of messages if a stream signalled its + * inactivity up until _after_ the last known packet's beginning + * timestamp. + * + * Otherwise, the monotonicity guarantee of message timestamps would + * not be preserved. + * + * In short, the only scenario in which it's okay and fixable to + * received a late message is when: + * 1. the late message is a discarded packets or discarded events + * message, + * 2. this stream produced an inactivity message downstream, and + * 3. the timestamp of the late message is within the inactivity + * timespan we sent downstream through the inactivity message. + */ + + BT_COMP_LOGD("Handling late message on live stream iterator: " + "stream-name=\"%s\", viewer-stream-id=%" PRIu64, + stream_iter->name->str, stream_iter->viewer_stream_id); + + if (!stream_iter->last_inactivity_ts.is_set) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: " + "have a late message when no inactivity message " + "was ever sent for that stream."); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + if (!is_discarded_packet_or_event_message(late_msg)) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: " + "have a late message that is not a packet discarded or " + "event discarded message: late-msg-type=%s", + bt_common_message_type_string(bt_message_get_type(late_msg))); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + stream_class = bt_stream_borrow_class_const(stream_iter->stream); + clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class); + + ts_ns_status = bt_clock_class_cycles_to_ns_from_origin(clock_class, + stream_iter->last_inactivity_ts.value, &last_inactivity_ts_ns); + if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp,"Error converting last " + "inactivity message timestamp to nanoseconds"); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + if (last_inactivity_ts_ns <= late_msg_ts_ns) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Invalid live stream state: " + "have a late message that is none included in a stream " + "inactivity timespan: last-inactivity-ts-ns=%" PRIu64 + "late-msg-ts-ns=%" PRIu64, last_inactivity_ts_ns, late_msg_ts_ns); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + /* + * We now know that it's okay for this message to be late, we can now + * adjust its timestamp to ensure monotonicity. + */ + BT_COMP_LOGD("Adjusting the timestamp of late message: late-msg-type=%s, " + "msg-new-ts-ns=%" PRIu64, + bt_common_message_type_string(bt_message_get_type(late_msg)), + stream_iter->last_inactivity_ts.value); + switch (bt_message_get_type(late_msg)) { + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + adjust_status = adjust_discarded_events_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + late_msg, &adjusted_message, + stream_iter->last_inactivity_ts.value); + break; + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + adjust_status = adjust_discarded_packets_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + late_msg, &adjusted_message, + stream_iter->last_inactivity_ts.value); + break; + default: + bt_common_abort(); + } + + if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + stream_iter_status = adjust_status; + goto end; + } + + BT_ASSERT_DBG(adjusted_message); + stream_iter->current_msg = adjusted_message; + stream_iter->current_msg_ts_ns = last_inactivity_ts_ns; + bt_message_put_ref(late_msg); + +end: + return stream_iter_status; +} + static enum lttng_live_iterator_status next_stream_iterator_for_trace( struct lttng_live_msg_iter *lttng_live_msg_iter, @@ -1031,17 +1305,16 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( stream_iter_idx); /* - * Find if there is are now current message for this stream - * iterator get it. + * If there is no current message for this stream, go fetch + * one. */ while (!stream_iter->current_msg) { const bt_message *msg = NULL; int64_t curr_msg_ts_ns = INT64_MAX; + stream_iter_status = lttng_live_iterator_next_msg_on_stream( lttng_live_msg_iter, stream_iter, &msg); - BT_COMP_LOGD("live stream iterator returned status :%s", - lttng_live_iterator_status_string(stream_iter_status)); if (stream_iter_status == LTTNG_LIVE_ITERATOR_STATUS_END) { stream_iter_is_ended = true; break; @@ -1053,6 +1326,11 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( BT_ASSERT_DBG(msg); + BT_COMP_LOGD("Live stream iterator returned message: msg-type=%s, " + "stream-name=\"%s\", viewer-stream-id=%" PRIu64, + bt_common_message_type_string(bt_message_get_type(msg)), + stream_iter->name->str, stream_iter->viewer_stream_id); + /* * Get the timestamp in nanoseconds from origin of this * messsage. @@ -1063,28 +1341,35 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( /* * Check if the message of the current live stream - * iterator occured at the exact same time or after the + * iterator occurred at the exact same time or after the * last message returned by this component's message - * iterator. If not, we return an error. + * iterator. If not, we need to handle it with care. */ if (curr_msg_ts_ns >= lttng_live_msg_iter->last_msg_ts_ns) { stream_iter->current_msg = msg; stream_iter->current_msg_ts_ns = curr_msg_ts_ns; } else { /* - * We received a message in the past. To ensure - * monotonicity, we can't send it forward. + * We received a message from the past. This + * may be fixable but it can also be an error. */ - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Message's timestamp is less than " - "lttng-live's message iterator's last " - "returned timestamp: " - "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", " - "last-msg-ts=%" PRId64, - lttng_live_msg_iter, curr_msg_ts_ns, - lttng_live_msg_iter->last_msg_ts_ns); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + stream_iter_status = handle_late_message( + lttng_live_msg_iter, stream_iter, + curr_msg_ts_ns, msg); + if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Late message could not be handled correctly: " + "lttng-live-msg-iter-addr=%p, " + "stream-name=\"%s\", " + "curr-msg-ts=%" PRId64 + ", last-msg-ts=%" PRId64, + lttng_live_msg_iter, + stream_iter->name->str, + curr_msg_ts_ns, + lttng_live_msg_iter->last_msg_ts_ns); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } } } @@ -1281,12 +1566,12 @@ void put_messages(bt_message_array_const msgs, uint64_t count) } BT_HIDDEN -bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( +bt_message_iterator_class_next_method_status lttng_live_msg_iter_next( bt_self_message_iterator *self_msg_it, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_component_class_message_iterator_next_method_status status; + bt_message_iterator_class_next_method_status status; enum lttng_live_viewer_status viewer_status; struct lttng_live_msg_iter *lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_it); @@ -1310,7 +1595,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( * is to prevent other graph users from using this live * iterator in an messed up internal state. */ - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event."); goto end; @@ -1330,7 +1615,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( if (lttng_live_msg_iter->sessions->len == 0) { if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; goto end; } else { /* @@ -1341,11 +1626,11 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating LTTng live viewer session"); } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; } else { bt_common_abort(); } @@ -1496,20 +1781,20 @@ return_status: * doesn't support restarting after an interruption. */ if (*count > 0) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; } else { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; } break; case LTTNG_LIVE_ITERATOR_STATUS_END: - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; break; case LTTNG_LIVE_ITERATOR_STATUS_NOMEM: BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Memory error preparing the next batch of messages: " "live-iter-status=%s", lttng_live_iterator_status_string(stream_iter_status)); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR; break; case LTTNG_LIVE_ITERATOR_STATUS_ERROR: case LTTNG_LIVE_ITERATOR_STATUS_INVAL: @@ -1519,7 +1804,7 @@ return_status: "live-iter-status=%s", lttng_live_iterator_status_string(stream_iter_status)); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; /* Put all existing messages on error. */ put_messages(msgs, *count); break; @@ -1566,21 +1851,18 @@ end: } BT_HIDDEN -bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter_init( +bt_message_iterator_class_initialize_method_status lttng_live_msg_iter_init( bt_self_message_iterator *self_msg_it, bt_self_message_iterator_configuration *config, - bt_self_component_source *self_comp_src, bt_self_component_port_output *self_port) { - bt_component_class_message_iterator_initialize_method_status status; - bt_self_component *self_comp = - bt_self_component_source_as_self_component(self_comp_src); + bt_message_iterator_class_initialize_method_status status; struct lttng_live_component *lttng_live; struct lttng_live_msg_iter *lttng_live_msg_iter; enum lttng_live_viewer_status viewer_status; bt_logging_level log_level; - - BT_ASSERT(self_msg_it); + bt_self_component *self_comp = + bt_self_message_iterator_borrow_component(self_msg_it); lttng_live = bt_self_component_get_data(self_comp); log_level = lttng_live->log_level; @@ -1594,7 +1876,7 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it); if (!lttng_live_msg_iter) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create lttng_live_msg_iter"); goto error; @@ -1664,11 +1946,11 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter } bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; goto end; error: - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; lttng_live_msg_iter_destroy(lttng_live_msg_iter); end: return status;