lib: rename include dir to babeltrace2
[babeltrace.git] / plugins / utils / muxer / muxer.c
index 63a0f601210ef7abdd7ed26031fdaf9610f1a7a9..626c437bd2faf8e9e26419a27a4c2c1c47d82622 100644 (file)
 #define BT_LOG_TAG "PLUGIN-UTILS-MUXER-FLT"
 #include "logging.h"
 
-#include <babeltrace/babeltrace-internal.h>
-#include <babeltrace/compat/uuid-internal.h>
-#include <babeltrace/babeltrace.h>
-#include <babeltrace/value-internal.h>
-#include <babeltrace/graph/component-internal.h>
-#include <babeltrace/graph/message-iterator-internal.h>
-#include <babeltrace/graph/connection-internal.h>
+#include <babeltrace2/babeltrace-internal.h>
+#include <babeltrace2/compat/uuid-internal.h>
+#include <babeltrace2/babeltrace.h>
+#include <babeltrace2/value-internal.h>
+#include <babeltrace2/graph/component-internal.h>
+#include <babeltrace2/graph/message-iterator-internal.h>
+#include <babeltrace2/graph/connection-internal.h>
 #include <plugins-common.h>
 #include <glib.h>
 #include <stdbool.h>
 #include <inttypes.h>
-#include <babeltrace/assert-internal.h>
-#include <babeltrace/common-internal.h>
+#include <babeltrace2/assert-internal.h>
+#include <babeltrace2/common-internal.h>
 #include <stdlib.h>
 #include <string.h>
 
@@ -63,6 +63,7 @@ struct muxer_upstream_msg_iter {
 
 enum muxer_msg_iter_clock_class_expectation {
        MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,
+       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE,
        MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE,
        MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID,
        MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID,
@@ -137,16 +138,16 @@ void destroy_muxer_upstream_msg_iter(
 }
 
 static
-struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter(
-               struct muxer_msg_iter *muxer_msg_iter,
+int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter,
                bt_self_component_port_input_message_iterator *self_msg_iter)
 {
+       int ret = 0;
        struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
                g_new0(struct muxer_upstream_msg_iter, 1);
 
        if (!muxer_upstream_msg_iter) {
                BT_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper.");
-               goto end;
+               goto error;
        }
 
        muxer_upstream_msg_iter->msg_iter = self_msg_iter;
@@ -154,7 +155,7 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter(
        muxer_upstream_msg_iter->msgs = g_queue_new();
        if (!muxer_upstream_msg_iter->msgs) {
                BT_LOGE_STR("Failed to allocate a GQueue.");
-               goto end;
+               goto error;
        }
 
        g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
@@ -164,8 +165,14 @@ struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter(
                muxer_upstream_msg_iter, muxer_msg_iter,
                self_msg_iter);
 
+       goto end;
+
+error:
+       g_free(muxer_upstream_msg_iter);
+       ret = -1;
+
 end:
-       return muxer_upstream_msg_iter;
+       return ret;
 }
 
 static
@@ -496,81 +503,95 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
                const bt_message *msg, int64_t last_returned_ts_ns,
                int64_t *ts_ns)
 {
-       const bt_clock_class *clock_class = NULL;
        const bt_clock_snapshot *clock_snapshot = NULL;
        int ret = 0;
-       const unsigned char *cc_uuid;
-       const char *cc_name;
-       bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
        bt_message_stream_activity_clock_snapshot_state sa_cs_state;
+       const bt_stream_class *stream_class = NULL;
+       bt_message_type msg_type;
 
        BT_ASSERT(msg);
        BT_ASSERT(ts_ns);
-
        BT_LOGV("Getting message's timestamp: "
                "muxer-msg-iter-addr=%p, msg-addr=%p, "
                "last-returned-ts=%" PRId64,
                muxer_msg_iter, msg, last_returned_ts_ns);
 
-       switch (bt_message_get_type(msg)) {
-       case BT_MESSAGE_TYPE_EVENT:
-               clock_class =
-                       bt_message_event_borrow_stream_class_default_clock_class_const(
-                               msg);
-               if (!clock_class) {
-                       goto no_clock_snapshot;
-               }
+       if (unlikely(muxer_msg_iter->clock_class_expectation ==
+                       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) {
+               *ts_ns = last_returned_ts_ns;
+               goto end;
+       }
 
-               cs_state = bt_message_event_borrow_default_clock_snapshot_const(
-                       msg, &clock_snapshot);
+       msg_type = bt_message_get_type(msg);
+
+       if (unlikely(msg_type == BT_MESSAGE_TYPE_PACKET_BEGINNING)) {
+               stream_class = bt_stream_borrow_class_const(
+                       bt_packet_borrow_stream_const(
+                               bt_message_packet_beginning_borrow_packet_const(
+                                       msg)));
+       } else if (unlikely(msg_type == BT_MESSAGE_TYPE_PACKET_END)) {
+               stream_class = bt_stream_borrow_class_const(
+                       bt_packet_borrow_stream_const(
+                               bt_message_packet_end_borrow_packet_const(
+                                       msg)));
+       } else if (unlikely(msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS)) {
+               stream_class = bt_stream_borrow_class_const(
+                       bt_message_discarded_events_borrow_stream_const(msg));
+       } else if (unlikely(msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS)) {
+               stream_class = bt_stream_borrow_class_const(
+                       bt_message_discarded_packets_borrow_stream_const(msg));
+       }
+
+       switch (msg_type) {
+       case BT_MESSAGE_TYPE_EVENT:
+               BT_ASSERT(bt_message_event_borrow_stream_class_default_clock_class_const(
+                               msg));
+               clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
+                       msg);
                break;
        case BT_MESSAGE_TYPE_PACKET_BEGINNING:
-               bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
-                       msg);
-               if (!clock_class) {
+               if (bt_stream_class_packets_have_beginning_default_clock_snapshot(
+                               stream_class)) {
+                       clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
+                               msg);
+               } else {
                        goto no_clock_snapshot;
                }
 
-               cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
-                       msg, &clock_snapshot);
                break;
        case BT_MESSAGE_TYPE_PACKET_END:
-               bt_message_packet_end_borrow_stream_class_default_clock_class_const(
-                       msg);
-               if (!clock_class) {
+               if (bt_stream_class_packets_have_end_default_clock_snapshot(
+                               stream_class)) {
+                       clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
+                               msg);
+               } else {
                        goto no_clock_snapshot;
                }
 
-               cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const(
-                       msg, &clock_snapshot);
                break;
        case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
-               bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
-                       msg);
-               if (!clock_class) {
+               if (bt_stream_class_discarded_events_have_default_clock_snapshots(
+                               stream_class)) {
+                       clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
+                               msg);
+               } else {
                        goto no_clock_snapshot;
                }
 
-               cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
-                       msg, &clock_snapshot);
                break;
        case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
-               bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
-                       msg);
-               if (!clock_class) {
+               if (bt_stream_class_discarded_packets_have_default_clock_snapshots(
+                               stream_class)) {
+                       clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
+                               msg);
+               } else {
                        goto no_clock_snapshot;
                }
 
