X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Flttng-live.c;h=8516ffb1b2bbe071568e5d2e9f888fd3cfe6cf5e;hb=6b1463e62b1281c335123beb0b0894bdbff8f90c;hp=66e20b73e17d2bd67a76696e0a148854fc507ace;hpb=fe5ee4280c235005d2fa7eab2af14a352740d4eb;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index 66e20b73..8516ffb1 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -1001,6 +1001,77 @@ end: return live_status; } +static +bool is_discarded_packet_or_event_message(const bt_message *msg) +{ + const enum bt_message_type msg_type = bt_message_get_type(msg); + + return msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS || + msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS; +} + +static +enum lttng_live_iterator_status adjust_discarded_packets_message( + bt_self_message_iterator *iter, + const bt_stream *stream, + const bt_message *msg_in, bt_message **msg_out, + uint64_t new_begin_ts) +{ + enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_property_availability availability; + const bt_clock_snapshot *clock_snapshot; + uint64_t end_ts; + uint64_t count; + + clock_snapshot = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(msg_in); + end_ts = bt_clock_snapshot_get_value(clock_snapshot); + + availability = bt_message_discarded_packets_get_count(msg_in, &count); + BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); + + *msg_out = bt_message_discarded_packets_create_with_default_clock_snapshots( + iter, stream, new_begin_ts, end_ts); + if (!*msg_out) { + status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; + goto end; + } + + bt_message_discarded_packets_set_count(*msg_out, count); +end: + return status; +} + +static +enum lttng_live_iterator_status adjust_discarded_events_message( + bt_self_message_iterator *iter, + const bt_stream *stream, + const bt_message *msg_in, bt_message **msg_out, + uint64_t new_begin_ts) +{ + enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_property_availability availability; + const bt_clock_snapshot *clock_snapshot; + uint64_t end_ts; + uint64_t count; + + clock_snapshot = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(msg_in); + end_ts = bt_clock_snapshot_get_value(clock_snapshot); + + availability = bt_message_discarded_events_get_count(msg_in, &count); + BT_ASSERT_DBG(availability == BT_PROPERTY_AVAILABILITY_AVAILABLE); + + *msg_out = bt_message_discarded_events_create_with_default_clock_snapshots( + iter, stream, new_begin_ts, end_ts); + if (!*msg_out) { + status = LTTNG_LIVE_ITERATOR_STATUS_NOMEM; + goto end; + } + + bt_message_discarded_events_set_count(*msg_out, count); +end: + return status; +} + static enum lttng_live_iterator_status next_stream_iterator_for_trace( struct lttng_live_msg_iter *lttng_live_msg_iter, @@ -1073,20 +1144,92 @@ enum lttng_live_iterator_status next_stream_iterator_for_trace( stream_iter->current_msg = msg; stream_iter->current_msg_ts_ns = curr_msg_ts_ns; } else { - /* - * We received a message in the past. To ensure - * monotonicity, we can't send it forward. - */ - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Message's timestamp is less than " - "lttng-live's message iterator's last " - "returned timestamp: " - "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", " - "last-msg-ts=%" PRId64, - lttng_live_msg_iter, curr_msg_ts_ns, - lttng_live_msg_iter->last_msg_ts_ns); - stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - goto end; + const bt_clock_class *clock_class; + const bt_stream_class *stream_class; + enum bt_clock_class_cycles_to_ns_from_origin_status ts_ns_status; + int64_t last_inactivity_ts_ns; + + stream_class = bt_stream_borrow_class_const(stream_iter->stream); + clock_class = bt_stream_class_borrow_default_clock_class_const(stream_class); + + ts_ns_status = bt_clock_class_cycles_to_ns_from_origin( + clock_class, + stream_iter->last_inactivity_ts, + &last_inactivity_ts_ns); + if (ts_ns_status != BT_CLOCK_CLASS_CYCLES_TO_NS_FROM_ORIGIN_STATUS_OK) { + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + if (last_inactivity_ts_ns > curr_msg_ts_ns && + is_discarded_packet_or_event_message(msg)) { + /* + * The CTF message iterator emits Discarded + * Packets and Events with synthesized begin and + * end timestamps from the bounds of the last + * known packet and the newly decoded packet + * header. + * + * The CTF message iterator is not aware of + * stream inactivity beacons. Hence, we have + * to adjust the begin timestamp of those types + * of messages if a stream signaled its + * inactivity up until _after_ the last known + * packet's begin timestamp. + * + * Otherwise, the monotonicity guarantee would + * not be preserved. + */ + const enum bt_message_type msg_type = + bt_message_get_type(msg); + enum lttng_live_iterator_status adjust_status = + LTTNG_LIVE_ITERATOR_STATUS_OK; + bt_message *adjusted_message; + + switch (msg_type) { + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + adjust_status = adjust_discarded_events_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + msg, &adjusted_message, + stream_iter->last_inactivity_ts); + break; + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + adjust_status = adjust_discarded_packets_message( + lttng_live_msg_iter->self_msg_iter, + stream_iter->stream, + msg, &adjusted_message, + stream_iter->last_inactivity_ts); + break; + default: + bt_common_abort(); + } + + if (adjust_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + stream_iter_status = adjust_status; + goto end; + } + + BT_ASSERT_DBG(adjusted_message); + stream_iter->current_msg = adjusted_message; + stream_iter->current_msg_ts_ns = + last_inactivity_ts_ns; + } else { + /* + * We received a message in the past. To ensure + * monotonicity, we can't send it forward. + */ + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Message's timestamp is less than " + "lttng-live's message iterator's last " + "returned timestamp: " + "lttng-live-msg-iter-addr=%p, ts=%" PRId64 ", " + "last-msg-ts=%" PRId64, + lttng_live_msg_iter, curr_msg_ts_ns, + lttng_live_msg_iter->last_msg_ts_ns); + stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } } }