X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Futils%2Fmuxer%2Fmuxer.c;h=47d790b3d9a14e59387e7d611c984077680d608e;hb=6c20f4a0c496cfac67ceabe876edc796929ffb64;hp=63a0f601210ef7abdd7ed26031fdaf9610f1a7a9;hpb=6bf2abbd799ec2be5cb591176f07e7f7019a079b;p=babeltrace.git diff --git a/plugins/utils/muxer/muxer.c b/plugins/utils/muxer/muxer.c index 63a0f601..47d790b3 100644 --- a/plugins/utils/muxer/muxer.c +++ b/plugins/utils/muxer/muxer.c @@ -63,6 +63,7 @@ struct muxer_upstream_msg_iter { enum muxer_msg_iter_clock_class_expectation { MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0, + MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE, MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE, MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID, MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID, @@ -146,7 +147,7 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter( if (!muxer_upstream_msg_iter) { BT_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper."); - goto end; + goto error; } muxer_upstream_msg_iter->msg_iter = self_msg_iter; @@ -154,7 +155,7 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter( muxer_upstream_msg_iter->msgs = g_queue_new(); if (!muxer_upstream_msg_iter->msgs) { BT_LOGE_STR("Failed to allocate a GQueue."); - goto end; + goto error; } g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters, @@ -164,6 +165,12 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter( muxer_upstream_msg_iter, muxer_msg_iter, self_msg_iter); + goto end; + +error: + g_free(muxer_upstream_msg_iter); + muxer_upstream_msg_iter = NULL; + end: return muxer_upstream_msg_iter; } @@ -496,81 +503,58 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, const bt_message *msg, int64_t last_returned_ts_ns, int64_t *ts_ns) { - const bt_clock_class *clock_class = NULL; const bt_clock_snapshot *clock_snapshot = NULL; int ret = 0; - const unsigned char *cc_uuid; - const char *cc_name; bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN; bt_message_stream_activity_clock_snapshot_state sa_cs_state; BT_ASSERT(msg); BT_ASSERT(ts_ns); - BT_LOGV("Getting message's timestamp: " "muxer-msg-iter-addr=%p, msg-addr=%p, " "last-returned-ts=%" PRId64, muxer_msg_iter, msg, last_returned_ts_ns); + if (unlikely(muxer_msg_iter->clock_class_expectation == + MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) { + *ts_ns = last_returned_ts_ns; + goto end; + } + switch (bt_message_get_type(msg)) { case BT_MESSAGE_TYPE_EVENT: - clock_class = - bt_message_event_borrow_stream_class_default_clock_class_const( - msg); - if (!clock_class) { - goto no_clock_snapshot; - } - + BT_ASSERT(bt_message_event_borrow_stream_class_default_clock_class_const( + msg)); cs_state = bt_message_event_borrow_default_clock_snapshot_const( msg, &clock_snapshot); break; case BT_MESSAGE_TYPE_PACKET_BEGINNING: - bt_message_packet_beginning_borrow_stream_class_default_clock_class_const( - msg); - if (!clock_class) { - goto no_clock_snapshot; - } - + BT_ASSERT(bt_message_packet_beginning_borrow_stream_class_default_clock_class_const( + msg)); cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const( msg, &clock_snapshot); break; case BT_MESSAGE_TYPE_PACKET_END: - bt_message_packet_end_borrow_stream_class_default_clock_class_const( - msg); - if (!clock_class) { - goto no_clock_snapshot; - } - + BT_ASSERT(bt_message_packet_end_borrow_stream_class_default_clock_class_const( + msg)); cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const( msg, &clock_snapshot); break; case BT_MESSAGE_TYPE_DISCARDED_EVENTS: - bt_message_discarded_events_borrow_stream_class_default_clock_class_const( - msg); - if (!clock_class) { - goto no_clock_snapshot; - } - + BT_ASSERT(bt_message_discarded_events_borrow_stream_class_default_clock_class_const( + msg)); cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const( msg, &clock_snapshot); break; case BT_MESSAGE_TYPE_DISCARDED_PACKETS: - bt_message_discarded_packets_borrow_stream_class_default_clock_class_const( - msg); - if (!clock_class) { - goto no_clock_snapshot; - } - + BT_ASSERT(bt_message_discarded_packets_borrow_stream_class_default_clock_class_const( + msg)); cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const( msg, &clock_snapshot); break; case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: - bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const( - msg); - if (!clock_class) { - goto no_clock_snapshot; - } - + BT_ASSERT(bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const( + msg)); sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const( msg, &clock_snapshot); if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { @@ -579,12 +563,8 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, break; case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: - bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const( - msg); - if (!clock_class) { - goto no_clock_snapshot; - } - + BT_ASSERT(bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const( + msg)); sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const( msg, &clock_snapshot); if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { @@ -604,13 +584,46 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, goto end; } - if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) { - BT_LOGE_STR("Unsupported unknown clock snapshot."); - ret = -1; - goto end; + BT_ASSERT(cs_state == BT_CLOCK_SNAPSHOT_STATE_KNOWN); + ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); + if (ret) { + BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " + "clock-snapshot-addr=%p", clock_snapshot); + goto error; } - clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot); + goto end; + +no_clock_snapshot: + BT_LOGV_STR("Message's default clock snapshot is missing: " + "using the last returned timestamp."); + *ts_ns = last_returned_ts_ns; + goto end; + +error: + ret = -1; + +end: + if (ret == 0) { + BT_LOGV("Found message's timestamp: " + "muxer-msg-iter-addr=%p, msg-addr=%p, " + "last-returned-ts=%" PRId64 ", ts=%" PRId64, + muxer_msg_iter, msg, last_returned_ts_ns, + *ts_ns); + } + + return ret; +} + +static inline +int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter, + struct muxer_comp *muxer_comp, + const bt_clock_class *clock_class) +{ + int ret = 0; + const unsigned char *cc_uuid; + const char *cc_name; + BT_ASSERT(clock_class); cc_uuid = bt_clock_class_get_uuid(clock_class); cc_name = bt_clock_class_get_name(clock_class); @@ -754,6 +767,11 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, goto error; } break; + case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE: + BT_LOGE("Expecting no clock class, but got one: " + "clock-class-addr=%p, clock-class-name=\"%s\"", + clock_class, cc_name); + goto error; default: /* Unexpected */ BT_LOGF("Unexpected clock class expectation: " @@ -763,33 +781,56 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp, } } - ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns); - if (ret) { - BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: " - "clock-snapshot-addr=%p", clock_snapshot); - goto error; - } - - goto end; - -no_clock_snapshot: - BT_LOGV_STR("Message's default clock snapshot is missing: " - "using the last returned timestamp."); - *ts_ns = last_returned_ts_ns; goto end; error: ret = -1; end: - if (ret == 0) { - BT_LOGV("Found message's timestamp: " - "muxer-msg-iter-addr=%p, msg-addr=%p, " - "last-returned-ts=%" PRId64 ", ts=%" PRId64, - muxer_msg_iter, msg, last_returned_ts_ns, - *ts_ns); + return ret; +} + +static inline +int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter, + struct muxer_comp *muxer_comp, const bt_stream *stream) +{ + int ret = 0; + const bt_stream_class *stream_class = + bt_stream_borrow_class_const(stream); + const bt_clock_class *clock_class = + bt_stream_class_borrow_default_clock_class_const(stream_class); + + if (!clock_class) { + if (muxer_msg_iter->clock_class_expectation == + MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) { + /* Expect no clock class */ + muxer_msg_iter->clock_class_expectation = + MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE; + } else { + BT_LOGE("Expecting stream class with a default clock class: " + "stream-class-addr=%p, stream-class-name=\"%s\", " + "stream-class-id=%" PRIu64, + stream_class, bt_stream_class_get_name(stream_class), + bt_stream_class_get_id(stream_class)); + ret = -1; + } + + goto end; + } + + if (!bt_stream_class_default_clock_is_always_known(stream_class)) { + BT_LOGE("Stream's default clock is not always known: " + "stream-class-addr=%p, stream-class-name=\"%s\", " + "stream-class-id=%" PRIu64, + stream_class, bt_stream_class_get_name(stream_class), + bt_stream_class_get_id(stream_class)); + ret = -1; + goto end; } + ret = validate_clock_class(muxer_msg_iter, muxer_comp, clock_class); + +end: return ret; } @@ -848,6 +889,47 @@ muxer_msg_iter_youngest_upstream_msg_iter( BT_ASSERT(cur_muxer_upstream_msg_iter->msgs->length > 0); msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs); BT_ASSERT(msg); + + if (unlikely(bt_message_get_type(msg) == + BT_MESSAGE_TYPE_STREAM_BEGINNING)) { + ret = validate_new_stream_clock_class( + muxer_msg_iter, muxer_comp, + bt_message_stream_beginning_borrow_stream_const( + msg)); + if (ret) { + /* + * validate_new_stream_clock_class() logs + * errors. + */ + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } + } else if (unlikely(bt_message_get_type(msg) == + BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) { + const bt_clock_snapshot *cs; + bt_clock_snapshot_state cs_state; + + cs_state = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const( + msg, &cs); + + if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) { + BT_LOGE("Message iterator inactivity message's " + "default clock snapshot is unknown: " + "msg-addr=%p", + msg); + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } + + ret = validate_clock_class(muxer_msg_iter, muxer_comp, + bt_clock_snapshot_borrow_clock_class_const(cs)); + if (ret) { + /* validate_clock_class() logs errors */ + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } + } + ret = get_msg_ts_ns(muxer_comp, muxer_msg_iter, msg, muxer_msg_iter->last_returned_ts_ns, &msg_ts_ns); if (ret) { @@ -1386,7 +1468,7 @@ bt_self_message_iterator_status muxer_msg_iter_seek_beginning( { struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); - int status; + bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK; uint64_t i; /* Seek all ended upstream iterators first */ @@ -1437,5 +1519,5 @@ bt_self_message_iterator_status muxer_msg_iter_seek_beginning( MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY; end: - return status; + return (bt_self_message_iterator_status) status; }