-               cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
-                       msg, &clock_snapshot);
                break;
        case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
-               bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
-                       msg);
-               if (!clock_class) {
-                       goto no_clock_snapshot;
-               }
-
+               BT_ASSERT(bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
+                               msg));
                sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
                        msg, &clock_snapshot);
                if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
@@ -579,12 +600,8 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
 
                break;
        case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
-               bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
-                       msg);
-               if (!clock_class) {
-                       goto no_clock_snapshot;
-               }
-
+               BT_ASSERT(bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
+                               msg));
                sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
                        msg, &clock_snapshot);
                if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
@@ -593,9 +610,8 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
 
                break;
        case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
-               cs_state =
-                       bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
-                               msg, &clock_snapshot);
+               clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
+                       msg);
                break;
        default:
                /* All the other messages have a higher priority */
@@ -604,13 +620,45 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
                goto end;
        }
 
-       if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) {
-               BT_LOGE_STR("Unsupported unknown clock snapshot.");
-               ret = -1;
-               goto end;
+       ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
+       if (ret) {
+               BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
+                       "clock-snapshot-addr=%p", clock_snapshot);
+               goto error;
        }
 
-       clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
+       goto end;
+
+no_clock_snapshot:
+       BT_LOGV_STR("Message's default clock snapshot is missing: "
+               "using the last returned timestamp.");
+       *ts_ns = last_returned_ts_ns;
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       if (ret == 0) {
+               BT_LOGV("Found message's timestamp: "
+                       "muxer-msg-iter-addr=%p, msg-addr=%p, "
+                       "last-returned-ts=%" PRId64 ", ts=%" PRId64,
+                       muxer_msg_iter, msg, last_returned_ts_ns,
+                       *ts_ns);
+       }
+
+       return ret;
+}
+
+static inline
+int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter,
+               struct muxer_comp *muxer_comp,
+               const bt_clock_class *clock_class)
+{
+       int ret = 0;
+       const unsigned char *cc_uuid;
+       const char *cc_name;
+
        BT_ASSERT(clock_class);
        cc_uuid = bt_clock_class_get_uuid(clock_class);
        cc_name = bt_clock_class_get_name(clock_class);
@@ -754,6 +802,11 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
                                goto error;
                        }
                        break;
+               case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE:
+                       BT_LOGE("Expecting no clock class, but got one: "
+                               "clock-class-addr=%p, clock-class-name=\"%s\"",
+                               clock_class, cc_name);
+                       goto error;
                default:
                        /* Unexpected */
                        BT_LOGF("Unexpected clock class expectation: "
@@ -763,33 +816,46 @@ int get_msg_ts_ns(struct muxer_comp *muxer_comp,
                }
        }
 
