X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.c;h=8516ffb1b2bbe071568e5d2e9f888fd3cfe6cf5e;hb=6b1463e62b1281c335123beb0b0894bdbff8f90c;hp=b8f3c8c278c36c31cd232351147e40ee243e6816;hpb=55e57fc952297e772d08a4b71d2447341f54c82b;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index b8f3c8c2..8516ffb1 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -740,7 +740,7 @@ int live_get_msg_ts_ns(struct lttng_live_stream_iterator *stream_iter, msg); break; case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: - clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( + clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const( msg); break; default: @@ -835,8 +835,6 @@ enum lttng_live_iterator_status lttng_live_iterator_next_handle_one_active_data_ */ ret = LTTNG_LIVE_ITERATOR_STATUS_CONTINUE; break; - case CTF_MSG_ITER_STATUS_INVAL: - /* No argument provided by the user, so don't return INVAL. */ case CTF_MSG_ITER_STATUS_ERROR: default: ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR; @@ -875,6 +873,10 @@ enum lttng_live_iterator_status lttng_live_iterator_close_stream( "Error getting the next message from CTF message iterator"); live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; goto end; + } else if (status == CTF_MSG_ITER_STATUS_EOF) { + BT_COMP_LOGI("Reached the end of the live stream iterator."); + live_status = LTTNG_LIVE_ITERATOR_STATUS_END; + goto end; } BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK); @@ -999,6 +1001,77 @@ end: return live_status; } +static +bool is_discarded_packet_or_event_message(const bt_message *msg) +{ + const enum bt_message_type msg_type = bt_message_get_type(msg); + + return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS || + msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS; +} + +static +enum lttng_live_iterator_status adjust_discarded_packets_message( + bt_self_message_iterator *iter, + const bt_stream *stream, + const bt_message *msg_in, bt_message **msg_out, + uint64_t new_begin_ts) +{ + enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_property_availability availability; + const bt_clock_snapshot *clock_snapshot; + uint64_t end_ts; + uint64_t count; + + clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in); + end_ts = bt_clock_snapshot_get_value(clock_snapshot); + + availability = bt_message_discarded_packets_get_count(msg_in, &count); + BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); + + *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots( + iter, stream, new_begin_ts, end_ts); + if (!*msg_out) { + status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; + goto end; + } + + bt_message_discarded_packets_set_count(*msg_out, count); +end: + return status; +} + +static +enum lttng_live_iterator_status adjust_discarded_events_message( + bt_self_message_iterator *iter, + const bt_stream *stream, + const bt_message *msg_in, bt_message **msg_out, + uint64_t new_begin_ts) +{ + enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_property_availability availability; + const bt_clock_snapshot *clock_snapshot; + uint64_t end_ts; + uint64_t count; + + clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in); + end_ts = bt_clock_snapshot_get_value(clock_snapshot); + + availability = bt_message_discarded_events_get_count(msg_in, &count); + BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); + + *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots( + iter, stream, new_begin_ts, end_ts); + if (!*msg_out) { + status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; + goto end; + } + + bt_message_discarded_events_set_count(*msg_out, count); +end: + return status; +} + static enum lttng_live_iterator_status next_stream_iterator_for_trace( struct lttng_live_msg_iter *lttng_live_msg_iter, @@ -1063,7 +1136,7 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( /* * Check if the message of the current live stream - * iterator occured at the exact same time or after the + * iterator occurred at the exact same time or after the * last message returned by this component's message * iterator. If not, we return an error. */ @@ -1071,20 +1144,92 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( stream_iter->current_msg = msg; stream_iter->current_msg_ts_ns = curr_msg_ts_ns; } else { - /* - * We received a message in the past. To ensure - * monotonicity, we can't send it forward. - */ - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Message's timestamp is less than " - "lttng-live's message iterator's last " - "returned timestamp: " - "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", " - "last-msg-ts=%" PRId64, - lttng_live_msg_iter, curr_msg_ts_ns, - lttng_live_msg_iter->last_msg_ts_ns); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + const bt_clock_class *clock_class; + const bt_stream_class *stream_class; + enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status; + int64_t last_inactivity_ts_ns; + + stream_class = bt_stream_borrow_class_const(stream_iter->stream); + clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class); + + ts_ns_status = bt_clock_class_cycles_to_ns_from_origin( + clock_class, + stream_iter->last_inactivity_ts, + &last_inactivity_ts_ns); + if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) { + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + if (last_inactivity_ts_ns > curr_msg_ts_ns && + is_discarded_packet_or_event_message(msg)) { + /* + * The CTF message iterator emits Discarded + * Packets and Events with synthesized begin and + * end timestamps from the bounds of the last + * known packet and the newly decoded packet + * header. + * + * The CTF message iterator is not aware of + * stream inactivity beacons. Hence, we have + * to adjust the begin timestamp of those types + * of messages if a stream signaled its + * inactivity up until _after_ the last known + * packet's begin timestamp. + * + * Otherwise, the monotonicity guarantee would + * not be preserved. + */ + const enum bt_message_type msg_type = + bt_message_get_type(msg); + enum lttng_live_iterator_status adjust_status = + LTTNG_LIVE_ITERATOR_STATUS_OK; + bt_message *adjusted_message; + + switch (msg_type) { + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + adjust_status = adjust_discarded_events_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + msg, &adjusted_message, + stream_iter->last_inactivity_ts); + break; + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + adjust_status = adjust_discarded_packets_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + msg, &adjusted_message, + stream_iter->last_inactivity_ts); + break; + default: + bt_common_abort(); + } + + if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + stream_iter_status = adjust_status; + goto end; + } + + BT_ASSERT_DBG(adjusted_message); + stream_iter->current_msg = adjusted_message; + stream_iter->current_msg_ts_ns = + last_inactivity_ts_ns; + } else { + /* + * We received a message in the past. To ensure + * monotonicity, we can't send it forward. + */ + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Message's timestamp is less than " + "lttng-live's message iterator's last " + "returned timestamp: " + "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", " + "last-msg-ts=%" PRId64, + lttng_live_msg_iter, curr_msg_ts_ns, + lttng_live_msg_iter->last_msg_ts_ns); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } } } @@ -1281,12 +1426,12 @@ void put_messages(bt_message_array_const msgs, uint64_t count) } BT_HIDDEN -bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( +bt_message_iterator_class_next_method_status lttng_live_msg_iter_next( bt_self_message_iterator *self_msg_it, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_component_class_message_iterator_next_method_status status; + bt_message_iterator_class_next_method_status status; enum lttng_live_viewer_status viewer_status; struct lttng_live_msg_iter *lttng_live_msg_iter = bt_self_message_iterator_get_data(self_msg_it); @@ -1310,7 +1455,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( * is to prevent other graph users from using this live * iterator in an messed up internal state. */ - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event."); goto end; @@ -1330,7 +1475,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( if (lttng_live_msg_iter->sessions->len == 0) { if (lttng_live->params.sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; goto end; } else { /* @@ -1341,11 +1486,11 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next( viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating LTTng live viewer session"); } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; } else { bt_common_abort(); } @@ -1496,20 +1641,20 @@ return_status: * doesn't support restarting after an interruption. */ if (*count > 0) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK; } else { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN; } break; case LTTNG_LIVE_ITERATOR_STATUS_END: - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END; break; case LTTNG_LIVE_ITERATOR_STATUS_NOMEM: BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Memory error preparing the next batch of messages: " "live-iter-status=%s", lttng_live_iterator_status_string(stream_iter_status)); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR; break; case LTTNG_LIVE_ITERATOR_STATUS_ERROR: case LTTNG_LIVE_ITERATOR_STATUS_INVAL: @@ -1519,7 +1664,7 @@ return_status: "live-iter-status=%s", lttng_live_iterator_status_string(stream_iter_status)); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR; /* Put all existing messages on error. */ put_messages(msgs, *count); break; @@ -1566,21 +1711,18 @@ end: } BT_HIDDEN -bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter_init( +bt_message_iterator_class_initialize_method_status lttng_live_msg_iter_init( bt_self_message_iterator *self_msg_it, bt_self_message_iterator_configuration *config, - bt_self_component_source *self_comp_src, bt_self_component_port_output *self_port) { - bt_component_class_message_iterator_initialize_method_status status; - bt_self_component *self_comp = - bt_self_component_source_as_self_component(self_comp_src); + bt_message_iterator_class_initialize_method_status status; struct lttng_live_component *lttng_live; struct lttng_live_msg_iter *lttng_live_msg_iter; enum lttng_live_viewer_status viewer_status; bt_logging_level log_level; - - BT_ASSERT(self_msg_it); + bt_self_component *self_comp = + bt_self_message_iterator_borrow_component(self_msg_it); lttng_live = bt_self_component_get_data(self_comp); log_level = lttng_live->log_level; @@ -1594,7 +1736,7 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter lttng_live_msg_iter = lttng_live_msg_iter_create(lttng_live, self_msg_it); if (!lttng_live_msg_iter) { - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR; BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to create lttng_live_msg_iter"); goto error; @@ -1664,11 +1806,11 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter } bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK; goto end; error: - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR; + status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR; lttng_live_msg_iter_destroy(lttng_live_msg_iter); end: return status;