Notification iterator: generate automatic notifications when missing
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Wed, 26 Apr 2017 00:15:09 +0000 (20:15 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Sun, 28 May 2017 16:57:41 +0000 (12:57 -0400)
With this patch, a notification iterator object makes sure to always
provide a valid sequence of notifications to its user (a component which
has access to the private connection). "Valid sequence of notifications"
means no "stream end" without a corresponding "stream begin", no "packet
end" without a corresponding "packet begin", event notifications are
always surrounded by "packet begin" and "packet end" notifications, and
packet notifications are always surrounded by stream notifications.

To accomplish this, each iterator has its own queue of notifications.
When it detects that one or more notifications are missing when it calls
the upstream's "next" method, it creates the missing notifications and
adds them to the queue before the upstream notification.

Upstream can still provide the "stream begin", "stream end", "packet
begin", and "packet end" notifications in the correct order. In this
case, the iterator does not generate any automatic notification.

When the upstream's "next" method returns
BT_NOTIFICATION_ITERATOR_STATUS_END, the iterator adds any missing
notifications to its queue ("packet end" and "stream end").

To know what is the current packet of a given stream, and if a given
stream exists or not from an iterator's point of view, the iterator
keeps a hash table of streams to stream states. When the iterator
creates a stream state, it gets a reference to the stream, because the
stream must exist until its "stream end" notification (which is possibly
generated by the iterator). However, when the "stream end" notification
occurs, the iterator puts the stream reference and adds to it a destroy
listener which is reponsible for removing the stream state entry when
the stream is eventually destroyed. The iterator keeps the stream state
even if the "stream end" notification was emitted because it also
validates that, for a given stream during the iterator's lifetime, only
one "stream begin" and one "stream end" notifications are emitted.

This patch adds another API constraint: within a given component, a
given stream can only be referenced by notifications that are emitted
one a given port. The first port which emits a notification which has a
reference to a stream should remain the same for the stream's lifetime.
This is enforced by keeping a hash table of component to port in each
stream. This hash table indicates which port, within a given component,
has the right to emit a notification which references this stream. This
is validated by each iterator. For each component in this hash table, a
stream adds a component destroy listener to get notified when it should
remove an entry from the hash table.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
24 files changed:
include/babeltrace/ctf-ir/packet-internal.h
include/babeltrace/ctf-ir/stream-internal.h
include/babeltrace/ctf-ir/trace-internal.h
include/babeltrace/ctf-ir/trace.h
include/babeltrace/graph/component-class-internal.h
include/babeltrace/graph/component-internal.h
include/babeltrace/graph/notification-event-internal.h
include/babeltrace/graph/notification-iterator-internal.h
include/babeltrace/graph/notification-iterator.h
include/babeltrace/graph/notification-packet-internal.h
include/babeltrace/graph/notification-packet.h
include/babeltrace/graph/notification-stream-internal.h
include/babeltrace/graph/notification-stream.h
include/babeltrace/graph/notification.h
include/babeltrace/plugin/plugin-so-internal.h
lib/ctf-ir/stream.c
lib/graph/component-class.c
lib/graph/component.c
lib/graph/connection.c
lib/graph/iterator.c
lib/graph/notification/stream.c
lib/plugin/plugin-so.c
lib/plugin/plugin.c
tests/plugins/test-utils-muxer.c

index 275524ba9bc2de47a6278f2024c05feed96b66f7..609717f6ada3c049dbe5e02e36b4b340585c6895 100644 (file)
@@ -29,6 +29,7 @@
 #include <babeltrace/ctf-ir/stream.h>
 #include <babeltrace/object-internal.h>
 #include <babeltrace/babeltrace-internal.h>
+#include <assert.h>
 
 struct bt_ctf_packet {
        struct bt_object base;
@@ -41,4 +42,12 @@ struct bt_ctf_packet {
 BT_HIDDEN
 void bt_ctf_packet_freeze(struct bt_ctf_packet *packet);
 
+static inline
+struct bt_ctf_stream *bt_ctf_packet_borrow_stream(
+               struct bt_ctf_packet *packet)
+{
+       assert(packet);
+       return packet->stream;
+}
+
 #endif /* BABELTRACE_CTF_IR_PACKET_INTERNAL_H */
index c101236e275a96a4cbf9a268903b791b955e9136..a54cd57b49924b9c77eb54b3b0ff1df7d0652139 100644 (file)
 #include <babeltrace/babeltrace-internal.h>
 #include <glib.h>
 
+struct bt_port;
+struct bt_component;
+
+typedef void (*bt_ctf_stream_destroy_listener_func)(
+               struct bt_ctf_stream *stream, void *data);
+
+struct bt_ctf_stream_destroy_listener {
+       bt_ctf_stream_destroy_listener_func func;
+       void *data;
+};
+
 struct bt_ctf_stream {
        struct bt_object base;
        uint32_t id;
@@ -44,15 +55,55 @@ struct bt_ctf_stream {
        struct bt_ctf_field *packet_header;
        struct bt_ctf_field *packet_context;
 
+       /*
+        * When a notification which contains a reference to a stream
+        * object (event notification, for example) is returned by the
+        * "next" method of a sink or filter component's notification
+        * iterator, it must NOT be returned by the "next" method of a
+        * notification iterator which iterates on the notifications of
+        * another output port of the same component.
+        *
+        * To ensure this, the stream object keeps a hash table which
+        * indicates which port, for a given component, is currently
+        * allowed to emit notifications which contain a reference to
+        * this stream.
+        *
+        * This is a `struct bt_component *` to `struct bt_port *` hash
+        * table. Both pointers are weak references because there's no
+        * need to keep one or the other alive as far as this stream is
+        * concerned.
+        */
+       GHashTable *comp_cur_port;
+
        /* Writer-specific members. */
        /* Array of pointers to bt_ctf_event for the current packet */
        GPtrArray *events;
        struct bt_ctf_stream_pos pos;
        unsigned int flushed_packet_count;
        uint64_t size;
+
+       /* Array of struct bt_ctf_stream_destroy_listener */
+       GArray *destroy_listeners;
 };
 
 BT_HIDDEN
 int bt_ctf_stream_set_fd(struct bt_ctf_stream *stream, int fd);
 
+BT_HIDDEN
+void bt_ctf_stream_map_component_to_port(struct bt_ctf_stream *stream,
+               struct bt_component *comp,
+               struct bt_port *port);
+
+BT_HIDDEN
+struct bt_port *bt_ctf_stream_port_for_component(struct bt_ctf_stream *stream,
+               struct bt_component *comp);
+
+BT_HIDDEN
+void bt_ctf_stream_add_destroy_listener(struct bt_ctf_stream *stream,
+               bt_ctf_stream_destroy_listener_func func, void *data);
+
+BT_HIDDEN
+void bt_ctf_stream_remove_destroy_listener(struct bt_ctf_stream *stream,
+               bt_ctf_stream_destroy_listener_func func, void *data);
+
 #endif /* BABELTRACE_CTF_WRITER_STREAM_INTERNAL_H */
index d332a581049d9cf798af161c3597ca4be5a60a8d..eb3e0e27f26f139f4f4c96ac15a24f824b80c190 100644 (file)
@@ -89,4 +89,35 @@ BT_HIDDEN
 bool bt_ctf_trace_has_clock_class(struct bt_ctf_trace *trace,
                struct bt_ctf_clock_class *clock_class);
 
+/**
+@brief User function type to use with bt_ctf_trace_add_listener().
+
+@param[in] obj New CTF IR object which is part of the trace
+               class hierarchy.
+@param[in] data        User data.
+
+@prenotnull{obj}
+*/
+typedef void (*bt_ctf_listener_cb)(struct bt_ctf_object *obj, void *data);
+
+/**
+@brief Adds the trace class modification listener \p listener to
+       the CTF IR trace class \p trace_class.
+
+Once you add \p listener to \p trace_class, whenever \p trace_class
+is modified, \p listener is called with the new element and with
+\p data (user data).
+
+@param[in] trace_class Trace class to which to add \p listener.
+@param[in] listener    Modification listener function.
+@param[in] data                User data.
+@returns               0 on success, or a negative value on error.
+
+@prenotnull{trace_class}
+@prenotnull{listener}
+@postrefcountsame{trace_class}
+*/
+extern int bt_ctf_trace_add_listener(struct bt_ctf_trace *trace_class,
+               bt_ctf_listener_cb listener, void *data);
+
 #endif /* BABELTRACE_CTF_IR_TRACE_INTERNAL_H */
index af8b9fccf1359cba439ee8bbbb861a153f32c875..91b09f80eb409be76244dd7c4ff55a333344c194 100644 (file)
@@ -725,37 +725,6 @@ extern int bt_ctf_trace_add_stream_class(struct bt_ctf_trace *trace_class,
 @{
 */
 
-/**
-@brief User function type to use with bt_ctf_trace_add_listener().
-
-@param[in] obj New CTF IR object which is part of the trace
-               class hierarchy.
-@param[in] data        User data.
-
-@prenotnull{obj}
-*/
-typedef void (*bt_ctf_listener_cb)(struct bt_ctf_object *obj, void *data);
-
-/**
-@brief Adds the trace class modification listener \p listener to
-       the CTF IR trace class \p trace_class.
-
-Once you add \p listener to \p trace_class, whenever \p trace_class
-is modified, \p listener is called with the new element and with
-\p data (user data).
-
-@param[in] trace_class Trace class to which to add \p listener.
-@param[in] listener    Modification listener function.
-@param[in] data                User data.
-@returns               0 on success, or a negative value on error.
-
-@prenotnull{trace_class}
-@prenotnull{listener}
-@postrefcountsame{trace_class}
-*/
-extern int bt_ctf_trace_add_listener(struct bt_ctf_trace *trace_class,
-               bt_ctf_listener_cb listener, void *data);
-
 /**
 @brief Accepts the visitor \p visitor to visit the hierarchy of the
        CTF IR trace class \p trace_class.
index 364299c63bccad896e7f5c68ee096998ac560a04..5f84dcd4edfe26de2bdeac332876c4ae3ec962a8 100644 (file)
@@ -96,7 +96,7 @@ struct bt_component_class_filter {
 };
 
 BT_HIDDEN
-int bt_component_class_add_destroy_listener(struct bt_component_class *class,
+void bt_component_class_add_destroy_listener(struct bt_component_class *class,
                bt_component_class_destroy_listener_func func, void *data);
 
 #endif /* BABELTRACE_COMPONENT_COMPONENT_CLASS_INTERNAL_H */
index 1e7b547aef911b56b6f5b9076e8c6bb750737522..27b03b09595520b5e19598b0dffcec0473735b8c 100644 (file)
 #define DEFAULT_INPUT_PORT_NAME                "default"
 #define DEFAULT_OUTPUT_PORT_NAME       "default"
 
+typedef void (*bt_component_destroy_listener_func)(
+               struct bt_component *class, void *data);
+
+struct bt_component_destroy_listener {
+       bt_component_destroy_listener_func func;
+       void *data;
+};
+
 struct bt_component {
        struct bt_object base;
        struct bt_component_class *class;
@@ -61,6 +69,9 @@ struct bt_component {
        /* Input and output ports (weak references) */
        GPtrArray *input_ports;
        GPtrArray *output_ports;
+
+       /* Array of struct bt_component_destroy_listener */
+       GArray *destroy_listeners;
 };
 
 static inline
@@ -128,4 +139,12 @@ BT_HIDDEN
 enum bt_component_status bt_component_remove_port(
                struct bt_component *component, struct bt_port *port);
 
+BT_HIDDEN
+void bt_component_add_destroy_listener(struct bt_component *component,
+               bt_component_destroy_listener_func func, void *data);
+
+BT_HIDDEN
+void bt_component_remove_destroy_listener(struct bt_component *component,
+               bt_component_destroy_listener_func func, void *data);
+
 #endif /* BABELTRACE_COMPONENT_COMPONENT_INTERNAL_H */
index 8fffd47ee9b7c1af57b4d42f7439d212a3870d4e..de4a6a1260f996a90bec561ad1b08ce84dffe17f 100644 (file)
@@ -27,6 +27,7 @@
  * SOFTWARE.
  */
 
+#include <babeltrace/compiler-internal.h>
 #include <babeltrace/ctf-ir/event.h>
 #include <babeltrace/graph/notification-internal.h>
 #include <babeltrace/graph/clock-class-priority-map.h>
@@ -41,6 +42,30 @@ struct bt_notification_event {
        struct bt_clock_class_priority_map *cc_prio_map;
 };
 
+static inline
+struct bt_ctf_event *bt_notification_event_borrow_event(
+               struct bt_notification *notif)
+{
+       struct bt_notification_event *notif_event = container_of(notif,
+                       struct bt_notification_event, parent);
+
+       assert(notif_event);
+       return notif_event->event;
+}
+
+static inline
+struct bt_clock_class_priority_map *
+bt_notification_event_borrow_clock_class_priority_map(
+               struct bt_notification *notif)
+{
+       struct bt_notification_event *notif_event = container_of(notif,
+                       struct bt_notification_event, parent);
+
+       assert(notif_event);
+       return notif_event->cc_prio_map;
+}
+
+
 #ifdef __cplusplus
 }
 #endif
index d7025669ba285f6fcd8be17414fec103e9f803ab..dc4e430928609b37df95d1a8d750a59bb9110db1 100644 (file)
@@ -5,8 +5,7 @@
  * BabelTrace - Notification Iterator Internal
  *
  * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
- *
- * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
 #include <babeltrace/graph/notification-iterator.h>
 #include <babeltrace/graph/private-notification-iterator.h>
 
+struct bt_port;
+
 struct bt_notification_iterator {
        struct bt_object base;
-       struct bt_component *component;
-       struct bt_notification *current_notification;
+       struct bt_component *upstream_component; /* owned by this */
+       struct bt_port *upstream_port; /* owned by this */
+       struct bt_notification *current_notification; /* owned by this */
+       GQueue *queue; /* struct bt_notification * (owned by this) */
+
+       /*
+        * This hash table keeps the state of a stream as viewed by
+        * this notification iterator. This is used to:
+        *
+        * * Automatically enqueue "stream begin", "packet begin",
+        *   "packet end", and "stream end" notifications depending
+        *   on the stream's state and on the next notification returned
+        *   by the upstream component.
+        *
+        * * Make sure that, once the notification iterator has seen
+        *   a "stream end" notification for a given stream, that no
+        *   other notifications which refer to this stream can be
+        *   delivered by this iterator.
+        *
+        * The key (struct bt_ctf_stream *) is not owned by this. The
+        * value is an allocated state structure.
+        */
+       GHashTable *stream_states;
+
+       /*
+        * This is an array of actions which can be rolled back. It's
+        * similar to the memento pattern, but it's not exactly that. It
+        * is allocated once and reset for each notification to process.
+        * More details near the implementation.
+        */
+       GArray *actions;
+
+       bool is_ended;
        void *user_data;
 };
 
@@ -64,7 +96,8 @@ bt_private_notification_iterator_from_notification_iterator(
  */
 BT_HIDDEN
 struct bt_notification_iterator *bt_notification_iterator_create(
-               struct bt_component *component);
+               struct bt_component *upstream_component,
+               struct bt_port *upstream_port);
 
 /**
  * Validate a notification iterator.
index d8554316794a9608e1244bdbbef4fa68940f9ad5..03b53ce8ee51e0d802fc3a3dfff738bdc044f66e 100644 (file)
@@ -54,7 +54,6 @@ enum bt_notification_iterator_status {
        BT_NOTIFICATION_ITERATOR_STATUS_NOMEM = -12,
        /** Unsupported iterator feature. */
        BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED = -2,
-
 };
 
 /**
index 8d525c55f673d6b3dbfdf163853d164c1a5354ae..4d519544f289a1642a7c80cade6f12809042f026 100644 (file)
@@ -27,6 +27,7 @@
  * SOFTWARE.
  */
 
+#include <babeltrace/compiler-internal.h>
 #include <babeltrace/ctf-ir/packet.h>
 #include <babeltrace/graph/notification-internal.h>
 
@@ -40,4 +41,28 @@ struct bt_notification_packet_end {
        struct bt_ctf_packet *packet;
 };
 
+static inline
+struct bt_ctf_packet *bt_notification_packet_begin_borrow_packet(
+               struct bt_notification *notif)
+{
+       struct bt_notification_packet_begin *notif_packet_begin =
+               container_of(notif,
+                       struct bt_notification_packet_begin, parent);
+
+       assert(notif_packet_begin);
+       return notif_packet_begin->packet;
+}
+
+static inline
+struct bt_ctf_packet *bt_notification_packet_end_borrow_packet(
+               struct bt_notification *notif)
+{
+       struct bt_notification_packet_end *notif_packet_end =
+               container_of(notif,
+                       struct bt_notification_packet_end, parent);
+
+       assert(notif_packet_end);
+       return notif_packet_end->packet;
+}
+
 #endif /* BABELTRACE_COMPONENT_NOTIFICATION_PACKET_INTERNAL_H */
index a15a96924457edb1ce131c90d03ead3e1aeb54e7..94480e6027f720a412144611db44df2b0c37032a 100644 (file)
@@ -35,16 +35,18 @@ extern "C" {
 
 struct bt_ctf_packet;
 
-/*** BT_NOTIFICATION_TYPE_PACKET_BEGIN ***/
-struct bt_notification *bt_notification_packet_begin_create(
+extern struct bt_notification *bt_notification_packet_begin_create(
+               struct bt_ctf_packet *packet);
+
+extern struct bt_notification *bt_notification_packet_end_create(
                struct bt_ctf_packet *packet);
-struct bt_ctf_packet *bt_notification_packet_begin_get_packet(
+
+/*** BT_NOTIFICATION_TYPE_PACKET_BEGIN ***/
+extern struct bt_ctf_packet *bt_notification_packet_begin_get_packet(
                struct bt_notification *notification);
 
 /*** BT_NOTIFICATION_TYPE_PACKET_END ***/
-struct bt_notification *bt_notification_packet_end_create(
-               struct bt_ctf_packet *packet);
-struct bt_ctf_packet *bt_notification_packet_end_get_packet(
+extern struct bt_ctf_packet *bt_notification_packet_end_get_packet(
                struct bt_notification *notification);
 
 #ifdef __cplusplus
index 839c220ccaa445773150d33b20d61697bb53e7c6..b170e4e61375d106de77cc95b99538faa3af1f3c 100644 (file)
  * SOFTWARE.
  */
 
+#include <babeltrace/compiler-internal.h>
 #include <babeltrace/ctf-ir/packet.h>
 #include <babeltrace/graph/notification-internal.h>
 
+struct bt_notification_stream_begin {
+       struct bt_notification parent;
+       struct bt_ctf_stream *stream;
+};
+
 struct bt_notification_stream_end {
        struct bt_notification parent;
        struct bt_ctf_stream *stream;
 };
 
+static inline
+struct bt_ctf_stream *bt_notification_stream_begin_borrow_stream(
+               struct bt_notification *notif)
+{
+       struct bt_notification_stream_begin *notif_stream_begin =
+               container_of(notif,
+                       struct bt_notification_stream_begin, parent);
+
+       assert(notif_stream_begin);
+       return notif_stream_begin->stream;
+}
+
+static inline
+struct bt_ctf_stream *bt_notification_stream_end_borrow_stream(
+               struct bt_notification *notif)
+{
+       struct bt_notification_stream_end *notif_stream_end =
+               container_of(notif,
+                       struct bt_notification_stream_end, parent);
+
+       assert(notif_stream_end);
+       return notif_stream_end->stream;
+}
+
 #endif /* BABELTRACE_COMPONENT_NOTIFICATION_STREAM_INTERNAL_H */
index bdcf4c96011157d9530c6bce81f9b1c439a51861..67ed5f5b43a49a0e01d03d7e0338776287bbb00e 100644 (file)
 extern "C" {
 #endif
 
-/*** BT_NOTIFICATION_TYPE_STREAM_END ***/
-struct bt_notification *bt_notification_stream_end_create(
+extern struct bt_notification *bt_notification_stream_begin_create(
                struct bt_ctf_stream *stream);
-struct bt_ctf_stream *bt_notification_stream_end_get_stream(
+
+extern struct bt_notification *bt_notification_stream_end_create(
+               struct bt_ctf_stream *stream);
+
+extern struct bt_ctf_stream *bt_notification_stream_begin_get_stream(
+               struct bt_notification *notification);
+
+extern struct bt_ctf_stream *bt_notification_stream_end_get_stream(
                struct bt_notification *notification);
 
 #ifdef __cplusplus
index c768baaef813513ce419fa468f5f0eb5ac6e1dc5..ae5e22f95617bd0d85d1fc6a418eafb8dc368946 100644 (file)
@@ -37,40 +37,15 @@ struct bt_notification;
  * Notification types. Unhandled notification types should be ignored.
  */
 enum bt_notification_type {
-       BT_NOTIFICATION_TYPE_UNKNOWN = -1,
-
-       /**
-        * All types of notifications (used to register to notification
-        * delivery).
-        */
-       BT_NOTIFICATION_TYPE_ALL = 0,
-
-       /** Event delivery notification, see event.h */
-       BT_NOTIFICATION_TYPE_EVENT = 1,
-
-       /** Beginning of stream packet notification, see packet.h */
-       BT_NOTIFICATION_TYPE_PACKET_BEGIN = 2,
-
-       /** End of stream packet notification, see packet.h */
-       BT_NOTIFICATION_TYPE_PACKET_END = 3,
-
-       /** End of stream packet notification, see stream.h */
-       BT_NOTIFICATION_TYPE_STREAM_END = 4,
-
-       /** New trace notification, see model.h */
-       BT_NOTIFICATION_TYPE_NEW_TRACE = 5,
-
-       /** New stream class notification, see model.h */
-       BT_NOTIFICATION_TYPE_NEW_STREAM_CLASS = 6,
-
-       /** New event class notification, see model.h */
-       BT_NOTIFICATION_TYPE_NEW_EVENT_CLASS = 7,
-
-       /** End of trace notification, see eot.h */
-       BT_NOTIFICATION_TYPE_END_OF_TRACE = 8,
-
-       BT_NOTIFICATION_TYPE_INACTIVITY = 9,
-
+       BT_NOTIFICATION_TYPE_SENTINEL =         -2,
+       BT_NOTIFICATION_TYPE_UNKNOWN =          -1,
+       BT_NOTIFICATION_TYPE_ALL =              0,
+       BT_NOTIFICATION_TYPE_EVENT =            1,
+       BT_NOTIFICATION_TYPE_INACTIVITY =       2,
+       BT_NOTIFICATION_TYPE_STREAM_BEGIN =     3,
+       BT_NOTIFICATION_TYPE_STREAM_END =       4,
+       BT_NOTIFICATION_TYPE_PACKET_BEGIN =     5,
+       BT_NOTIFICATION_TYPE_PACKET_END =       6,
        BT_NOTIFICATION_TYPE_NR, /* Not part of ABI. */
 };
 
index e2dd51c014102ae22c309e367eaa532d2590c841..c9d59f5f80420b6123de36e4b6e1d46fb03af5af 100644 (file)
@@ -61,7 +61,7 @@ BT_HIDDEN
 struct bt_plugin **bt_plugin_so_create_all_from_static(void);
 
 BT_HIDDEN
-int bt_plugin_so_on_add_component_class(struct bt_plugin *plugin,
+void bt_plugin_so_on_add_component_class(struct bt_plugin *plugin,
                struct bt_component_class *comp_class);
 
 #endif /* BABELTRACE_PLUGIN_PLUGIN_SO_INTERNAL_H */
index 7a4d53cee2d3e7dba4a55e2922f6f2f7dc53c86a..949b11e9512f66889a90fe02d609017e8cfdd07d 100644 (file)
@@ -38,6 +38,7 @@
 #include <babeltrace/ctf-ir/stream-class-internal.h>
 #include <babeltrace/ctf-ir/trace-internal.h>
 #include <babeltrace/ctf-writer/writer-internal.h>
+#include <babeltrace/graph/component-internal.h>
 #include <babeltrace/ref.h>
 #include <babeltrace/ctf-writer/functor-internal.h>
 #include <babeltrace/compiler-internal.h>
@@ -340,6 +341,14 @@ end:
        return ret;
 }
 
+static
+void component_destroy_listener(struct bt_component *component, void *data)
+{
+       struct bt_ctf_stream *stream = data;
+
+       g_hash_table_remove(stream->comp_cur_port, component);
+}
+
 struct bt_ctf_stream *bt_ctf_stream_create(
                struct bt_ctf_stream_class *stream_class,
                const char *name)
@@ -373,6 +382,12 @@ struct bt_ctf_stream *bt_ctf_stream_create(
        stream->stream_class = stream_class;
        stream->pos.fd = -1;
 
+       stream->destroy_listeners = g_array_new(FALSE, TRUE,
+               sizeof(struct bt_ctf_stream_destroy_listener));
+       if (!stream->destroy_listeners) {
+               goto error;
+       }
+
        if (name) {
                stream->name = g_string_new(name);
                if (!stream->name) {
@@ -448,6 +463,12 @@ struct bt_ctf_stream *bt_ctf_stream_create(
                if (ret) {
                        goto error;
                }
+
+               stream->comp_cur_port = g_hash_table_new(g_direct_hash,
+                       g_direct_equal);
+               if (!stream->comp_cur_port) {
+                       goto error;
+               }
        }
 
        /* Add this stream to the trace's streams */
@@ -1095,8 +1116,19 @@ static
 void bt_ctf_stream_destroy(struct bt_object *obj)
 {
        struct bt_ctf_stream *stream;
+       int i;
 
        stream = container_of(obj, struct bt_ctf_stream, base);
+
+       /* Call destroy listeners in reverse registration order */
+       for (i = stream->destroy_listeners->len - 1; i >= 0; i--) {
+               struct bt_ctf_stream_destroy_listener *listener =
+                       &g_array_index(stream->destroy_listeners,
+                               struct bt_ctf_stream_destroy_listener, i);
+
+               listener->func(stream, listener->data);
+       }
+
        (void) bt_ctf_stream_pos_fini(&stream->pos);
        if (stream->pos.fd >= 0) {
                int ret;
@@ -1126,6 +1158,32 @@ void bt_ctf_stream_destroy(struct bt_object *obj)
                g_string_free(stream->name, TRUE);
        }
 
+       if (stream->comp_cur_port) {
+               GHashTableIter ht_iter;
+               gpointer comp_gptr, port_gptr;
+
+               /*
+                * Since we're destroying the stream, remove the destroy
+                * listeners that it registered for each component in
+                * its component-port mapping hash table. Otherwise they
+                * would be called and the stream would be accessed once
+                * it's freed or another stream would be accessed.
+                */
+               g_hash_table_iter_init(&ht_iter, stream->comp_cur_port);
+
+               while (g_hash_table_iter_next(&ht_iter, &comp_gptr, &port_gptr)) {
+                       assert(comp_gptr);
+                       bt_component_remove_destroy_listener((void *) comp_gptr,
+                               component_destroy_listener, stream);
+               }
+
+               g_hash_table_destroy(stream->comp_cur_port);
+       }
+
+       if (stream->destroy_listeners) {
+               g_array_free(stream->destroy_listeners, TRUE);
+       }
+
        bt_put(stream->packet_header);
        bt_put(stream->packet_context);
        g_free(stream);
@@ -1228,3 +1286,69 @@ int bt_ctf_stream_is_writer(struct bt_ctf_stream *stream)
 end:
        return ret;
 }
+
+BT_HIDDEN
+void bt_ctf_stream_map_component_to_port(struct bt_ctf_stream *stream,
+               struct bt_component *comp,
+               struct bt_port *port)
+{
+       assert(stream);
+       assert(comp);
+       assert(port);
+       assert(stream->comp_cur_port);
+
+       /*
+        * Do not take a reference to the component here because we
+        * don't want the component to exist as long as this stream
+        * exists. Instead, keep a weak reference, but add a destroy
+        * listener so that we remove this hash table entry when we know
+        * the component is destroyed.
+        */
+       bt_component_add_destroy_listener(comp, component_destroy_listener,
+               stream);
+       g_hash_table_insert(stream->comp_cur_port, comp, port);
+}
+
+BT_HIDDEN
+struct bt_port *bt_ctf_stream_port_for_component(struct bt_ctf_stream *stream,
+               struct bt_component *comp)
+{
+       assert(stream);
+       assert(comp);
+       assert(stream->comp_cur_port);
+       return g_hash_table_lookup(stream->comp_cur_port, comp);
+}
+
+BT_HIDDEN
+void bt_ctf_stream_add_destroy_listener(struct bt_ctf_stream *stream,
+               bt_ctf_stream_destroy_listener_func func, void *data)
+{
+       struct bt_ctf_stream_destroy_listener listener;
+
+       assert(stream);
+       assert(func);
+       listener.func = func;
+       listener.data = data;
+       g_array_append_val(stream->destroy_listeners, listener);
+}
+
+BT_HIDDEN
+void bt_ctf_stream_remove_destroy_listener(struct bt_ctf_stream *stream,
+               bt_ctf_stream_destroy_listener_func func, void *data)
+{
+       size_t i;
+
+       assert(stream);
+       assert(func);
+
+       for (i = 0; i < stream->destroy_listeners->len; i++) {
+               struct bt_ctf_stream_destroy_listener *listener =
+                       &g_array_index(stream->destroy_listeners,
+                               struct bt_ctf_stream_destroy_listener, i);
+
+               if (listener->func == func && listener->data == data) {
+                       g_array_remove_index(stream->destroy_listeners, i);
+                       i--;
+               }
+       }
+}
index 28aa04c51cca340b548ab1cde507574c590a8ba2..b438f10430b4366141ddaa12c26c3fa74ec96303 100644 (file)
@@ -507,23 +507,16 @@ const char *bt_component_class_get_help(
 }
 
 BT_HIDDEN
-int bt_component_class_add_destroy_listener(struct bt_component_class *class,
+void bt_component_class_add_destroy_listener(struct bt_component_class *class,
                bt_component_class_destroy_listener_func func, void *data)
 {
-       int ret = 0;
        struct bt_component_class_destroy_listener listener;
 
-       if (!class || class->frozen || !func) {
-               ret = -1;
-               goto end;
-       }
-
+       assert(class);
+       assert(func);
        listener.func = func;
        listener.data = data;
        g_array_append_val(class->destroy_listeners, listener);
-
-end:
-       return ret;
 }
 
 int bt_component_class_freeze(
index 31abc0737c554574e18e5b09df5bd79ff0474132..8837c0d1fa50dad479c6a36cbbd17db8a6acd3c8 100644 (file)
@@ -70,12 +70,23 @@ void bt_component_destroy(struct bt_object *obj)
 {
        struct bt_component *component = NULL;
        struct bt_component_class *component_class = NULL;
+       int i;
 
        if (!obj) {
                return;
        }
 
        component = container_of(obj, struct bt_component, base);
+
+       /* Call destroy listeners in reverse registration order */
+       for (i = component->destroy_listeners->len - 1; i >= 0; i--) {
+               struct bt_component_destroy_listener *listener =
+                       &g_array_index(component->destroy_listeners,
+                               struct bt_component_destroy_listener, i);
+
+               listener->func(component, listener->data);
+       }
+
        component_class = component->class;
 
        /*
@@ -99,6 +110,10 @@ void bt_component_destroy(struct bt_object *obj)
                g_ptr_array_free(component->output_ports, TRUE);
        }
 
+       if (component->destroy_listeners) {
+               g_array_free(component->destroy_listeners, TRUE);
+       }
+
        g_string_free(component->name, TRUE);
        bt_put(component_class);
        g_free(component);
@@ -250,6 +265,13 @@ struct bt_component *bt_component_create_with_init_method_data(
                goto end;
        }
 
+       component->destroy_listeners = g_array_new(FALSE, TRUE,
+               sizeof(struct bt_component_destroy_listener));
+       if (!component->destroy_listeners) {
+               BT_PUT(component);
+               goto end;
+       }
+
        if (type == BT_COMPONENT_CLASS_TYPE_SOURCE ||
                        type == BT_COMPONENT_CLASS_TYPE_FILTER) {
                default_port = bt_component_add_port(component,
@@ -599,3 +621,37 @@ void bt_component_port_disconnected(struct bt_component *comp,
                        bt_private_port_from_port(port));
        }
 }
+
+BT_HIDDEN
+void bt_component_add_destroy_listener(struct bt_component *component,
+               bt_component_destroy_listener_func func, void *data)
+{
+       struct bt_component_destroy_listener listener;
+
+       assert(component);
+       assert(func);
+       listener.func = func;
+       listener.data = data;
+       g_array_append_val(component->destroy_listeners, listener);
+}
+
+BT_HIDDEN
+void bt_component_remove_destroy_listener(struct bt_component *component,
+               bt_component_destroy_listener_func func, void *data)
+{
+       size_t i;
+
+       assert(component);
+       assert(func);
+
+       for (i = 0; i < component->destroy_listeners->len; i++) {
+               struct bt_component_destroy_listener *listener =
+                       &g_array_index(component->destroy_listeners,
+                               struct bt_component_destroy_listener, i);
+
+               if (listener->func == func && listener->data == data) {
+                       g_array_remove_index(component->destroy_listeners, i);
+                       i--;
+               }
+       }
+}
index 986303ed82a78dba2d09ba16212cc9ccc89f7374..bfc9c04b8f0b0831fd03a5a6cc47648a6713afa0 100644 (file)
@@ -179,7 +179,8 @@ bt_private_connection_create_notification_iterator(
                goto error;
        }
 
-       iterator = bt_notification_iterator_create(upstream_component);
+       iterator = bt_notification_iterator_create(upstream_component,
+               upstream_port);
        if (!iterator) {
                goto error;
        }
index 491dddf38a32e5f30ecd05607af04c8ad40d13a6..389eec528c022d6c1ff32b7e1b7ccde3a07e0a3f 100644 (file)
@@ -4,8 +4,7 @@
  * Babeltrace Notification Iterator
  *
  * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
- *
- * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
 
 #include <babeltrace/compiler-internal.h>
 #include <babeltrace/ref.h>
+#include <babeltrace/ctf-ir/event-internal.h>
+#include <babeltrace/ctf-ir/packet-internal.h>
+#include <babeltrace/ctf-ir/stream-internal.h>
 #include <babeltrace/graph/component.h>
 #include <babeltrace/graph/component-source-internal.h>
 #include <babeltrace/graph/component-class-internal.h>
 #include <babeltrace/graph/notification-iterator.h>
 #include <babeltrace/graph/notification-iterator-internal.h>
 #include <babeltrace/graph/notification-internal.h>
+#include <babeltrace/graph/notification-event.h>
+#include <babeltrace/graph/notification-event-internal.h>
+#include <babeltrace/graph/notification-packet.h>
+#include <babeltrace/graph/notification-packet-internal.h>
+#include <babeltrace/graph/notification-stream.h>
+#include <babeltrace/graph/notification-stream-internal.h>
+#include <babeltrace/graph/port.h>
+
+struct stream_state {
+       struct bt_ctf_stream *stream; /* owned by this */
+       struct bt_ctf_packet *cur_packet; /* owned by this */
+       bool is_ended;
+};
+
+enum action_type {
+       ACTION_TYPE_PUSH_NOTIF,
+       ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM,
+       ACTION_TYPE_ADD_STREAM_STATE,
+       ACTION_TYPE_SET_STREAM_STATE_IS_ENDED,
+       ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET,
+};
+
+struct action {
+       enum action_type type;
+       union {
+               /* ACTION_TYPE_PUSH_NOTIF */
+               struct {
+                       struct bt_notification *notif; /* owned by this */
+               } push_notif;
+
+               /* ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM */
+               struct {
+                       struct bt_ctf_stream *stream; /* owned by this */
+                       struct bt_component *component; /* owned by this */
+                       struct bt_port *port; /* owned by this */
+               } map_port_to_comp_in_stream;
+
+               /* ACTION_TYPE_ADD_STREAM_STATE */
+               struct {
+                       struct bt_ctf_stream *stream; /* owned by this */
+                       struct stream_state *stream_state; /* owned by this */
+               } add_stream_state;
+
+               /* ACTION_TYPE_SET_STREAM_STATE_IS_ENDED */
+               struct {
+                       struct stream_state *stream_state; /* weak */
+               } set_stream_state_is_ended;
+
+               /* ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET */
+               struct {
+                       struct stream_state *stream_state; /* weak */
+                       struct bt_ctf_packet *packet; /* owned by this */
+               } set_stream_state_cur_packet;
+       } payload;
+};
+
+static
+void stream_destroy_listener(struct bt_ctf_stream *stream, void *data)
+{
+       struct bt_notification_iterator *iterator = data;
+
+       /* Remove associated stream state */
+       g_hash_table_remove(iterator->stream_states, stream);
+}
+
+static
+void destroy_stream_state(struct stream_state *stream_state)
+{
+       if (!stream_state) {
+               return;
+       }
+
+       bt_put(stream_state->cur_packet);
+       bt_put(stream_state->stream);
+       g_free(stream_state);
+}
+
+static
+void destroy_action(struct action *action)
+{
+       assert(action);
+
+       switch (action->type) {
+       case ACTION_TYPE_PUSH_NOTIF:
+               BT_PUT(action->payload.push_notif.notif);
+               break;
+       case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
+               BT_PUT(action->payload.map_port_to_comp_in_stream.stream);
+               BT_PUT(action->payload.map_port_to_comp_in_stream.component);
+               BT_PUT(action->payload.map_port_to_comp_in_stream.port);
+               break;
+       case ACTION_TYPE_ADD_STREAM_STATE:
+               BT_PUT(action->payload.add_stream_state.stream);
+               destroy_stream_state(
+                       action->payload.add_stream_state.stream_state);
+               action->payload.add_stream_state.stream_state = NULL;
+               break;
+       case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
+               BT_PUT(action->payload.set_stream_state_cur_packet.packet);
+               break;
+       case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
+               break;
+       default:
+               assert(false);
+       }
+}
+
+static
+void add_action(struct bt_notification_iterator *iterator,
+               struct action *action)
+{
+       g_array_append_val(iterator->actions, *action);
+}
+
+static
+void clear_actions(struct bt_notification_iterator *iterator)
+{
+       size_t i;
+
+       for (i = 0; i < iterator->actions->len; i++) {
+               struct action *action = &g_array_index(iterator->actions,
+                       struct action, i);
+
+               destroy_action(action);
+       }
+
+       g_array_set_size(iterator->actions, 0);
+}
+
+static
+void apply_actions(struct bt_notification_iterator *iterator)
+{
+       size_t i;
+
+       for (i = 0; i < iterator->actions->len; i++) {
+               struct action *action = &g_array_index(iterator->actions,
+                       struct action, i);
+
+               switch (action->type) {
+               case ACTION_TYPE_PUSH_NOTIF:
+                       /* Move notification to queue */
+                       g_queue_push_head(iterator->queue,
+                               action->payload.push_notif.notif);
+                       bt_notification_freeze(
+                               action->payload.push_notif.notif);
+                       action->payload.push_notif.notif = NULL;
+                       break;
+               case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM:
+                       bt_ctf_stream_map_component_to_port(
+                               action->payload.map_port_to_comp_in_stream.stream,
+                               action->payload.map_port_to_comp_in_stream.component,
+                               action->payload.map_port_to_comp_in_stream.port);
+                       break;
+               case ACTION_TYPE_ADD_STREAM_STATE:
+                       /* Move stream state to hash table */
+                       g_hash_table_insert(iterator->stream_states,
+                               action->payload.add_stream_state.stream,
+                               action->payload.add_stream_state.stream_state);
+
+                       action->payload.add_stream_state.stream_state = NULL;
+                       break;
+               case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED:
+                       /*
+                        * We know that this stream is ended. We need to
+                        * remember this as long as the stream exists to
+                        * enforce that the same stream does not end
+                        * twice.
+                        *
+                        * Here we add a destroy listener to the stream
+                        * which we put after (becomes weak as the hash
+                        * table key). If we were the last object to own
+                        * this stream, the destroy listener is called
+                        * when we call bt_put() which removes this
+                        * stream state completely. This is important
+                        * because the memory used by this stream object
+                        * could be reused for another stream, and they
+                        * must have different states.
+                        */
+                       bt_ctf_stream_add_destroy_listener(
+                               action->payload.set_stream_state_is_ended.stream_state->stream,
+                               stream_destroy_listener, iterator);
+                       action->payload.set_stream_state_is_ended.stream_state->is_ended = true;
+                       BT_PUT(action->payload.set_stream_state_is_ended.stream_state->stream);
+                       break;
+               case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET:
+                       /* Move packet to stream state's current packet */
+                       BT_MOVE(action->payload.set_stream_state_cur_packet.stream_state->cur_packet,
+                               action->payload.set_stream_state_cur_packet.packet);
+                       break;
+               default:
+                       assert(false);
+               }
+       }
+
+       clear_actions(iterator);
+}
+
+static
+struct stream_state *create_stream_state(struct bt_ctf_stream *stream)
+{
+       struct stream_state *stream_state = g_new0(struct stream_state, 1);
+
+       if (!stream_state) {
+               goto end;
+       }
+
+       /*
+        * We keep a reference to the stream until we know it's ended
+        * because we need to be able to create an automatic "stream
+        * end" notification when the user's "next" method returns
+        * BT_NOTIFICATION_ITERATOR_STATUS_END.
+        *
+        * We put this reference when the stream is marked as ended.
+        */
+       stream_state->stream = bt_get(stream);
+
+end:
+       return stream_state;
+}
 
 static
 void bt_notification_iterator_destroy(struct bt_object *obj)
@@ -44,8 +265,8 @@ void bt_notification_iterator_destroy(struct bt_object *obj)
        assert(obj);
        iterator = container_of(obj, struct bt_notification_iterator,
                        base);
-       assert(iterator->component);
-       comp_class = iterator->component->class;
+       assert(iterator->upstream_component);
+       comp_class = iterator->upstream_component->class;
 
        /* Call user-defined destroy method */
        switch (comp_class->type) {
@@ -78,38 +299,99 @@ void bt_notification_iterator_destroy(struct bt_object *obj)
                assert(0);
        }
 
-       BT_PUT(iterator->current_notification);
-       BT_PUT(iterator->component);
+       if (iterator->queue) {
+               struct bt_notification *notif;
+
+               while ((notif = g_queue_pop_tail(iterator->queue))) {
+                       bt_put(notif);
+               }
+
+               g_queue_free(iterator->queue);
+       }
+
+       if (iterator->stream_states) {
+               /*
+                * Remove our destroy listener from each stream which
+                * has a state in this iterator. Otherwise the destroy
+                * listener would be called with an invalid/other
+                * notification iterator object.
+                */
+               GHashTableIter ht_iter;
+               gpointer stream_gptr, stream_state_gptr;
+
+               g_hash_table_iter_init(&ht_iter, iterator->stream_states);
+
+               while (g_hash_table_iter_next(&ht_iter, &stream_gptr, &stream_state_gptr)) {
+                       assert(stream_gptr);
+                       bt_ctf_stream_remove_destroy_listener(
+                               (void *) stream_gptr, stream_destroy_listener,
+                               iterator);
+               }
+
+               g_hash_table_destroy(iterator->stream_states);
+       }
+
+       if (iterator->actions) {
+               g_array_free(iterator->actions, TRUE);
+       }
+
+       bt_put(iterator->current_notification);
+       bt_put(iterator->upstream_component);
+       bt_put(iterator->upstream_port);
        g_free(iterator);
 }
 
 BT_HIDDEN
 struct bt_notification_iterator *bt_notification_iterator_create(
-               struct bt_component *component)
+               struct bt_component *upstream_comp,
+               struct bt_port *upstream_port)
 {
        enum bt_component_class_type type;
        struct bt_notification_iterator *iterator = NULL;
 
-       if (!component) {
-               goto end;
-       }
+       assert(upstream_comp);
+       assert(upstream_port);
+       assert(bt_port_is_connected(upstream_port));
 
-       type = bt_component_get_class_type(component);
+       type = bt_component_get_class_type(upstream_comp);
        switch (type) {
        case BT_COMPONENT_CLASS_TYPE_SOURCE:
        case BT_COMPONENT_CLASS_TYPE_FILTER:
                break;
        default:
-               goto end;
+               goto error;
        }
 
        iterator = g_new0(struct bt_notification_iterator, 1);
        if (!iterator) {
-               goto end;
+               goto error;
        }
 
-       iterator->component = bt_get(component);
        bt_object_init(iterator, bt_notification_iterator_destroy);
+
+       iterator->stream_states = g_hash_table_new_full(g_direct_hash,
+               g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state);
+       if (!iterator->stream_states) {
+               goto error;
+       }
+
+       iterator->queue = g_queue_new();
+       if (!iterator->queue) {
+               goto error;
+       }
+
+       iterator->actions = g_array_new(FALSE, FALSE, sizeof(struct action));
+       if (!iterator->actions) {
+               goto error;
+       }
+
+       iterator->upstream_component = bt_get(upstream_comp);
+       iterator->upstream_port = bt_get(upstream_port);
+       goto end;
+
+error:
+       BT_PUT(iterator);
+
 end:
        return iterator;
 }
@@ -173,8 +455,660 @@ end:
        return notification;
 }
 
-enum bt_notification_iterator_status
-bt_notification_iterator_next(struct bt_notification_iterator *iterator)
+static
+bool validate_notification(struct bt_notification_iterator *iterator,
+               struct bt_notification *notif,
+               struct bt_ctf_stream *notif_stream,
+               struct bt_ctf_packet *notif_packet)
+{
+       bool is_valid = true;
+       struct stream_state *stream_state;
+       struct bt_port *stream_comp_cur_port;
+
+       assert(notif_stream);
+       stream_comp_cur_port =
+               bt_ctf_stream_port_for_component(notif_stream,
+                       iterator->upstream_component);
+       if (!stream_comp_cur_port) {
+               /*
+                * This is the first time this notification iterator
+                * bumps into this stream. Add an action to map the
+                * iterator's upstream component to the iterator's
+                * upstream port in this stream.
+                */
+               struct action action = {
+                       .type = ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM,
+                       .payload.map_port_to_comp_in_stream = {
+                               .stream = bt_get(notif_stream),
+                               .component = bt_get(iterator->upstream_component),
+                               .port = bt_get(iterator->upstream_port),
+                       },
+               };
+
+               add_action(iterator, &action);
+       } else {
+               if (stream_comp_cur_port != iterator->upstream_port) {
+                       /*
+                        * It looks like two different ports of the same
+                        * component are emitting notifications which
+                        * have references to the same stream. This is
+                        * bad: the API guarantees that it can never
+                        * happen.
+                        */
+                       is_valid = false;
+                       goto end;
+               }
+
+       }
+
+       stream_state = g_hash_table_lookup(iterator->stream_states,
+               notif_stream);
+       if (stream_state) {
+               if (stream_state->is_ended) {
+                       /*
+                        * There's a new notification which has a
+                        * reference to a stream which, from this
+                        * iterator's point of view, is ended ("end of
+                        * stream" notification was returned). This is
+                        * bad: the API guarantees that it can never
+                        * happen.
+                        */
+                       is_valid = false;
+                       goto end;
+               }
+
+               switch (notif->type) {
+               case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
+                       /*
+                        * We already have a stream state, which means
+                        * we already returned a "stream begin"
+                        * notification: this is an invalid duplicate.
+                        */
+                       is_valid = false;
+                       goto end;
+               case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
+                       if (notif_packet == stream_state->cur_packet) {
+                               /* Duplicate "packet begin" notification */
+                               is_valid = false;
+                               goto end;
+                       }
+                       break;
+               default:
+                       break;
+               }
+       }
+
+end:
+       return is_valid;
+}
+
+static
+void add_action_push_notif(struct bt_notification_iterator *iterator,
+               struct bt_notification *notif)
+{
+       struct action action = {
+               .type = ACTION_TYPE_PUSH_NOTIF,
+               .payload.push_notif = {
+                       .notif = bt_get(notif),
+               },
+       };
+
+       assert(notif);
+       add_action(iterator, &action);
+}
+
+static
+int add_action_push_notif_stream_begin(
+               struct bt_notification_iterator *iterator,
+               struct bt_ctf_stream *stream)
+{
+       int ret = 0;
+       struct bt_notification *stream_begin_notif = NULL;
+
+       assert(stream);
+       stream_begin_notif = bt_notification_stream_begin_create(stream);
+       if (!stream_begin_notif) {
+               goto error;
+       }
+
+       add_action_push_notif(iterator, stream_begin_notif);
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       bt_put(stream_begin_notif);
+       return ret;
+}
+
+static
+int add_action_push_notif_stream_end(
+               struct bt_notification_iterator *iterator,
+               struct bt_ctf_stream *stream)
+{
+       int ret = 0;
+       struct bt_notification *stream_end_notif = NULL;
+
+       assert(stream);
+       stream_end_notif = bt_notification_stream_end_create(stream);
+       if (!stream_end_notif) {
+               goto error;
+       }
+
+       add_action_push_notif(iterator, stream_end_notif);
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       bt_put(stream_end_notif);
+       return ret;
+}
+
+static
+int add_action_push_notif_packet_begin(
+               struct bt_notification_iterator *iterator,
+               struct bt_ctf_packet *packet)
+{
+       int ret = 0;
+       struct bt_notification *packet_begin_notif = NULL;
+
+       assert(packet);
+       packet_begin_notif = bt_notification_packet_begin_create(packet);
+       if (!packet_begin_notif) {
+               goto error;
+       }
+
+       add_action_push_notif(iterator, packet_begin_notif);
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       bt_put(packet_begin_notif);
+       return ret;
+}
+
+static
+int add_action_push_notif_packet_end(
+               struct bt_notification_iterator *iterator,
+               struct bt_ctf_packet *packet)
+{
+       int ret = 0;
+       struct bt_notification *packet_end_notif = NULL;
+
+       assert(packet);
+       packet_end_notif = bt_notification_packet_end_create(packet);
+       if (!packet_end_notif) {
+               goto error;
+       }
+
+       add_action_push_notif(iterator, packet_end_notif);
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       bt_put(packet_end_notif);
+       return ret;
+}
+
+static
+void add_action_set_stream_state_is_ended(
+               struct bt_notification_iterator *iterator,
+               struct stream_state *stream_state)
+{
+       struct action action = {
+               .type = ACTION_TYPE_SET_STREAM_STATE_IS_ENDED,
+               .payload.set_stream_state_is_ended = {
+                       .stream_state = stream_state,
+               },
+       };
+
+       assert(stream_state);
+       add_action(iterator, &action);
+}
+
+static
+void add_action_set_stream_state_cur_packet(
+               struct bt_notification_iterator *iterator,
+               struct stream_state *stream_state,
+               struct bt_ctf_packet *packet)
+{
+       struct action action = {
+               .type = ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET,
+               .payload.set_stream_state_cur_packet = {
+                       .stream_state = stream_state,
+                       .packet = bt_get(packet),
+               },
+       };
+
+       assert(stream_state);
+       add_action(iterator, &action);
+}
+
+static
+int ensure_stream_state_exists(struct bt_notification_iterator *iterator,
+               struct bt_notification *stream_begin_notif,
+               struct bt_ctf_stream *notif_stream,
+               struct stream_state **stream_state)
+{
+       int ret = 0;
+
+       if (!notif_stream) {
+               /*
+                * The notification does not reference any stream: no
+                * need to get or create a stream state.
+                */
+               goto end;
+       }
+
+       *stream_state = g_hash_table_lookup(iterator->stream_states,
+               notif_stream);
+       if (!*stream_state) {
+               /*
+                * This iterator did not bump into this stream yet:
+                * create a stream state and a "stream begin"
+                * notification.
+                */
+               struct action action = {
+                       .type = ACTION_TYPE_ADD_STREAM_STATE,
+                       .payload.add_stream_state = {
+                               .stream = bt_get(notif_stream),
+                               .stream_state = NULL,
+                       },
+               };
+
+               *stream_state = create_stream_state(notif_stream);
+               if (!stream_state) {
+                       goto error;
+               }
+
+               action.payload.add_stream_state.stream_state =
+                       *stream_state;
+               add_action(iterator, &action);
+
+               if (stream_begin_notif) {
+                       add_action_push_notif(iterator, stream_begin_notif);
+               } else {
+                       ret = add_action_push_notif_stream_begin(iterator,
+                               notif_stream);
+                       if (ret) {
+                               goto error;
+                       }
+               }
+       }
+
+       goto end;
+
+error:
+       destroy_stream_state(*stream_state);
+       ret = -1;
+
+end:
+       return ret;
+}
+
+static
+int handle_packet_switch(struct bt_notification_iterator *iterator,
+               struct bt_notification *packet_begin_notif,
+               struct bt_ctf_packet *new_packet,
+               struct stream_state *stream_state)
+{
+       int ret = 0;
+
+       if (stream_state->cur_packet == new_packet) {
+               goto end;
+       }
+
+       if (stream_state->cur_packet) {
+               /* End of the current packet */
+               ret = add_action_push_notif_packet_end(iterator,
+                       stream_state->cur_packet);
+               if (ret) {
+                       goto error;
+               }
+       }
+
+       /* Beginning of the new packet */
+       if (packet_begin_notif) {
+               add_action_push_notif(iterator, packet_begin_notif);
+       } else if (new_packet) {
+               ret = add_action_push_notif_packet_begin(iterator,
+                       new_packet);
+               if (ret) {
+                       goto error;
+               }
+       }
+
+       add_action_set_stream_state_cur_packet(iterator, stream_state,
+               new_packet);
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       return ret;
+}
+
+static
+int handle_notif_stream_begin(
+               struct bt_notification_iterator *iterator,
+               struct bt_notification *notif,
+               struct bt_ctf_stream *notif_stream)
+{
+       int ret = 0;
+       struct stream_state *stream_state;
+
+       assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_BEGIN);
+       assert(notif_stream);
+       ret = ensure_stream_state_exists(iterator, notif, notif_stream,
+               &stream_state);
+       if (ret) {
+               goto error;
+       }
+
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       return ret;
+}
+
+static
+int handle_notif_stream_end(
+               struct bt_notification_iterator *iterator,
+               struct bt_notification *notif,
+               struct bt_ctf_stream *notif_stream)
+{
+       int ret = 0;
+       struct stream_state *stream_state;
+
+       assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_END);
+       assert(notif_stream);
+       ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
+               &stream_state);
+       if (ret) {
+               goto error;
+       }
+
+       ret = handle_packet_switch(iterator, NULL, NULL, stream_state);
+       if (ret) {
+               goto error;
+       }
+
+       add_action_push_notif(iterator, notif);
+       add_action_set_stream_state_is_ended(iterator, stream_state);
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       return ret;
+}
+
+static
+int handle_notif_packet_begin(
+               struct bt_notification_iterator *iterator,
+               struct bt_notification *notif,
+               struct bt_ctf_stream *notif_stream,
+               struct bt_ctf_packet *notif_packet)
+{
+       int ret = 0;
+       struct stream_state *stream_state;
+
+       assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_BEGIN);
+       assert(notif_packet);
+       ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
+               &stream_state);
+       if (ret) {
+               goto error;
+       }
+
+       ret = handle_packet_switch(iterator, notif, notif_packet, stream_state);
+       if (ret) {
+               goto error;
+       }
+
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       return ret;
+}
+
+static
+int handle_notif_packet_end(
+               struct bt_notification_iterator *iterator,
+               struct bt_notification *notif,
+               struct bt_ctf_stream *notif_stream,
+               struct bt_ctf_packet *notif_packet)
+{
+       int ret = 0;
+       struct stream_state *stream_state;
+
+       assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_END);
+       assert(notif_packet);
+       ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
+               &stream_state);
+       if (ret) {
+               goto error;
+       }
+
+       ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state);
+       if (ret) {
+               goto error;
+       }
+
+       /* End of the current packet */
+       add_action_push_notif(iterator, notif);
+       add_action_set_stream_state_cur_packet(iterator, stream_state, NULL);
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       return ret;
+}
+
+static
+int handle_notif_event(
+               struct bt_notification_iterator *iterator,
+               struct bt_notification *notif,
+               struct bt_ctf_stream *notif_stream,
+               struct bt_ctf_packet *notif_packet)
+{
+       int ret = 0;
+       struct stream_state *stream_state;
+
+       assert(notif->type == BT_NOTIFICATION_TYPE_EVENT);
+       assert(notif_packet);
+       ret = ensure_stream_state_exists(iterator, NULL, notif_stream,
+               &stream_state);
+       if (ret) {
+               goto error;
+       }
+
+       ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state);
+       if (ret) {
+               goto error;
+       }
+
+       add_action_push_notif(iterator, notif);
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       return ret;
+}
+
+static
+int enqueue_notification_and_automatic(
+               struct bt_notification_iterator *iterator,
+               struct bt_notification *notif)
+{
+       int ret = 0;
+       struct bt_ctf_event *notif_event = NULL;
+       struct bt_ctf_stream *notif_stream = NULL;
+       struct bt_ctf_packet *notif_packet = NULL;
+
+       assert(notif);
+
+       /* Get the stream and packet referred by the notification */
+       switch (notif->type) {
+       case BT_NOTIFICATION_TYPE_EVENT:
+               notif_event = bt_notification_event_borrow_event(notif);
+               assert(notif_event);
+               notif_packet = bt_ctf_event_borrow_packet(notif_event);
+               assert(notif_packet);
+               break;
+       case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
+               notif_stream =
+                       bt_notification_stream_begin_borrow_stream(notif);
+               assert(notif_stream);
+               break;
+       case BT_NOTIFICATION_TYPE_STREAM_END:
+               notif_stream = bt_notification_stream_end_borrow_stream(notif);
+               assert(notif_stream);
+               break;
+       case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
+               notif_packet =
+                       bt_notification_packet_begin_borrow_packet(notif);
+               assert(notif_packet);
+               break;
+       case BT_NOTIFICATION_TYPE_PACKET_END:
+               notif_packet = bt_notification_packet_end_borrow_packet(notif);
+               assert(notif_packet);
+               break;
+       case BT_NOTIFICATION_TYPE_INACTIVITY:
+               /* Always valid */
+               break;
+       default:
+               /*
+                * Invalid type of notification. Only the notification
+                * types above are allowed to be returned by a user
+                * component.
+                */
+               goto error;
+       }
+
+       if (notif_packet) {
+               notif_stream = bt_ctf_packet_borrow_stream(notif_packet);
+               assert(notif_stream);
+       }
+
+       if (!notif_stream) {
+               /*
+                * The notification has no reference to a stream: it
+                * cannot cause the creation of automatic notifications.
+                */
+               goto end;
+       }
+
+       if (!validate_notification(iterator, notif, notif_stream,
+                       notif_packet)) {
+               goto error;
+       }
+
+       switch (notif->type) {
+       case BT_NOTIFICATION_TYPE_EVENT:
+               ret = handle_notif_event(iterator, notif, notif_stream,
+                       notif_packet);
+               break;
+       case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
+               ret = handle_notif_stream_begin(iterator, notif, notif_stream);
+               break;
+       case BT_NOTIFICATION_TYPE_STREAM_END:
+               ret = handle_notif_stream_end(iterator, notif, notif_stream);
+               break;
+       case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
+               ret = handle_notif_packet_begin(iterator, notif, notif_stream,
+                       notif_packet);
+               break;
+       case BT_NOTIFICATION_TYPE_PACKET_END:
+               ret = handle_notif_packet_end(iterator, notif, notif_stream,
+                       notif_packet);
+               break;
+       default:
+               break;
+       }
+
+       if (ret) {
+               goto error;
+       }
+
+       apply_actions(iterator);
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       return ret;
+}
+
+static
+int handle_end(struct bt_notification_iterator *iterator)
+{
+       GHashTableIter stream_state_iter;
+       gpointer stream_gptr, stream_state_gptr;
+       int ret = 0;
+
+       /*
+        * Emit a "stream end" notification for each non-ended stream
+        * known by this iterator and mark them as ended.
+        */
+       g_hash_table_iter_init(&stream_state_iter, iterator->stream_states);
+
+       while (g_hash_table_iter_next(&stream_state_iter, &stream_gptr,
+                       &stream_state_gptr)) {
+               struct stream_state *stream_state = stream_state_gptr;
+
+               assert(stream_state_gptr);
+
+               if (stream_state->is_ended) {
+                       continue;
+               }
+
+               ret = handle_packet_switch(iterator, NULL, NULL, stream_state);
+               if (ret) {
+                       goto error;
+               }
+
+               ret = add_action_push_notif_stream_end(iterator, stream_gptr);
+               if (ret) {
+                       goto error;
+               }
+
+               add_action_set_stream_state_is_ended(iterator, stream_state);
+       }
+
+       apply_actions(iterator);
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       return ret;
+}
+
+static
+enum bt_notification_iterator_status ensure_queue_has_notifications(
+               struct bt_notification_iterator *iterator)
 {
        struct bt_private_notification_iterator *priv_iterator =
                bt_private_notification_iterator_from_notification_iterator(iterator);
@@ -183,20 +1117,31 @@ bt_notification_iterator_next(struct bt_notification_iterator *iterator)
                .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
                .notification = NULL,
        };
+       enum bt_notification_iterator_status status =
+               BT_NOTIFICATION_ITERATOR_STATUS_OK;
+       int ret;
 
-       if (!iterator) {
-               next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+       assert(iterator);
+
+       if (iterator->queue->length > 0) {
+               /* We already have enough */
+               goto end;
+       }
+
+       if (iterator->is_ended) {
+               status = BT_NOTIFICATION_ITERATOR_STATUS_END;
                goto end;
        }
 
-       assert(iterator->component);
-       assert(iterator->component->class);
+       assert(iterator->upstream_component);
+       assert(iterator->upstream_component->class);
 
-       switch (iterator->component->class->type) {
+       /* Pick the appropriate "next" method */
+       switch (iterator->upstream_component->class->type) {
        case BT_COMPONENT_CLASS_TYPE_SOURCE:
        {
                struct bt_component_class_source *source_class =
-                       container_of(iterator->component->class,
+                       container_of(iterator->upstream_component->class,
                                struct bt_component_class_source, parent);
 
                assert(source_class->methods.iterator.next);
@@ -206,7 +1151,7 @@ bt_notification_iterator_next(struct bt_notification_iterator *iterator)
        case BT_COMPONENT_CLASS_TYPE_FILTER:
        {
                struct bt_component_class_filter *filter_class =
-                       container_of(iterator->component->class,
+                       container_of(iterator->upstream_component->class,
                                struct bt_component_class_filter, parent);
 
                assert(filter_class->methods.iterator.next);
@@ -218,28 +1163,99 @@ bt_notification_iterator_next(struct bt_notification_iterator *iterator)
                break;
        }
 
+       /*
+        * Call the user's "next" method to get the next notification
+        * and status, skipping the forwarded automatic notifications
+        * if any.
+        */
        assert(next_method);
        next_return = next_method(priv_iterator);
-       if (next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+       if (next_return.status < 0) {
+               status = next_return.status;
+               goto end;
+       }
+
+       switch (next_return.status) {
+       case BT_NOTIFICATION_ITERATOR_STATUS_END:
+               ret = handle_end(iterator);
+               if (ret) {
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+                       goto end;
+               }
+
+               if (iterator->queue->length == 0) {
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+               }
+
+               iterator->is_ended = true;
+               break;
+       case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
+               status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+               break;
+       case BT_NOTIFICATION_ITERATOR_STATUS_OK:
                if (!next_return.notification) {
-                       next_return.status =
-                               BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
                        goto end;
                }
 
-               BT_MOVE(iterator->current_notification,
+               /*
+                * We know the notification is valid. Before we push it
+                * to the head of the queue, push the appropriate
+                * automatic notifications if any.
+                */
+               ret = enqueue_notification_and_automatic(iterator,
                        next_return.notification);
-               bt_notification_freeze(iterator->current_notification);
+               BT_PUT(next_return.notification);
+               if (ret) {
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+                       goto end;
+               }
+               break;
+       default:
+               /* Unknown non-error status */
+               assert(false);
        }
 
 end:
-       return next_return.status;
+       return status;
+}
+
+enum bt_notification_iterator_status
+bt_notification_iterator_next(struct bt_notification_iterator *iterator)
+{
+       enum bt_notification_iterator_status status;
+
+       if (!iterator) {
+               status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID;
+               goto end;
+       }
+
+       /*
+        * Make sure that the iterator's queue contains at least one
+        * notification.
+        */
+       status = ensure_queue_has_notifications(iterator);
+       if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+               goto end;
+       }
+
+       /*
+        * Move the notification at the tail of the queue to the
+        * iterator's current notification.
+        */
+       assert(iterator->queue->length > 0);
+       bt_put(iterator->current_notification);
+       iterator->current_notification = g_queue_pop_tail(iterator->queue);
+       assert(iterator->current_notification);
+
+end:
+       return status;
 }
 
 struct bt_component *bt_notification_iterator_get_component(
                struct bt_notification_iterator *iterator)
 {
-       return bt_get(iterator->component);
+       return bt_get(iterator->upstream_component);
 }
 
 struct bt_private_component *
index 68b9bcdb8b193e2c46375b51f5a29375c8e6ca06..68b999c0c6e0d36c7c02d0eea5f8b58656684b17 100644 (file)
@@ -25,6 +25,7 @@
  */
 
 #include <babeltrace/compiler-internal.h>
+#include <babeltrace/ctf-ir/stream-internal.h>
 #include <babeltrace/graph/notification-stream-internal.h>
 
 static
@@ -42,7 +43,7 @@ struct bt_notification *bt_notification_stream_end_create(
 {
        struct bt_notification_stream_end *notification;
 
-       if (!stream) {
+       if (!stream || stream->pos.fd >= 0) {
                goto error;
        }
 
@@ -65,3 +66,42 @@ struct bt_ctf_packet *bt_notification_stream_end_get_stream(
                        struct bt_notification_stream_end, parent);
        return bt_get(stream_end->stream);
 }
+
+static
+void bt_notification_stream_begin_destroy(struct bt_object *obj)
+{
+       struct bt_notification_stream_begin *notification =
+                       (struct bt_notification_stream_begin *) obj;
+
+       BT_PUT(notification->stream);
+       g_free(notification);
+}
+
+struct bt_notification *bt_notification_stream_begin_create(
+               struct bt_ctf_stream *stream)
+{
+       struct bt_notification_stream_begin *notification;
+
+       if (!stream || stream->pos.fd >= 0) {
+               goto error;
+       }
+
+       notification = g_new0(struct bt_notification_stream_begin, 1);
+       bt_notification_init(&notification->parent,
+                       BT_NOTIFICATION_TYPE_STREAM_BEGIN,
+                       bt_notification_stream_begin_destroy);
+       notification->stream = bt_get(stream);
+       return &notification->parent;
+error:
+       return NULL;
+}
+
+struct bt_ctf_packet *bt_notification_stream_begin_get_stream(
+               struct bt_notification *notification)
+{
+       struct bt_notification_stream_begin *stream_begin;
+
+       stream_begin = container_of(notification,
+                       struct bt_notification_stream_begin, parent);
+       return bt_get(stream_begin->stream);
+}
index 77c29c13b6e8104c8c30978b13dfe7a68ca107b0..771937ed8782a57d4c07a9dddd5103c176da4c38 100644 (file)
@@ -943,10 +943,9 @@ void plugin_comp_class_destroy_listener(struct bt_component_class *comp_class,
 }
 
 BT_HIDDEN
-int bt_plugin_so_on_add_component_class(struct bt_plugin *plugin,
+void bt_plugin_so_on_add_component_class(struct bt_plugin *plugin,
                struct bt_component_class *comp_class)
 {
-       int ret;
        struct bt_plugin_so_spec_data *spec = plugin->spec_data;
 
        assert(plugin->spec_data);
@@ -957,18 +956,6 @@ int bt_plugin_so_on_add_component_class(struct bt_plugin *plugin,
                bt_get(spec->shared_lib_handle));
 
        /* Add our custom destroy listener */
-       ret = bt_component_class_add_destroy_listener(comp_class,
+       bt_component_class_add_destroy_listener(comp_class,
                plugin_comp_class_destroy_listener, NULL);
-       if (ret) {
-               goto error;
-       }
-       goto end;
-
-error:
-       /* Remove entry from global hash table (if exists) */
-       g_hash_table_remove(comp_classes_to_shlib_handles,
-               comp_class);
-
-end:
-       return ret;
 }
index 7b9644d3e088b2dd1b353ccd2089adc68c820f99..d85604afbd89a2e15ad15a5b402dbc19bcca72b0 100644 (file)
@@ -555,7 +555,6 @@ enum bt_plugin_status bt_plugin_add_component_class(
 {
        enum bt_plugin_status status = BT_PLUGIN_STATUS_OK;
        struct bt_component_class *comp_class_dup = NULL;
-       int ret;
        int comp_class_index = -1;
 
        if (!plugin || !comp_class || plugin->frozen) {
@@ -580,10 +579,7 @@ enum bt_plugin_status bt_plugin_add_component_class(
 
        /* Special case for a shared object plugin */
        if (plugin->type == BT_PLUGIN_TYPE_SO) {
-               ret = bt_plugin_so_on_add_component_class(plugin, comp_class);
-               if (ret) {
-                       goto error;
-               }
+               bt_plugin_so_on_add_component_class(plugin, comp_class);
        }
 
        goto end;
index 8b0e5209de3b5c574812900a11a9861e18beca16..9c369b3986a209dc26c2668ed99dd46207975a4f 100644 (file)
 
 #include "tap/tap.h"
 
-#define NR_TESTS       14
+#define NR_TESTS       12
 
 enum test {
        TEST_NO_TS,
        TEST_NO_UPSTREAM_CONNECTION,
-       TEST_EVENT_NOTIF_VS_THE_REST,
        TEST_SIMPLE_4_PORTS,
        TEST_4_PORTS_WITH_RETRIES,
        TEST_SINGLE_END_THEN_MULTIPLE_FULL,
@@ -73,6 +72,8 @@ enum test_event_type {
        TEST_EV_TYPE_NOTIF_INACTIVITY,
        TEST_EV_TYPE_NOTIF_PACKET_BEGIN,
        TEST_EV_TYPE_NOTIF_PACKET_END,
+       TEST_EV_TYPE_NOTIF_STREAM_BEGIN,
+       TEST_EV_TYPE_NOTIF_STREAM_END,
        TEST_EV_TYPE_AGAIN,
        TEST_EV_TYPE_END,
        TEST_EV_TYPE_SENTINEL,
@@ -104,7 +105,10 @@ static struct bt_clock_class_priority_map *src_empty_cc_prio_map;
 static struct bt_ctf_clock_class *src_clock_class;
 static struct bt_ctf_stream_class *src_stream_class;
 static struct bt_ctf_event_class *src_event_class;
-static struct bt_ctf_packet *src_packet;
+static struct bt_ctf_packet *src_packet0;
+static struct bt_ctf_packet *src_packet1;
+static struct bt_ctf_packet *src_packet2;
+static struct bt_ctf_packet *src_packet3;
 
 enum {
        SEQ_END = -1,
@@ -117,6 +121,7 @@ struct src_iter_user_data {
        size_t iter_index;
        int64_t *seq;
        size_t at;
+       struct bt_ctf_packet *packet;
 };
 
 struct sink_user_data {
@@ -173,14 +178,6 @@ static int64_t seq5[] = {
        1, 4, 189, 1001, SEQ_END,
 };
 
-static int64_t seq6[] = {
-       1, 2, 12, SEQ_PACKET_BEGIN, 14, 19, SEQ_END,
-};
-
-static int64_t seq7[] = {
-       8, 9, SEQ_PACKET_BEGIN, 10, 13, SEQ_PACKET_END, 22, SEQ_END,
-};
-
 static
 void clear_test_events(void)
 {
@@ -208,6 +205,12 @@ void print_test_event(FILE *fp, const struct test_event *event)
        case TEST_EV_TYPE_NOTIF_PACKET_END:
                fprintf(fp, "TEST_EV_TYPE_NOTIF_PACKET_END");
                break;
+       case TEST_EV_TYPE_NOTIF_STREAM_BEGIN:
+               fprintf(fp, "TEST_EV_TYPE_NOTIF_STREAM_BEGIN");
+               break;
+       case TEST_EV_TYPE_NOTIF_STREAM_END:
+               fprintf(fp, "TEST_EV_TYPE_NOTIF_STREAM_END");
+               break;
        case TEST_EV_TYPE_AGAIN:
                fprintf(fp, "TEST_EV_TYPE_AGAIN");
                break;
@@ -363,13 +366,28 @@ void init_static_data(void)
        assert(ret == 0);
        ret = bt_ctf_trace_add_stream_class(trace, src_stream_class);
        assert(ret == 0);
-       stream = bt_ctf_stream_create(src_stream_class, "my-stream");
+       stream = bt_ctf_stream_create(src_stream_class, "stream0");
+       assert(stream);
+       src_packet0 = bt_ctf_packet_create(stream);
+       assert(src_packet0);
+       bt_put(stream);
+       stream = bt_ctf_stream_create(src_stream_class, "stream1");
        assert(stream);
-       src_packet = bt_ctf_packet_create(stream);
-       assert(src_packet);
+       src_packet1 = bt_ctf_packet_create(stream);
+       assert(src_packet0);
+       bt_put(stream);
+       stream = bt_ctf_stream_create(src_stream_class, "stream2");
+       assert(stream);
+       src_packet2 = bt_ctf_packet_create(stream);
+       assert(src_packet0);
+       bt_put(stream);
+       stream = bt_ctf_stream_create(src_stream_class, "stream3");
+       assert(stream);
+       src_packet3 = bt_ctf_packet_create(stream);
+       assert(src_packet0);
+       bt_put(stream);
 
        bt_put(trace);
-       bt_put(stream);
        bt_put(empty_struct_ft);
 }
 
@@ -385,6 +403,10 @@ void fini_static_data(void)
        bt_put(src_clock_class);
        bt_put(src_stream_class);
        bt_put(src_event_class);
+       bt_put(src_packet0);
+       bt_put(src_packet1);
+       bt_put(src_packet2);
+       bt_put(src_packet3);
 }
 
 static
@@ -421,19 +443,29 @@ enum bt_notification_iterator_status src_iter_init(
        user_data->iter_index = port_name[3] - '0';
        bt_put(port);
 
+       switch (user_data->iter_index) {
+       case 0:
+               user_data->packet = src_packet0;
+               break;
+       case 1:
+               user_data->packet = src_packet1;
+               break;
+       case 2:
+               user_data->packet = src_packet2;
+               break;
+       case 3:
+               user_data->packet = src_packet3;
+               break;
+       default:
+               assert(false);
+       }
+
        switch (current_test) {
        case TEST_NO_TS:
                if (user_data->iter_index == 1) {
                        user_data->seq = seq5;
                }
                break;
-       case TEST_EVENT_NOTIF_VS_THE_REST:
-               if (user_data->iter_index == 0) {
-                       user_data->seq = seq6;
-               } else {
-                       user_data->seq = seq7;
-               }
-               break;
        case TEST_SIMPLE_4_PORTS:
                if (user_data->iter_index == 0) {
                        user_data->seq = seq1;
@@ -474,13 +506,14 @@ enum bt_notification_iterator_status src_iter_init(
 }
 
 static
-struct bt_ctf_event *src_create_event(int64_t ts_ns)
+struct bt_ctf_event *src_create_event(struct bt_ctf_packet *packet,
+               int64_t ts_ns)
 {
        struct bt_ctf_event *event = bt_ctf_event_create(src_event_class);
        int ret;
 
        assert(event);
-       ret = bt_ctf_event_set_packet(event, src_packet);
+       ret = bt_ctf_event_set_packet(event, packet);
        assert(ret == 0);
 
        if (ts_ns != -1) {
@@ -519,17 +552,18 @@ struct bt_notification_iterator_next_return src_iter_next_seq(
                break;
        case SEQ_PACKET_BEGIN:
                next_return.notification =
-                       bt_notification_packet_begin_create(src_packet);
+                       bt_notification_packet_begin_create(user_data->packet);
                assert(next_return.notification);
                break;
        case SEQ_PACKET_END:
                next_return.notification =
-                       bt_notification_packet_end_create(src_packet);
+                       bt_notification_packet_end_create(user_data->packet);
                assert(next_return.notification);
                break;
        default:
        {
-               struct bt_ctf_event *event = src_create_event(cur_ts_ns);
+               struct bt_ctf_event *event = src_create_event(
+                       user_data->packet, cur_ts_ns);
 
                assert(event);
                next_return.notification = bt_notification_event_create(event,
@@ -568,10 +602,12 @@ struct bt_notification_iterator_next_return src_iter_next(
                if (user_data->iter_index == 0) {
                        if (user_data->at == 0) {
                                next_return.notification =
-                                       bt_notification_packet_begin_create(src_packet);
+                                       bt_notification_packet_begin_create(
+                                               user_data->packet);
                                assert(next_return.notification);
                        } else if (user_data->at < 6) {
-                               struct bt_ctf_event *event = src_create_event(-1);
+                               struct bt_ctf_event *event = src_create_event(
+                                       user_data->packet, -1);
 
                                assert(event);
                                next_return.notification =
@@ -589,7 +625,6 @@ struct bt_notification_iterator_next_return src_iter_next(
                        next_return = src_iter_next_seq(user_data);
                }
                break;
-       case TEST_EVENT_NOTIF_VS_THE_REST:
        case TEST_SIMPLE_4_PORTS:
        case TEST_4_PORTS_WITH_RETRIES:
                next_return = src_iter_next_seq(user_data);
@@ -660,7 +695,6 @@ enum bt_component_status src_init(
 
        switch (current_test) {
        case TEST_NO_TS:
-       case TEST_EVENT_NOTIF_VS_THE_REST:
                nb_ports = 2;
                break;
        case TEST_SINGLE_END_THEN_MULTIPLE_FULL:
@@ -822,6 +856,12 @@ enum bt_component_status sink_consume(
        case BT_NOTIFICATION_TYPE_PACKET_END:
                test_event.type = TEST_EV_TYPE_NOTIF_PACKET_END;
                break;
+       case BT_NOTIFICATION_TYPE_STREAM_BEGIN:
+               test_event.type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN;
+               break;
+       case BT_NOTIFICATION_TYPE_STREAM_END:
+               test_event.type = TEST_EV_TYPE_NOTIF_STREAM_END;
+               break;
        default:
                test_event.type = TEST_EV_TYPE_NOTIF_UNEXPECTED;
                break;
@@ -1008,16 +1048,23 @@ static
 void test_no_ts(void)
 {
        const struct test_event expected_test_events[] = {
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
                { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = -1, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = -1, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = -1, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = -1, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = -1, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 1, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 4, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 189, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 1001, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_END, },
                { .type = TEST_EV_TYPE_SENTINEL, },
        };
@@ -1039,35 +1086,17 @@ void test_no_upstream_connection(void)
 }
 
 static
-void test_event_notif_vs_the_rest(void)
+void test_simple_4_ports(void)
 {
        const struct test_event expected_test_events[] = {
-               { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 1, },
-               { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 2, },
-               { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 8, },
-               { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 9, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
                { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
-               { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 10, },
-               { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 12, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
                { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
-               { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 13, },
-               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
-               { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 14, },
-               { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 19, },
-               { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 22, },
-               { .type = TEST_EV_TYPE_END, },
-               { .type = TEST_EV_TYPE_SENTINEL, },
-       };
-
-       do_std_test(TEST_EVENT_NOTIF_VS_THE_REST, "event notifications vs. the rest",
-               expected_test_events, true);
-}
-
-
-static
-void test_simple_4_ports(void)
-{
-       const struct test_event expected_test_events[] = {
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 8 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 24 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 41 },
@@ -1146,6 +1175,8 @@ void test_simple_4_ports(void)
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 820 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 825 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 836 },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 850 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 852 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 857 },
@@ -1162,8 +1193,14 @@ void test_simple_4_ports(void)
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 956 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 985 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 996 },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 998 },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 999 },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_END, },
                { .type = TEST_EV_TYPE_SENTINEL, },
        };
@@ -1177,6 +1214,14 @@ void test_4_ports_with_retries(void)
 {
        const struct test_event expected_test_events[] = {
                { .type = TEST_EV_TYPE_AGAIN, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 8 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 24 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 41 },
@@ -1258,6 +1303,8 @@ void test_4_ports_with_retries(void)
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 825 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 836 },
                { .type = TEST_EV_TYPE_AGAIN, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 850 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 852 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 857 },
@@ -1274,8 +1321,14 @@ void test_4_ports_with_retries(void)
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 956 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 985 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 996 },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 998 },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 999 },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_END, },
                { .type = TEST_EV_TYPE_SENTINEL, },
        };
@@ -1356,6 +1409,10 @@ void test_single_end_then_multiple_full(void)
        enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK;
        struct graph_listener_data graph_listener_data;
        const struct test_event expected_test_events[] = {
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 8 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 51 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 59 },
@@ -1396,11 +1453,15 @@ void test_single_end_then_multiple_full(void)
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 819 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 820 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 836 },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 857 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 892 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 903 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 944 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 998 },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_END, },
                { .type = TEST_EV_TYPE_SENTINEL, },
        };
@@ -1477,6 +1538,10 @@ void test_single_again_end_then_multiple_full(void)
        struct graph_listener_data graph_listener_data;
        const struct test_event expected_test_events[] = {
                { .type = TEST_EV_TYPE_AGAIN, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 8 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 51 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 59 },
@@ -1517,11 +1582,15 @@ void test_single_again_end_then_multiple_full(void)
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 819 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 820 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 836 },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 857 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 892 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 903 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 944 },
                { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 998 },
+               { .type = TEST_EV_TYPE_NOTIF_PACKET_END, },
+               { .type = TEST_EV_TYPE_NOTIF_STREAM_END, },
                { .type = TEST_EV_TYPE_END, },
                { .type = TEST_EV_TYPE_SENTINEL, },
        };
@@ -1593,7 +1662,6 @@ int main(int argc, char **argv)
        init_static_data();
        test_no_ts();
        test_no_upstream_connection();
-       test_event_notif_vs_the_rest();
        test_simple_4_ports();
        test_4_ports_with_retries();
        test_single_end_then_multiple_full();
This page took 0.056284 seconds and 4 git commands to generate.