-       ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
-       if (ret) {
-               BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
-                       "clock-snapshot-addr=%p", clock_snapshot);
-               goto error;
-       }
-
-       goto end;
-
-no_clock_snapshot:
-       BT_LOGV_STR("Message's default clock snapshot is missing: "
-               "using the last returned timestamp.");
-       *ts_ns = last_returned_ts_ns;
        goto end;
 
 error:
        ret = -1;
 
 end:
-       if (ret == 0) {
-               BT_LOGV("Found message's timestamp: "
-                       "muxer-msg-iter-addr=%p, msg-addr=%p, "
-                       "last-returned-ts=%" PRId64 ", ts=%" PRId64,
-                       muxer_msg_iter, msg, last_returned_ts_ns,
-                       *ts_ns);
+       return ret;
+}
+
+static inline
+int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter,
+               struct muxer_comp *muxer_comp, const bt_stream *stream)
+{
+       int ret = 0;
+       const bt_stream_class *stream_class =
+               bt_stream_borrow_class_const(stream);
+       const bt_clock_class *clock_class =
+               bt_stream_class_borrow_default_clock_class_const(stream_class);
+
+       if (!clock_class) {
+               if (muxer_msg_iter->clock_class_expectation ==
+                       MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
+                       /* Expect no clock class */
+                       muxer_msg_iter->clock_class_expectation =
+                               MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE;
+               } else {
+                       BT_LOGE("Expecting stream class with a default clock class: "
+                               "stream-class-addr=%p, stream-class-name=\"%s\", "
+                               "stream-class-id=%" PRIu64,
+                               stream_class, bt_stream_class_get_name(stream_class),
+                               bt_stream_class_get_id(stream_class));
+                       ret = -1;
+               }
+
+               goto end;
        }
 
+       ret = validate_clock_class(muxer_msg_iter, muxer_comp, clock_class);
+
+end:
        return ret;
 }
 
@@ -848,6 +914,36 @@ muxer_msg_iter_youngest_upstream_msg_iter(
                BT_ASSERT(cur_muxer_upstream_msg_iter->msgs->length > 0);
                msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs);
                BT_ASSERT(msg);
+
+               if (unlikely(bt_message_get_type(msg) ==
+                               BT_MESSAGE_TYPE_STREAM_BEGINNING)) {
+                       ret = validate_new_stream_clock_class(
+                               muxer_msg_iter, muxer_comp,
+                               bt_message_stream_beginning_borrow_stream_const(
+                                       msg));
+                       if (ret) {
+                               /*
+                                * validate_new_stream_clock_class() logs
+                                * errors.
+                                */
+                               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+                               goto end;
+                       }
+               } else if (unlikely(bt_message_get_type(msg) ==
+                               BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) {
+                       const bt_clock_snapshot *cs;
+
+                       cs = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
+                               msg);
+                       ret = validate_clock_class(muxer_msg_iter, muxer_comp,
+                               bt_clock_snapshot_borrow_clock_class_const(cs));
+                       if (ret) {
+                               /* validate_clock_class() logs errors */
+                               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+                               goto end;
+                       }
+               }
+
                ret = get_msg_ts_ns(muxer_comp, muxer_msg_iter, msg,
                        muxer_msg_iter->last_returned_ts_ns, &msg_ts_ns);
                if (ret) {
@@ -1124,7 +1220,6 @@ int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp,
 
        for (i = 0; i < count; i++) {
                bt_self_component_port_input_message_iterator *upstream_msg_iter;
-               struct muxer_upstream_msg_iter *muxer_upstream_msg_iter;
                bt_self_component_port_input *self_port =
                        bt_self_component_filter_borrow_input_port_by_index(
                                muxer_comp->self_comp, i);
@@ -1149,14 +1244,12 @@ int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp,
                        goto end;
                }
 
-               muxer_upstream_msg_iter =
-                       muxer_msg_iter_add_upstream_msg_iter(
-                               muxer_msg_iter, upstream_msg_iter);
+               ret = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter,
+                       upstream_msg_iter);
                bt_self_component_port_input_message_iterator_put_ref(
                        upstream_msg_iter);
-               if (!muxer_upstream_msg_iter) {
+               if (ret) {
                        /* muxer_msg_iter_add_upstream_msg_iter() logs errors */
-                       ret = -1;
                        goto end;
                }
        }
@@ -1386,7 +1479,7 @@ bt_self_message_iterator_status muxer_msg_iter_seek_beginning(
 {
        struct muxer_msg_iter *muxer_msg_iter =
                bt_self_message_iterator_get_data(self_msg_iter);
-       int status;
+       bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK;
        uint64_t i;
 
        /* Seek all ended upstream iterators first */
@@ -1437,5 +1530,5 @@ bt_self_message_iterator_status muxer_msg_iter_seek_beginning(
                MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY;
 
 end:
-       return status;
+       return (bt_self_message_iterator_status) status;
 }
This page took 0.028808 seconds and 4 git commands to generate.