From 3230ee6b4f3a704958b761daecae835c56938bc9 Mon Sep 17 00:00:00 2001 From: Philippe Proulx Date: Tue, 25 Apr 2017 20:15:09 -0400 Subject: [PATCH] Notification iterator: generate automatic notifications when missing MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit With this patch, a notification iterator object makes sure to always provide a valid sequence of notifications to its user (a component which has access to the private connection). "Valid sequence of notifications" means no "stream end" without a corresponding "stream begin", no "packet end" without a corresponding "packet begin", event notifications are always surrounded by "packet begin" and "packet end" notifications, and packet notifications are always surrounded by stream notifications. To accomplish this, each iterator has its own queue of notifications. When it detects that one or more notifications are missing when it calls the upstream's "next" method, it creates the missing notifications and adds them to the queue before the upstream notification. Upstream can still provide the "stream begin", "stream end", "packet begin", and "packet end" notifications in the correct order. In this case, the iterator does not generate any automatic notification. When the upstream's "next" method returns BT_NOTIFICATION_ITERATOR_STATUS_END, the iterator adds any missing notifications to its queue ("packet end" and "stream end"). To know what is the current packet of a given stream, and if a given stream exists or not from an iterator's point of view, the iterator keeps a hash table of streams to stream states. When the iterator creates a stream state, it gets a reference to the stream, because the stream must exist until its "stream end" notification (which is possibly generated by the iterator). However, when the "stream end" notification occurs, the iterator puts the stream reference and adds to it a destroy listener which is reponsible for removing the stream state entry when the stream is eventually destroyed. The iterator keeps the stream state even if the "stream end" notification was emitted because it also validates that, for a given stream during the iterator's lifetime, only one "stream begin" and one "stream end" notifications are emitted. This patch adds another API constraint: within a given component, a given stream can only be referenced by notifications that are emitted one a given port. The first port which emits a notification which has a reference to a stream should remain the same for the stream's lifetime. This is enforced by keeping a hash table of component to port in each stream. This hash table indicates which port, within a given component, has the right to emit a notification which references this stream. This is validated by each iterator. For each component in this hash table, a stream adds a component destroy listener to get notified when it should remove an entry from the hash table. Signed-off-by: Philippe Proulx Signed-off-by: Jérémie Galarneau --- include/babeltrace/ctf-ir/packet-internal.h | 9 + include/babeltrace/ctf-ir/stream-internal.h | 51 + include/babeltrace/ctf-ir/trace-internal.h | 31 + include/babeltrace/ctf-ir/trace.h | 31 - .../graph/component-class-internal.h | 2 +- include/babeltrace/graph/component-internal.h | 19 + .../graph/notification-event-internal.h | 25 + .../graph/notification-iterator-internal.h | 43 +- .../babeltrace/graph/notification-iterator.h | 1 - .../graph/notification-packet-internal.h | 25 + .../babeltrace/graph/notification-packet.h | 14 +- .../graph/notification-stream-internal.h | 30 + .../babeltrace/graph/notification-stream.h | 12 +- include/babeltrace/graph/notification.h | 43 +- .../babeltrace/plugin/plugin-so-internal.h | 2 +- lib/ctf-ir/stream.c | 124 ++ lib/graph/component-class.c | 13 +- lib/graph/component.c | 56 + lib/graph/connection.c | 3 +- lib/graph/iterator.c | 1076 ++++++++++++++++- lib/graph/notification/stream.c | 42 +- lib/plugin/plugin-so.c | 17 +- lib/plugin/plugin.c | 6 +- tests/plugins/test-utils-muxer.c | 182 ++- 24 files changed, 1656 insertions(+), 201 deletions(-) diff --git a/include/babeltrace/ctf-ir/packet-internal.h b/include/babeltrace/ctf-ir/packet-internal.h index 275524ba..609717f6 100644 --- a/include/babeltrace/ctf-ir/packet-internal.h +++ b/include/babeltrace/ctf-ir/packet-internal.h @@ -29,6 +29,7 @@ #include #include #include +#include struct bt_ctf_packet { struct bt_object base; @@ -41,4 +42,12 @@ struct bt_ctf_packet { BT_HIDDEN void bt_ctf_packet_freeze(struct bt_ctf_packet *packet); +static inline +struct bt_ctf_stream *bt_ctf_packet_borrow_stream( + struct bt_ctf_packet *packet) +{ + assert(packet); + return packet->stream; +} + #endif /* BABELTRACE_CTF_IR_PACKET_INTERNAL_H */ diff --git a/include/babeltrace/ctf-ir/stream-internal.h b/include/babeltrace/ctf-ir/stream-internal.h index c101236e..a54cd57b 100644 --- a/include/babeltrace/ctf-ir/stream-internal.h +++ b/include/babeltrace/ctf-ir/stream-internal.h @@ -36,6 +36,17 @@ #include #include +struct bt_port; +struct bt_component; + +typedef void (*bt_ctf_stream_destroy_listener_func)( + struct bt_ctf_stream *stream, void *data); + +struct bt_ctf_stream_destroy_listener { + bt_ctf_stream_destroy_listener_func func; + void *data; +}; + struct bt_ctf_stream { struct bt_object base; uint32_t id; @@ -44,15 +55,55 @@ struct bt_ctf_stream { struct bt_ctf_field *packet_header; struct bt_ctf_field *packet_context; + /* + * When a notification which contains a reference to a stream + * object (event notification, for example) is returned by the + * "next" method of a sink or filter component's notification + * iterator, it must NOT be returned by the "next" method of a + * notification iterator which iterates on the notifications of + * another output port of the same component. + * + * To ensure this, the stream object keeps a hash table which + * indicates which port, for a given component, is currently + * allowed to emit notifications which contain a reference to + * this stream. + * + * This is a `struct bt_component *` to `struct bt_port *` hash + * table. Both pointers are weak references because there's no + * need to keep one or the other alive as far as this stream is + * concerned. + */ + GHashTable *comp_cur_port; + /* Writer-specific members. */ /* Array of pointers to bt_ctf_event for the current packet */ GPtrArray *events; struct bt_ctf_stream_pos pos; unsigned int flushed_packet_count; uint64_t size; + + /* Array of struct bt_ctf_stream_destroy_listener */ + GArray *destroy_listeners; }; BT_HIDDEN int bt_ctf_stream_set_fd(struct bt_ctf_stream *stream, int fd); +BT_HIDDEN +void bt_ctf_stream_map_component_to_port(struct bt_ctf_stream *stream, + struct bt_component *comp, + struct bt_port *port); + +BT_HIDDEN +struct bt_port *bt_ctf_stream_port_for_component(struct bt_ctf_stream *stream, + struct bt_component *comp); + +BT_HIDDEN +void bt_ctf_stream_add_destroy_listener(struct bt_ctf_stream *stream, + bt_ctf_stream_destroy_listener_func func, void *data); + +BT_HIDDEN +void bt_ctf_stream_remove_destroy_listener(struct bt_ctf_stream *stream, + bt_ctf_stream_destroy_listener_func func, void *data); + #endif /* BABELTRACE_CTF_WRITER_STREAM_INTERNAL_H */ diff --git a/include/babeltrace/ctf-ir/trace-internal.h b/include/babeltrace/ctf-ir/trace-internal.h index d332a581..eb3e0e27 100644 --- a/include/babeltrace/ctf-ir/trace-internal.h +++ b/include/babeltrace/ctf-ir/trace-internal.h @@ -89,4 +89,35 @@ BT_HIDDEN bool bt_ctf_trace_has_clock_class(struct bt_ctf_trace *trace, struct bt_ctf_clock_class *clock_class); +/** +@brief User function type to use with bt_ctf_trace_add_listener(). + +@param[in] obj New CTF IR object which is part of the trace + class hierarchy. +@param[in] data User data. + +@prenotnull{obj} +*/ +typedef void (*bt_ctf_listener_cb)(struct bt_ctf_object *obj, void *data); + +/** +@brief Adds the trace class modification listener \p listener to + the CTF IR trace class \p trace_class. + +Once you add \p listener to \p trace_class, whenever \p trace_class +is modified, \p listener is called with the new element and with +\p data (user data). + +@param[in] trace_class Trace class to which to add \p listener. +@param[in] listener Modification listener function. +@param[in] data User data. +@returns 0 on success, or a negative value on error. + +@prenotnull{trace_class} +@prenotnull{listener} +@postrefcountsame{trace_class} +*/ +extern int bt_ctf_trace_add_listener(struct bt_ctf_trace *trace_class, + bt_ctf_listener_cb listener, void *data); + #endif /* BABELTRACE_CTF_IR_TRACE_INTERNAL_H */ diff --git a/include/babeltrace/ctf-ir/trace.h b/include/babeltrace/ctf-ir/trace.h index af8b9fcc..91b09f80 100644 --- a/include/babeltrace/ctf-ir/trace.h +++ b/include/babeltrace/ctf-ir/trace.h @@ -725,37 +725,6 @@ extern int bt_ctf_trace_add_stream_class(struct bt_ctf_trace *trace_class, @{ */ -/** -@brief User function type to use with bt_ctf_trace_add_listener(). - -@param[in] obj New CTF IR object which is part of the trace - class hierarchy. -@param[in] data User data. - -@prenotnull{obj} -*/ -typedef void (*bt_ctf_listener_cb)(struct bt_ctf_object *obj, void *data); - -/** -@brief Adds the trace class modification listener \p listener to - the CTF IR trace class \p trace_class. - -Once you add \p listener to \p trace_class, whenever \p trace_class -is modified, \p listener is called with the new element and with -\p data (user data). - -@param[in] trace_class Trace class to which to add \p listener. -@param[in] listener Modification listener function. -@param[in] data User data. -@returns 0 on success, or a negative value on error. - -@prenotnull{trace_class} -@prenotnull{listener} -@postrefcountsame{trace_class} -*/ -extern int bt_ctf_trace_add_listener(struct bt_ctf_trace *trace_class, - bt_ctf_listener_cb listener, void *data); - /** @brief Accepts the visitor \p visitor to visit the hierarchy of the CTF IR trace class \p trace_class. diff --git a/include/babeltrace/graph/component-class-internal.h b/include/babeltrace/graph/component-class-internal.h index 364299c6..5f84dcd4 100644 --- a/include/babeltrace/graph/component-class-internal.h +++ b/include/babeltrace/graph/component-class-internal.h @@ -96,7 +96,7 @@ struct bt_component_class_filter { }; BT_HIDDEN -int bt_component_class_add_destroy_listener(struct bt_component_class *class, +void bt_component_class_add_destroy_listener(struct bt_component_class *class, bt_component_class_destroy_listener_func func, void *data); #endif /* BABELTRACE_COMPONENT_COMPONENT_CLASS_INTERNAL_H */ diff --git a/include/babeltrace/graph/component-internal.h b/include/babeltrace/graph/component-internal.h index 1e7b547a..27b03b09 100644 --- a/include/babeltrace/graph/component-internal.h +++ b/include/babeltrace/graph/component-internal.h @@ -38,6 +38,14 @@ #define DEFAULT_INPUT_PORT_NAME "default" #define DEFAULT_OUTPUT_PORT_NAME "default" +typedef void (*bt_component_destroy_listener_func)( + struct bt_component *class, void *data); + +struct bt_component_destroy_listener { + bt_component_destroy_listener_func func; + void *data; +}; + struct bt_component { struct bt_object base; struct bt_component_class *class; @@ -61,6 +69,9 @@ struct bt_component { /* Input and output ports (weak references) */ GPtrArray *input_ports; GPtrArray *output_ports; + + /* Array of struct bt_component_destroy_listener */ + GArray *destroy_listeners; }; static inline @@ -128,4 +139,12 @@ BT_HIDDEN enum bt_component_status bt_component_remove_port( struct bt_component *component, struct bt_port *port); +BT_HIDDEN +void bt_component_add_destroy_listener(struct bt_component *component, + bt_component_destroy_listener_func func, void *data); + +BT_HIDDEN +void bt_component_remove_destroy_listener(struct bt_component *component, + bt_component_destroy_listener_func func, void *data); + #endif /* BABELTRACE_COMPONENT_COMPONENT_INTERNAL_H */ diff --git a/include/babeltrace/graph/notification-event-internal.h b/include/babeltrace/graph/notification-event-internal.h index 8fffd47e..de4a6a12 100644 --- a/include/babeltrace/graph/notification-event-internal.h +++ b/include/babeltrace/graph/notification-event-internal.h @@ -27,6 +27,7 @@ * SOFTWARE. */ +#include #include #include #include @@ -41,6 +42,30 @@ struct bt_notification_event { struct bt_clock_class_priority_map *cc_prio_map; }; +static inline +struct bt_ctf_event *bt_notification_event_borrow_event( + struct bt_notification *notif) +{ + struct bt_notification_event *notif_event = container_of(notif, + struct bt_notification_event, parent); + + assert(notif_event); + return notif_event->event; +} + +static inline +struct bt_clock_class_priority_map * +bt_notification_event_borrow_clock_class_priority_map( + struct bt_notification *notif) +{ + struct bt_notification_event *notif_event = container_of(notif, + struct bt_notification_event, parent); + + assert(notif_event); + return notif_event->cc_prio_map; +} + + #ifdef __cplusplus } #endif diff --git a/include/babeltrace/graph/notification-iterator-internal.h b/include/babeltrace/graph/notification-iterator-internal.h index d7025669..dc4e4309 100644 --- a/include/babeltrace/graph/notification-iterator-internal.h +++ b/include/babeltrace/graph/notification-iterator-internal.h @@ -5,8 +5,7 @@ * BabelTrace - Notification Iterator Internal * * Copyright 2015 Jérémie Galarneau - * - * Author: Jérémie Galarneau + * Copyright 2017 Philippe Proulx * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -34,10 +33,43 @@ #include #include +struct bt_port; + struct bt_notification_iterator { struct bt_object base; - struct bt_component *component; - struct bt_notification *current_notification; + struct bt_component *upstream_component; /* owned by this */ + struct bt_port *upstream_port; /* owned by this */ + struct bt_notification *current_notification; /* owned by this */ + GQueue *queue; /* struct bt_notification * (owned by this) */ + + /* + * This hash table keeps the state of a stream as viewed by + * this notification iterator. This is used to: + * + * * Automatically enqueue "stream begin", "packet begin", + * "packet end", and "stream end" notifications depending + * on the stream's state and on the next notification returned + * by the upstream component. + * + * * Make sure that, once the notification iterator has seen + * a "stream end" notification for a given stream, that no + * other notifications which refer to this stream can be + * delivered by this iterator. + * + * The key (struct bt_ctf_stream *) is not owned by this. The + * value is an allocated state structure. + */ + GHashTable *stream_states; + + /* + * This is an array of actions which can be rolled back. It's + * similar to the memento pattern, but it's not exactly that. It + * is allocated once and reset for each notification to process. + * More details near the implementation. + */ + GArray *actions; + + bool is_ended; void *user_data; }; @@ -64,7 +96,8 @@ bt_private_notification_iterator_from_notification_iterator( */ BT_HIDDEN struct bt_notification_iterator *bt_notification_iterator_create( - struct bt_component *component); + struct bt_component *upstream_component, + struct bt_port *upstream_port); /** * Validate a notification iterator. diff --git a/include/babeltrace/graph/notification-iterator.h b/include/babeltrace/graph/notification-iterator.h index d8554316..03b53ce8 100644 --- a/include/babeltrace/graph/notification-iterator.h +++ b/include/babeltrace/graph/notification-iterator.h @@ -54,7 +54,6 @@ enum bt_notification_iterator_status { BT_NOTIFICATION_ITERATOR_STATUS_NOMEM = -12, /** Unsupported iterator feature. */ BT_NOTIFICATION_ITERATOR_STATUS_UNSUPPORTED = -2, - }; /** diff --git a/include/babeltrace/graph/notification-packet-internal.h b/include/babeltrace/graph/notification-packet-internal.h index 8d525c55..4d519544 100644 --- a/include/babeltrace/graph/notification-packet-internal.h +++ b/include/babeltrace/graph/notification-packet-internal.h @@ -27,6 +27,7 @@ * SOFTWARE. */ +#include #include #include @@ -40,4 +41,28 @@ struct bt_notification_packet_end { struct bt_ctf_packet *packet; }; +static inline +struct bt_ctf_packet *bt_notification_packet_begin_borrow_packet( + struct bt_notification *notif) +{ + struct bt_notification_packet_begin *notif_packet_begin = + container_of(notif, + struct bt_notification_packet_begin, parent); + + assert(notif_packet_begin); + return notif_packet_begin->packet; +} + +static inline +struct bt_ctf_packet *bt_notification_packet_end_borrow_packet( + struct bt_notification *notif) +{ + struct bt_notification_packet_end *notif_packet_end = + container_of(notif, + struct bt_notification_packet_end, parent); + + assert(notif_packet_end); + return notif_packet_end->packet; +} + #endif /* BABELTRACE_COMPONENT_NOTIFICATION_PACKET_INTERNAL_H */ diff --git a/include/babeltrace/graph/notification-packet.h b/include/babeltrace/graph/notification-packet.h index a15a9692..94480e60 100644 --- a/include/babeltrace/graph/notification-packet.h +++ b/include/babeltrace/graph/notification-packet.h @@ -35,16 +35,18 @@ extern "C" { struct bt_ctf_packet; -/*** BT_NOTIFICATION_TYPE_PACKET_BEGIN ***/ -struct bt_notification *bt_notification_packet_begin_create( +extern struct bt_notification *bt_notification_packet_begin_create( + struct bt_ctf_packet *packet); + +extern struct bt_notification *bt_notification_packet_end_create( struct bt_ctf_packet *packet); -struct bt_ctf_packet *bt_notification_packet_begin_get_packet( + +/*** BT_NOTIFICATION_TYPE_PACKET_BEGIN ***/ +extern struct bt_ctf_packet *bt_notification_packet_begin_get_packet( struct bt_notification *notification); /*** BT_NOTIFICATION_TYPE_PACKET_END ***/ -struct bt_notification *bt_notification_packet_end_create( - struct bt_ctf_packet *packet); -struct bt_ctf_packet *bt_notification_packet_end_get_packet( +extern struct bt_ctf_packet *bt_notification_packet_end_get_packet( struct bt_notification *notification); #ifdef __cplusplus diff --git a/include/babeltrace/graph/notification-stream-internal.h b/include/babeltrace/graph/notification-stream-internal.h index 839c220c..b170e4e6 100644 --- a/include/babeltrace/graph/notification-stream-internal.h +++ b/include/babeltrace/graph/notification-stream-internal.h @@ -27,12 +27,42 @@ * SOFTWARE. */ +#include #include #include +struct bt_notification_stream_begin { + struct bt_notification parent; + struct bt_ctf_stream *stream; +}; + struct bt_notification_stream_end { struct bt_notification parent; struct bt_ctf_stream *stream; }; +static inline +struct bt_ctf_stream *bt_notification_stream_begin_borrow_stream( + struct bt_notification *notif) +{ + struct bt_notification_stream_begin *notif_stream_begin = + container_of(notif, + struct bt_notification_stream_begin, parent); + + assert(notif_stream_begin); + return notif_stream_begin->stream; +} + +static inline +struct bt_ctf_stream *bt_notification_stream_end_borrow_stream( + struct bt_notification *notif) +{ + struct bt_notification_stream_end *notif_stream_end = + container_of(notif, + struct bt_notification_stream_end, parent); + + assert(notif_stream_end); + return notif_stream_end->stream; +} + #endif /* BABELTRACE_COMPONENT_NOTIFICATION_STREAM_INTERNAL_H */ diff --git a/include/babeltrace/graph/notification-stream.h b/include/babeltrace/graph/notification-stream.h index bdcf4c96..67ed5f5b 100644 --- a/include/babeltrace/graph/notification-stream.h +++ b/include/babeltrace/graph/notification-stream.h @@ -34,10 +34,16 @@ extern "C" { #endif -/*** BT_NOTIFICATION_TYPE_STREAM_END ***/ -struct bt_notification *bt_notification_stream_end_create( +extern struct bt_notification *bt_notification_stream_begin_create( struct bt_ctf_stream *stream); -struct bt_ctf_stream *bt_notification_stream_end_get_stream( + +extern struct bt_notification *bt_notification_stream_end_create( + struct bt_ctf_stream *stream); + +extern struct bt_ctf_stream *bt_notification_stream_begin_get_stream( + struct bt_notification *notification); + +extern struct bt_ctf_stream *bt_notification_stream_end_get_stream( struct bt_notification *notification); #ifdef __cplusplus diff --git a/include/babeltrace/graph/notification.h b/include/babeltrace/graph/notification.h index c768baae..ae5e22f9 100644 --- a/include/babeltrace/graph/notification.h +++ b/include/babeltrace/graph/notification.h @@ -37,40 +37,15 @@ struct bt_notification; * Notification types. Unhandled notification types should be ignored. */ enum bt_notification_type { - BT_NOTIFICATION_TYPE_UNKNOWN = -1, - - /** - * All types of notifications (used to register to notification - * delivery). - */ - BT_NOTIFICATION_TYPE_ALL = 0, - - /** Event delivery notification, see event.h */ - BT_NOTIFICATION_TYPE_EVENT = 1, - - /** Beginning of stream packet notification, see packet.h */ - BT_NOTIFICATION_TYPE_PACKET_BEGIN = 2, - - /** End of stream packet notification, see packet.h */ - BT_NOTIFICATION_TYPE_PACKET_END = 3, - - /** End of stream packet notification, see stream.h */ - BT_NOTIFICATION_TYPE_STREAM_END = 4, - - /** New trace notification, see model.h */ - BT_NOTIFICATION_TYPE_NEW_TRACE = 5, - - /** New stream class notification, see model.h */ - BT_NOTIFICATION_TYPE_NEW_STREAM_CLASS = 6, - - /** New event class notification, see model.h */ - BT_NOTIFICATION_TYPE_NEW_EVENT_CLASS = 7, - - /** End of trace notification, see eot.h */ - BT_NOTIFICATION_TYPE_END_OF_TRACE = 8, - - BT_NOTIFICATION_TYPE_INACTIVITY = 9, - + BT_NOTIFICATION_TYPE_SENTINEL = -2, + BT_NOTIFICATION_TYPE_UNKNOWN = -1, + BT_NOTIFICATION_TYPE_ALL = 0, + BT_NOTIFICATION_TYPE_EVENT = 1, + BT_NOTIFICATION_TYPE_INACTIVITY = 2, + BT_NOTIFICATION_TYPE_STREAM_BEGIN = 3, + BT_NOTIFICATION_TYPE_STREAM_END = 4, + BT_NOTIFICATION_TYPE_PACKET_BEGIN = 5, + BT_NOTIFICATION_TYPE_PACKET_END = 6, BT_NOTIFICATION_TYPE_NR, /* Not part of ABI. */ }; diff --git a/include/babeltrace/plugin/plugin-so-internal.h b/include/babeltrace/plugin/plugin-so-internal.h index e2dd51c0..c9d59f5f 100644 --- a/include/babeltrace/plugin/plugin-so-internal.h +++ b/include/babeltrace/plugin/plugin-so-internal.h @@ -61,7 +61,7 @@ BT_HIDDEN struct bt_plugin **bt_plugin_so_create_all_from_static(void); BT_HIDDEN -int bt_plugin_so_on_add_component_class(struct bt_plugin *plugin, +void bt_plugin_so_on_add_component_class(struct bt_plugin *plugin, struct bt_component_class *comp_class); #endif /* BABELTRACE_PLUGIN_PLUGIN_SO_INTERNAL_H */ diff --git a/lib/ctf-ir/stream.c b/lib/ctf-ir/stream.c index 7a4d53ce..949b11e9 100644 --- a/lib/ctf-ir/stream.c +++ b/lib/ctf-ir/stream.c @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -340,6 +341,14 @@ end: return ret; } +static +void component_destroy_listener(struct bt_component *component, void *data) +{ + struct bt_ctf_stream *stream = data; + + g_hash_table_remove(stream->comp_cur_port, component); +} + struct bt_ctf_stream *bt_ctf_stream_create( struct bt_ctf_stream_class *stream_class, const char *name) @@ -373,6 +382,12 @@ struct bt_ctf_stream *bt_ctf_stream_create( stream->stream_class = stream_class; stream->pos.fd = -1; + stream->destroy_listeners = g_array_new(FALSE, TRUE, + sizeof(struct bt_ctf_stream_destroy_listener)); + if (!stream->destroy_listeners) { + goto error; + } + if (name) { stream->name = g_string_new(name); if (!stream->name) { @@ -448,6 +463,12 @@ struct bt_ctf_stream *bt_ctf_stream_create( if (ret) { goto error; } + + stream->comp_cur_port = g_hash_table_new(g_direct_hash, + g_direct_equal); + if (!stream->comp_cur_port) { + goto error; + } } /* Add this stream to the trace's streams */ @@ -1095,8 +1116,19 @@ static void bt_ctf_stream_destroy(struct bt_object *obj) { struct bt_ctf_stream *stream; + int i; stream = container_of(obj, struct bt_ctf_stream, base); + + /* Call destroy listeners in reverse registration order */ + for (i = stream->destroy_listeners->len - 1; i >= 0; i--) { + struct bt_ctf_stream_destroy_listener *listener = + &g_array_index(stream->destroy_listeners, + struct bt_ctf_stream_destroy_listener, i); + + listener->func(stream, listener->data); + } + (void) bt_ctf_stream_pos_fini(&stream->pos); if (stream->pos.fd >= 0) { int ret; @@ -1126,6 +1158,32 @@ void bt_ctf_stream_destroy(struct bt_object *obj) g_string_free(stream->name, TRUE); } + if (stream->comp_cur_port) { + GHashTableIter ht_iter; + gpointer comp_gptr, port_gptr; + + /* + * Since we're destroying the stream, remove the destroy + * listeners that it registered for each component in + * its component-port mapping hash table. Otherwise they + * would be called and the stream would be accessed once + * it's freed or another stream would be accessed. + */ + g_hash_table_iter_init(&ht_iter, stream->comp_cur_port); + + while (g_hash_table_iter_next(&ht_iter, &comp_gptr, &port_gptr)) { + assert(comp_gptr); + bt_component_remove_destroy_listener((void *) comp_gptr, + component_destroy_listener, stream); + } + + g_hash_table_destroy(stream->comp_cur_port); + } + + if (stream->destroy_listeners) { + g_array_free(stream->destroy_listeners, TRUE); + } + bt_put(stream->packet_header); bt_put(stream->packet_context); g_free(stream); @@ -1228,3 +1286,69 @@ int bt_ctf_stream_is_writer(struct bt_ctf_stream *stream) end: return ret; } + +BT_HIDDEN +void bt_ctf_stream_map_component_to_port(struct bt_ctf_stream *stream, + struct bt_component *comp, + struct bt_port *port) +{ + assert(stream); + assert(comp); + assert(port); + assert(stream->comp_cur_port); + + /* + * Do not take a reference to the component here because we + * don't want the component to exist as long as this stream + * exists. Instead, keep a weak reference, but add a destroy + * listener so that we remove this hash table entry when we know + * the component is destroyed. + */ + bt_component_add_destroy_listener(comp, component_destroy_listener, + stream); + g_hash_table_insert(stream->comp_cur_port, comp, port); +} + +BT_HIDDEN +struct bt_port *bt_ctf_stream_port_for_component(struct bt_ctf_stream *stream, + struct bt_component *comp) +{ + assert(stream); + assert(comp); + assert(stream->comp_cur_port); + return g_hash_table_lookup(stream->comp_cur_port, comp); +} + +BT_HIDDEN +void bt_ctf_stream_add_destroy_listener(struct bt_ctf_stream *stream, + bt_ctf_stream_destroy_listener_func func, void *data) +{ + struct bt_ctf_stream_destroy_listener listener; + + assert(stream); + assert(func); + listener.func = func; + listener.data = data; + g_array_append_val(stream->destroy_listeners, listener); +} + +BT_HIDDEN +void bt_ctf_stream_remove_destroy_listener(struct bt_ctf_stream *stream, + bt_ctf_stream_destroy_listener_func func, void *data) +{ + size_t i; + + assert(stream); + assert(func); + + for (i = 0; i < stream->destroy_listeners->len; i++) { + struct bt_ctf_stream_destroy_listener *listener = + &g_array_index(stream->destroy_listeners, + struct bt_ctf_stream_destroy_listener, i); + + if (listener->func == func && listener->data == data) { + g_array_remove_index(stream->destroy_listeners, i); + i--; + } + } +} diff --git a/lib/graph/component-class.c b/lib/graph/component-class.c index 28aa04c5..b438f104 100644 --- a/lib/graph/component-class.c +++ b/lib/graph/component-class.c @@ -507,23 +507,16 @@ const char *bt_component_class_get_help( } BT_HIDDEN -int bt_component_class_add_destroy_listener(struct bt_component_class *class, +void bt_component_class_add_destroy_listener(struct bt_component_class *class, bt_component_class_destroy_listener_func func, void *data) { - int ret = 0; struct bt_component_class_destroy_listener listener; - if (!class || class->frozen || !func) { - ret = -1; - goto end; - } - + assert(class); + assert(func); listener.func = func; listener.data = data; g_array_append_val(class->destroy_listeners, listener); - -end: - return ret; } int bt_component_class_freeze( diff --git a/lib/graph/component.c b/lib/graph/component.c index 31abc073..8837c0d1 100644 --- a/lib/graph/component.c +++ b/lib/graph/component.c @@ -70,12 +70,23 @@ void bt_component_destroy(struct bt_object *obj) { struct bt_component *component = NULL; struct bt_component_class *component_class = NULL; + int i; if (!obj) { return; } component = container_of(obj, struct bt_component, base); + + /* Call destroy listeners in reverse registration order */ + for (i = component->destroy_listeners->len - 1; i >= 0; i--) { + struct bt_component_destroy_listener *listener = + &g_array_index(component->destroy_listeners, + struct bt_component_destroy_listener, i); + + listener->func(component, listener->data); + } + component_class = component->class; /* @@ -99,6 +110,10 @@ void bt_component_destroy(struct bt_object *obj) g_ptr_array_free(component->output_ports, TRUE); } + if (component->destroy_listeners) { + g_array_free(component->destroy_listeners, TRUE); + } + g_string_free(component->name, TRUE); bt_put(component_class); g_free(component); @@ -250,6 +265,13 @@ struct bt_component *bt_component_create_with_init_method_data( goto end; } + component->destroy_listeners = g_array_new(FALSE, TRUE, + sizeof(struct bt_component_destroy_listener)); + if (!component->destroy_listeners) { + BT_PUT(component); + goto end; + } + if (type == BT_COMPONENT_CLASS_TYPE_SOURCE || type == BT_COMPONENT_CLASS_TYPE_FILTER) { default_port = bt_component_add_port(component, @@ -599,3 +621,37 @@ void bt_component_port_disconnected(struct bt_component *comp, bt_private_port_from_port(port)); } } + +BT_HIDDEN +void bt_component_add_destroy_listener(struct bt_component *component, + bt_component_destroy_listener_func func, void *data) +{ + struct bt_component_destroy_listener listener; + + assert(component); + assert(func); + listener.func = func; + listener.data = data; + g_array_append_val(component->destroy_listeners, listener); +} + +BT_HIDDEN +void bt_component_remove_destroy_listener(struct bt_component *component, + bt_component_destroy_listener_func func, void *data) +{ + size_t i; + + assert(component); + assert(func); + + for (i = 0; i < component->destroy_listeners->len; i++) { + struct bt_component_destroy_listener *listener = + &g_array_index(component->destroy_listeners, + struct bt_component_destroy_listener, i); + + if (listener->func == func && listener->data == data) { + g_array_remove_index(component->destroy_listeners, i); + i--; + } + } +} diff --git a/lib/graph/connection.c b/lib/graph/connection.c index 986303ed..bfc9c04b 100644 --- a/lib/graph/connection.c +++ b/lib/graph/connection.c @@ -179,7 +179,8 @@ bt_private_connection_create_notification_iterator( goto error; } - iterator = bt_notification_iterator_create(upstream_component); + iterator = bt_notification_iterator_create(upstream_component, + upstream_port); if (!iterator) { goto error; } diff --git a/lib/graph/iterator.c b/lib/graph/iterator.c index 491dddf3..389eec52 100644 --- a/lib/graph/iterator.c +++ b/lib/graph/iterator.c @@ -4,8 +4,7 @@ * Babeltrace Notification Iterator * * Copyright 2015 Jérémie Galarneau - * - * Author: Jérémie Galarneau + * Copyright 2017 Philippe Proulx * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -28,12 +27,234 @@ #include #include +#include +#include +#include #include #include #include #include #include #include +#include +#include +#include +#include +#include +#include +#include + +struct stream_state { + struct bt_ctf_stream *stream; /* owned by this */ + struct bt_ctf_packet *cur_packet; /* owned by this */ + bool is_ended; +}; + +enum action_type { + ACTION_TYPE_PUSH_NOTIF, + ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM, + ACTION_TYPE_ADD_STREAM_STATE, + ACTION_TYPE_SET_STREAM_STATE_IS_ENDED, + ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET, +}; + +struct action { + enum action_type type; + union { + /* ACTION_TYPE_PUSH_NOTIF */ + struct { + struct bt_notification *notif; /* owned by this */ + } push_notif; + + /* ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM */ + struct { + struct bt_ctf_stream *stream; /* owned by this */ + struct bt_component *component; /* owned by this */ + struct bt_port *port; /* owned by this */ + } map_port_to_comp_in_stream; + + /* ACTION_TYPE_ADD_STREAM_STATE */ + struct { + struct bt_ctf_stream *stream; /* owned by this */ + struct stream_state *stream_state; /* owned by this */ + } add_stream_state; + + /* ACTION_TYPE_SET_STREAM_STATE_IS_ENDED */ + struct { + struct stream_state *stream_state; /* weak */ + } set_stream_state_is_ended; + + /* ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET */ + struct { + struct stream_state *stream_state; /* weak */ + struct bt_ctf_packet *packet; /* owned by this */ + } set_stream_state_cur_packet; + } payload; +}; + +static +void stream_destroy_listener(struct bt_ctf_stream *stream, void *data) +{ + struct bt_notification_iterator *iterator = data; + + /* Remove associated stream state */ + g_hash_table_remove(iterator->stream_states, stream); +} + +static +void destroy_stream_state(struct stream_state *stream_state) +{ + if (!stream_state) { + return; + } + + bt_put(stream_state->cur_packet); + bt_put(stream_state->stream); + g_free(stream_state); +} + +static +void destroy_action(struct action *action) +{ + assert(action); + + switch (action->type) { + case ACTION_TYPE_PUSH_NOTIF: + BT_PUT(action->payload.push_notif.notif); + break; + case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM: + BT_PUT(action->payload.map_port_to_comp_in_stream.stream); + BT_PUT(action->payload.map_port_to_comp_in_stream.component); + BT_PUT(action->payload.map_port_to_comp_in_stream.port); + break; + case ACTION_TYPE_ADD_STREAM_STATE: + BT_PUT(action->payload.add_stream_state.stream); + destroy_stream_state( + action->payload.add_stream_state.stream_state); + action->payload.add_stream_state.stream_state = NULL; + break; + case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET: + BT_PUT(action->payload.set_stream_state_cur_packet.packet); + break; + case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED: + break; + default: + assert(false); + } +} + +static +void add_action(struct bt_notification_iterator *iterator, + struct action *action) +{ + g_array_append_val(iterator->actions, *action); +} + +static +void clear_actions(struct bt_notification_iterator *iterator) +{ + size_t i; + + for (i = 0; i < iterator->actions->len; i++) { + struct action *action = &g_array_index(iterator->actions, + struct action, i); + + destroy_action(action); + } + + g_array_set_size(iterator->actions, 0); +} + +static +void apply_actions(struct bt_notification_iterator *iterator) +{ + size_t i; + + for (i = 0; i < iterator->actions->len; i++) { + struct action *action = &g_array_index(iterator->actions, + struct action, i); + + switch (action->type) { + case ACTION_TYPE_PUSH_NOTIF: + /* Move notification to queue */ + g_queue_push_head(iterator->queue, + action->payload.push_notif.notif); + bt_notification_freeze( + action->payload.push_notif.notif); + action->payload.push_notif.notif = NULL; + break; + case ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM: + bt_ctf_stream_map_component_to_port( + action->payload.map_port_to_comp_in_stream.stream, + action->payload.map_port_to_comp_in_stream.component, + action->payload.map_port_to_comp_in_stream.port); + break; + case ACTION_TYPE_ADD_STREAM_STATE: + /* Move stream state to hash table */ + g_hash_table_insert(iterator->stream_states, + action->payload.add_stream_state.stream, + action->payload.add_stream_state.stream_state); + + action->payload.add_stream_state.stream_state = NULL; + break; + case ACTION_TYPE_SET_STREAM_STATE_IS_ENDED: + /* + * We know that this stream is ended. We need to + * remember this as long as the stream exists to + * enforce that the same stream does not end + * twice. + * + * Here we add a destroy listener to the stream + * which we put after (becomes weak as the hash + * table key). If we were the last object to own + * this stream, the destroy listener is called + * when we call bt_put() which removes this + * stream state completely. This is important + * because the memory used by this stream object + * could be reused for another stream, and they + * must have different states. + */ + bt_ctf_stream_add_destroy_listener( + action->payload.set_stream_state_is_ended.stream_state->stream, + stream_destroy_listener, iterator); + action->payload.set_stream_state_is_ended.stream_state->is_ended = true; + BT_PUT(action->payload.set_stream_state_is_ended.stream_state->stream); + break; + case ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET: + /* Move packet to stream state's current packet */ + BT_MOVE(action->payload.set_stream_state_cur_packet.stream_state->cur_packet, + action->payload.set_stream_state_cur_packet.packet); + break; + default: + assert(false); + } + } + + clear_actions(iterator); +} + +static +struct stream_state *create_stream_state(struct bt_ctf_stream *stream) +{ + struct stream_state *stream_state = g_new0(struct stream_state, 1); + + if (!stream_state) { + goto end; + } + + /* + * We keep a reference to the stream until we know it's ended + * because we need to be able to create an automatic "stream + * end" notification when the user's "next" method returns + * BT_NOTIFICATION_ITERATOR_STATUS_END. + * + * We put this reference when the stream is marked as ended. + */ + stream_state->stream = bt_get(stream); + +end: + return stream_state; +} static void bt_notification_iterator_destroy(struct bt_object *obj) @@ -44,8 +265,8 @@ void bt_notification_iterator_destroy(struct bt_object *obj) assert(obj); iterator = container_of(obj, struct bt_notification_iterator, base); - assert(iterator->component); - comp_class = iterator->component->class; + assert(iterator->upstream_component); + comp_class = iterator->upstream_component->class; /* Call user-defined destroy method */ switch (comp_class->type) { @@ -78,38 +299,99 @@ void bt_notification_iterator_destroy(struct bt_object *obj) assert(0); } - BT_PUT(iterator->current_notification); - BT_PUT(iterator->component); + if (iterator->queue) { + struct bt_notification *notif; + + while ((notif = g_queue_pop_tail(iterator->queue))) { + bt_put(notif); + } + + g_queue_free(iterator->queue); + } + + if (iterator->stream_states) { + /* + * Remove our destroy listener from each stream which + * has a state in this iterator. Otherwise the destroy + * listener would be called with an invalid/other + * notification iterator object. + */ + GHashTableIter ht_iter; + gpointer stream_gptr, stream_state_gptr; + + g_hash_table_iter_init(&ht_iter, iterator->stream_states); + + while (g_hash_table_iter_next(&ht_iter, &stream_gptr, &stream_state_gptr)) { + assert(stream_gptr); + bt_ctf_stream_remove_destroy_listener( + (void *) stream_gptr, stream_destroy_listener, + iterator); + } + + g_hash_table_destroy(iterator->stream_states); + } + + if (iterator->actions) { + g_array_free(iterator->actions, TRUE); + } + + bt_put(iterator->current_notification); + bt_put(iterator->upstream_component); + bt_put(iterator->upstream_port); g_free(iterator); } BT_HIDDEN struct bt_notification_iterator *bt_notification_iterator_create( - struct bt_component *component) + struct bt_component *upstream_comp, + struct bt_port *upstream_port) { enum bt_component_class_type type; struct bt_notification_iterator *iterator = NULL; - if (!component) { - goto end; - } + assert(upstream_comp); + assert(upstream_port); + assert(bt_port_is_connected(upstream_port)); - type = bt_component_get_class_type(component); + type = bt_component_get_class_type(upstream_comp); switch (type) { case BT_COMPONENT_CLASS_TYPE_SOURCE: case BT_COMPONENT_CLASS_TYPE_FILTER: break; default: - goto end; + goto error; } iterator = g_new0(struct bt_notification_iterator, 1); if (!iterator) { - goto end; + goto error; } - iterator->component = bt_get(component); bt_object_init(iterator, bt_notification_iterator_destroy); + + iterator->stream_states = g_hash_table_new_full(g_direct_hash, + g_direct_equal, NULL, (GDestroyNotify) destroy_stream_state); + if (!iterator->stream_states) { + goto error; + } + + iterator->queue = g_queue_new(); + if (!iterator->queue) { + goto error; + } + + iterator->actions = g_array_new(FALSE, FALSE, sizeof(struct action)); + if (!iterator->actions) { + goto error; + } + + iterator->upstream_component = bt_get(upstream_comp); + iterator->upstream_port = bt_get(upstream_port); + goto end; + +error: + BT_PUT(iterator); + end: return iterator; } @@ -173,8 +455,660 @@ end: return notification; } -enum bt_notification_iterator_status -bt_notification_iterator_next(struct bt_notification_iterator *iterator) +static +bool validate_notification(struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream, + struct bt_ctf_packet *notif_packet) +{ + bool is_valid = true; + struct stream_state *stream_state; + struct bt_port *stream_comp_cur_port; + + assert(notif_stream); + stream_comp_cur_port = + bt_ctf_stream_port_for_component(notif_stream, + iterator->upstream_component); + if (!stream_comp_cur_port) { + /* + * This is the first time this notification iterator + * bumps into this stream. Add an action to map the + * iterator's upstream component to the iterator's + * upstream port in this stream. + */ + struct action action = { + .type = ACTION_TYPE_MAP_PORT_TO_COMP_IN_STREAM, + .payload.map_port_to_comp_in_stream = { + .stream = bt_get(notif_stream), + .component = bt_get(iterator->upstream_component), + .port = bt_get(iterator->upstream_port), + }, + }; + + add_action(iterator, &action); + } else { + if (stream_comp_cur_port != iterator->upstream_port) { + /* + * It looks like two different ports of the same + * component are emitting notifications which + * have references to the same stream. This is + * bad: the API guarantees that it can never + * happen. + */ + is_valid = false; + goto end; + } + + } + + stream_state = g_hash_table_lookup(iterator->stream_states, + notif_stream); + if (stream_state) { + if (stream_state->is_ended) { + /* + * There's a new notification which has a + * reference to a stream which, from this + * iterator's point of view, is ended ("end of + * stream" notification was returned). This is + * bad: the API guarantees that it can never + * happen. + */ + is_valid = false; + goto end; + } + + switch (notif->type) { + case BT_NOTIFICATION_TYPE_STREAM_BEGIN: + /* + * We already have a stream state, which means + * we already returned a "stream begin" + * notification: this is an invalid duplicate. + */ + is_valid = false; + goto end; + case BT_NOTIFICATION_TYPE_PACKET_BEGIN: + if (notif_packet == stream_state->cur_packet) { + /* Duplicate "packet begin" notification */ + is_valid = false; + goto end; + } + break; + default: + break; + } + } + +end: + return is_valid; +} + +static +void add_action_push_notif(struct bt_notification_iterator *iterator, + struct bt_notification *notif) +{ + struct action action = { + .type = ACTION_TYPE_PUSH_NOTIF, + .payload.push_notif = { + .notif = bt_get(notif), + }, + }; + + assert(notif); + add_action(iterator, &action); +} + +static +int add_action_push_notif_stream_begin( + struct bt_notification_iterator *iterator, + struct bt_ctf_stream *stream) +{ + int ret = 0; + struct bt_notification *stream_begin_notif = NULL; + + assert(stream); + stream_begin_notif = bt_notification_stream_begin_create(stream); + if (!stream_begin_notif) { + goto error; + } + + add_action_push_notif(iterator, stream_begin_notif); + goto end; + +error: + ret = -1; + +end: + bt_put(stream_begin_notif); + return ret; +} + +static +int add_action_push_notif_stream_end( + struct bt_notification_iterator *iterator, + struct bt_ctf_stream *stream) +{ + int ret = 0; + struct bt_notification *stream_end_notif = NULL; + + assert(stream); + stream_end_notif = bt_notification_stream_end_create(stream); + if (!stream_end_notif) { + goto error; + } + + add_action_push_notif(iterator, stream_end_notif); + goto end; + +error: + ret = -1; + +end: + bt_put(stream_end_notif); + return ret; +} + +static +int add_action_push_notif_packet_begin( + struct bt_notification_iterator *iterator, + struct bt_ctf_packet *packet) +{ + int ret = 0; + struct bt_notification *packet_begin_notif = NULL; + + assert(packet); + packet_begin_notif = bt_notification_packet_begin_create(packet); + if (!packet_begin_notif) { + goto error; + } + + add_action_push_notif(iterator, packet_begin_notif); + goto end; + +error: + ret = -1; + +end: + bt_put(packet_begin_notif); + return ret; +} + +static +int add_action_push_notif_packet_end( + struct bt_notification_iterator *iterator, + struct bt_ctf_packet *packet) +{ + int ret = 0; + struct bt_notification *packet_end_notif = NULL; + + assert(packet); + packet_end_notif = bt_notification_packet_end_create(packet); + if (!packet_end_notif) { + goto error; + } + + add_action_push_notif(iterator, packet_end_notif); + goto end; + +error: + ret = -1; + +end: + bt_put(packet_end_notif); + return ret; +} + +static +void add_action_set_stream_state_is_ended( + struct bt_notification_iterator *iterator, + struct stream_state *stream_state) +{ + struct action action = { + .type = ACTION_TYPE_SET_STREAM_STATE_IS_ENDED, + .payload.set_stream_state_is_ended = { + .stream_state = stream_state, + }, + }; + + assert(stream_state); + add_action(iterator, &action); +} + +static +void add_action_set_stream_state_cur_packet( + struct bt_notification_iterator *iterator, + struct stream_state *stream_state, + struct bt_ctf_packet *packet) +{ + struct action action = { + .type = ACTION_TYPE_SET_STREAM_STATE_CUR_PACKET, + .payload.set_stream_state_cur_packet = { + .stream_state = stream_state, + .packet = bt_get(packet), + }, + }; + + assert(stream_state); + add_action(iterator, &action); +} + +static +int ensure_stream_state_exists(struct bt_notification_iterator *iterator, + struct bt_notification *stream_begin_notif, + struct bt_ctf_stream *notif_stream, + struct stream_state **stream_state) +{ + int ret = 0; + + if (!notif_stream) { + /* + * The notification does not reference any stream: no + * need to get or create a stream state. + */ + goto end; + } + + *stream_state = g_hash_table_lookup(iterator->stream_states, + notif_stream); + if (!*stream_state) { + /* + * This iterator did not bump into this stream yet: + * create a stream state and a "stream begin" + * notification. + */ + struct action action = { + .type = ACTION_TYPE_ADD_STREAM_STATE, + .payload.add_stream_state = { + .stream = bt_get(notif_stream), + .stream_state = NULL, + }, + }; + + *stream_state = create_stream_state(notif_stream); + if (!stream_state) { + goto error; + } + + action.payload.add_stream_state.stream_state = + *stream_state; + add_action(iterator, &action); + + if (stream_begin_notif) { + add_action_push_notif(iterator, stream_begin_notif); + } else { + ret = add_action_push_notif_stream_begin(iterator, + notif_stream); + if (ret) { + goto error; + } + } + } + + goto end; + +error: + destroy_stream_state(*stream_state); + ret = -1; + +end: + return ret; +} + +static +int handle_packet_switch(struct bt_notification_iterator *iterator, + struct bt_notification *packet_begin_notif, + struct bt_ctf_packet *new_packet, + struct stream_state *stream_state) +{ + int ret = 0; + + if (stream_state->cur_packet == new_packet) { + goto end; + } + + if (stream_state->cur_packet) { + /* End of the current packet */ + ret = add_action_push_notif_packet_end(iterator, + stream_state->cur_packet); + if (ret) { + goto error; + } + } + + /* Beginning of the new packet */ + if (packet_begin_notif) { + add_action_push_notif(iterator, packet_begin_notif); + } else if (new_packet) { + ret = add_action_push_notif_packet_begin(iterator, + new_packet); + if (ret) { + goto error; + } + } + + add_action_set_stream_state_cur_packet(iterator, stream_state, + new_packet); + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int handle_notif_stream_begin( + struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream) +{ + int ret = 0; + struct stream_state *stream_state; + + assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_BEGIN); + assert(notif_stream); + ret = ensure_stream_state_exists(iterator, notif, notif_stream, + &stream_state); + if (ret) { + goto error; + } + + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int handle_notif_stream_end( + struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream) +{ + int ret = 0; + struct stream_state *stream_state; + + assert(notif->type == BT_NOTIFICATION_TYPE_STREAM_END); + assert(notif_stream); + ret = ensure_stream_state_exists(iterator, NULL, notif_stream, + &stream_state); + if (ret) { + goto error; + } + + ret = handle_packet_switch(iterator, NULL, NULL, stream_state); + if (ret) { + goto error; + } + + add_action_push_notif(iterator, notif); + add_action_set_stream_state_is_ended(iterator, stream_state); + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int handle_notif_packet_begin( + struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream, + struct bt_ctf_packet *notif_packet) +{ + int ret = 0; + struct stream_state *stream_state; + + assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_BEGIN); + assert(notif_packet); + ret = ensure_stream_state_exists(iterator, NULL, notif_stream, + &stream_state); + if (ret) { + goto error; + } + + ret = handle_packet_switch(iterator, notif, notif_packet, stream_state); + if (ret) { + goto error; + } + + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int handle_notif_packet_end( + struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream, + struct bt_ctf_packet *notif_packet) +{ + int ret = 0; + struct stream_state *stream_state; + + assert(notif->type == BT_NOTIFICATION_TYPE_PACKET_END); + assert(notif_packet); + ret = ensure_stream_state_exists(iterator, NULL, notif_stream, + &stream_state); + if (ret) { + goto error; + } + + ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state); + if (ret) { + goto error; + } + + /* End of the current packet */ + add_action_push_notif(iterator, notif); + add_action_set_stream_state_cur_packet(iterator, stream_state, NULL); + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int handle_notif_event( + struct bt_notification_iterator *iterator, + struct bt_notification *notif, + struct bt_ctf_stream *notif_stream, + struct bt_ctf_packet *notif_packet) +{ + int ret = 0; + struct stream_state *stream_state; + + assert(notif->type == BT_NOTIFICATION_TYPE_EVENT); + assert(notif_packet); + ret = ensure_stream_state_exists(iterator, NULL, notif_stream, + &stream_state); + if (ret) { + goto error; + } + + ret = handle_packet_switch(iterator, NULL, notif_packet, stream_state); + if (ret) { + goto error; + } + + add_action_push_notif(iterator, notif); + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int enqueue_notification_and_automatic( + struct bt_notification_iterator *iterator, + struct bt_notification *notif) +{ + int ret = 0; + struct bt_ctf_event *notif_event = NULL; + struct bt_ctf_stream *notif_stream = NULL; + struct bt_ctf_packet *notif_packet = NULL; + + assert(notif); + + /* Get the stream and packet referred by the notification */ + switch (notif->type) { + case BT_NOTIFICATION_TYPE_EVENT: + notif_event = bt_notification_event_borrow_event(notif); + assert(notif_event); + notif_packet = bt_ctf_event_borrow_packet(notif_event); + assert(notif_packet); + break; + case BT_NOTIFICATION_TYPE_STREAM_BEGIN: + notif_stream = + bt_notification_stream_begin_borrow_stream(notif); + assert(notif_stream); + break; + case BT_NOTIFICATION_TYPE_STREAM_END: + notif_stream = bt_notification_stream_end_borrow_stream(notif); + assert(notif_stream); + break; + case BT_NOTIFICATION_TYPE_PACKET_BEGIN: + notif_packet = + bt_notification_packet_begin_borrow_packet(notif); + assert(notif_packet); + break; + case BT_NOTIFICATION_TYPE_PACKET_END: + notif_packet = bt_notification_packet_end_borrow_packet(notif); + assert(notif_packet); + break; + case BT_NOTIFICATION_TYPE_INACTIVITY: + /* Always valid */ + break; + default: + /* + * Invalid type of notification. Only the notification + * types above are allowed to be returned by a user + * component. + */ + goto error; + } + + if (notif_packet) { + notif_stream = bt_ctf_packet_borrow_stream(notif_packet); + assert(notif_stream); + } + + if (!notif_stream) { + /* + * The notification has no reference to a stream: it + * cannot cause the creation of automatic notifications. + */ + goto end; + } + + if (!validate_notification(iterator, notif, notif_stream, + notif_packet)) { + goto error; + } + + switch (notif->type) { + case BT_NOTIFICATION_TYPE_EVENT: + ret = handle_notif_event(iterator, notif, notif_stream, + notif_packet); + break; + case BT_NOTIFICATION_TYPE_STREAM_BEGIN: + ret = handle_notif_stream_begin(iterator, notif, notif_stream); + break; + case BT_NOTIFICATION_TYPE_STREAM_END: + ret = handle_notif_stream_end(iterator, notif, notif_stream); + break; + case BT_NOTIFICATION_TYPE_PACKET_BEGIN: + ret = handle_notif_packet_begin(iterator, notif, notif_stream, + notif_packet); + break; + case BT_NOTIFICATION_TYPE_PACKET_END: + ret = handle_notif_packet_end(iterator, notif, notif_stream, + notif_packet); + break; + default: + break; + } + + if (ret) { + goto error; + } + + apply_actions(iterator); + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +int handle_end(struct bt_notification_iterator *iterator) +{ + GHashTableIter stream_state_iter; + gpointer stream_gptr, stream_state_gptr; + int ret = 0; + + /* + * Emit a "stream end" notification for each non-ended stream + * known by this iterator and mark them as ended. + */ + g_hash_table_iter_init(&stream_state_iter, iterator->stream_states); + + while (g_hash_table_iter_next(&stream_state_iter, &stream_gptr, + &stream_state_gptr)) { + struct stream_state *stream_state = stream_state_gptr; + + assert(stream_state_gptr); + + if (stream_state->is_ended) { + continue; + } + + ret = handle_packet_switch(iterator, NULL, NULL, stream_state); + if (ret) { + goto error; + } + + ret = add_action_push_notif_stream_end(iterator, stream_gptr); + if (ret) { + goto error; + } + + add_action_set_stream_state_is_ended(iterator, stream_state); + } + + apply_actions(iterator); + goto end; + +error: + ret = -1; + +end: + return ret; +} + +static +enum bt_notification_iterator_status ensure_queue_has_notifications( + struct bt_notification_iterator *iterator) { struct bt_private_notification_iterator *priv_iterator = bt_private_notification_iterator_from_notification_iterator(iterator); @@ -183,20 +1117,31 @@ bt_notification_iterator_next(struct bt_notification_iterator *iterator) .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, .notification = NULL, }; + enum bt_notification_iterator_status status = + BT_NOTIFICATION_ITERATOR_STATUS_OK; + int ret; - if (!iterator) { - next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID; + assert(iterator); + + if (iterator->queue->length > 0) { + /* We already have enough */ + goto end; + } + + if (iterator->is_ended) { + status = BT_NOTIFICATION_ITERATOR_STATUS_END; goto end; } - assert(iterator->component); - assert(iterator->component->class); + assert(iterator->upstream_component); + assert(iterator->upstream_component->class); - switch (iterator->component->class->type) { + /* Pick the appropriate "next" method */ + switch (iterator->upstream_component->class->type) { case BT_COMPONENT_CLASS_TYPE_SOURCE: { struct bt_component_class_source *source_class = - container_of(iterator->component->class, + container_of(iterator->upstream_component->class, struct bt_component_class_source, parent); assert(source_class->methods.iterator.next); @@ -206,7 +1151,7 @@ bt_notification_iterator_next(struct bt_notification_iterator *iterator) case BT_COMPONENT_CLASS_TYPE_FILTER: { struct bt_component_class_filter *filter_class = - container_of(iterator->component->class, + container_of(iterator->upstream_component->class, struct bt_component_class_filter, parent); assert(filter_class->methods.iterator.next); @@ -218,28 +1163,99 @@ bt_notification_iterator_next(struct bt_notification_iterator *iterator) break; } + /* + * Call the user's "next" method to get the next notification + * and status, skipping the forwarded automatic notifications + * if any. + */ assert(next_method); next_return = next_method(priv_iterator); - if (next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + if (next_return.status < 0) { + status = next_return.status; + goto end; + } + + switch (next_return.status) { + case BT_NOTIFICATION_ITERATOR_STATUS_END: + ret = handle_end(iterator); + if (ret) { + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + goto end; + } + + if (iterator->queue->length == 0) { + status = BT_NOTIFICATION_ITERATOR_STATUS_END; + } + + iterator->is_ended = true; + break; + case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: + status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN; + break; + case BT_NOTIFICATION_ITERATOR_STATUS_OK: if (!next_return.notification) { - next_return.status = - BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; goto end; } - BT_MOVE(iterator->current_notification, + /* + * We know the notification is valid. Before we push it + * to the head of the queue, push the appropriate + * automatic notifications if any. + */ + ret = enqueue_notification_and_automatic(iterator, next_return.notification); - bt_notification_freeze(iterator->current_notification); + BT_PUT(next_return.notification); + if (ret) { + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + goto end; + } + break; + default: + /* Unknown non-error status */ + assert(false); } end: - return next_return.status; + return status; +} + +enum bt_notification_iterator_status +bt_notification_iterator_next(struct bt_notification_iterator *iterator) +{ + enum bt_notification_iterator_status status; + + if (!iterator) { + status = BT_NOTIFICATION_ITERATOR_STATUS_INVALID; + goto end; + } + + /* + * Make sure that the iterator's queue contains at least one + * notification. + */ + status = ensure_queue_has_notifications(iterator); + if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) { + goto end; + } + + /* + * Move the notification at the tail of the queue to the + * iterator's current notification. + */ + assert(iterator->queue->length > 0); + bt_put(iterator->current_notification); + iterator->current_notification = g_queue_pop_tail(iterator->queue); + assert(iterator->current_notification); + +end: + return status; } struct bt_component *bt_notification_iterator_get_component( struct bt_notification_iterator *iterator) { - return bt_get(iterator->component); + return bt_get(iterator->upstream_component); } struct bt_private_component * diff --git a/lib/graph/notification/stream.c b/lib/graph/notification/stream.c index 68b9bcdb..68b999c0 100644 --- a/lib/graph/notification/stream.c +++ b/lib/graph/notification/stream.c @@ -25,6 +25,7 @@ */ #include +#include #include static @@ -42,7 +43,7 @@ struct bt_notification *bt_notification_stream_end_create( { struct bt_notification_stream_end *notification; - if (!stream) { + if (!stream || stream->pos.fd >= 0) { goto error; } @@ -65,3 +66,42 @@ struct bt_ctf_packet *bt_notification_stream_end_get_stream( struct bt_notification_stream_end, parent); return bt_get(stream_end->stream); } + +static +void bt_notification_stream_begin_destroy(struct bt_object *obj) +{ + struct bt_notification_stream_begin *notification = + (struct bt_notification_stream_begin *) obj; + + BT_PUT(notification->stream); + g_free(notification); +} + +struct bt_notification *bt_notification_stream_begin_create( + struct bt_ctf_stream *stream) +{ + struct bt_notification_stream_begin *notification; + + if (!stream || stream->pos.fd >= 0) { + goto error; + } + + notification = g_new0(struct bt_notification_stream_begin, 1); + bt_notification_init(¬ification->parent, + BT_NOTIFICATION_TYPE_STREAM_BEGIN, + bt_notification_stream_begin_destroy); + notification->stream = bt_get(stream); + return ¬ification->parent; +error: + return NULL; +} + +struct bt_ctf_packet *bt_notification_stream_begin_get_stream( + struct bt_notification *notification) +{ + struct bt_notification_stream_begin *stream_begin; + + stream_begin = container_of(notification, + struct bt_notification_stream_begin, parent); + return bt_get(stream_begin->stream); +} diff --git a/lib/plugin/plugin-so.c b/lib/plugin/plugin-so.c index 77c29c13..771937ed 100644 --- a/lib/plugin/plugin-so.c +++ b/lib/plugin/plugin-so.c @@ -943,10 +943,9 @@ void plugin_comp_class_destroy_listener(struct bt_component_class *comp_class, } BT_HIDDEN -int bt_plugin_so_on_add_component_class(struct bt_plugin *plugin, +void bt_plugin_so_on_add_component_class(struct bt_plugin *plugin, struct bt_component_class *comp_class) { - int ret; struct bt_plugin_so_spec_data *spec = plugin->spec_data; assert(plugin->spec_data); @@ -957,18 +956,6 @@ int bt_plugin_so_on_add_component_class(struct bt_plugin *plugin, bt_get(spec->shared_lib_handle)); /* Add our custom destroy listener */ - ret = bt_component_class_add_destroy_listener(comp_class, + bt_component_class_add_destroy_listener(comp_class, plugin_comp_class_destroy_listener, NULL); - if (ret) { - goto error; - } - goto end; - -error: - /* Remove entry from global hash table (if exists) */ - g_hash_table_remove(comp_classes_to_shlib_handles, - comp_class); - -end: - return ret; } diff --git a/lib/plugin/plugin.c b/lib/plugin/plugin.c index 7b9644d3..d85604af 100644 --- a/lib/plugin/plugin.c +++ b/lib/plugin/plugin.c @@ -555,7 +555,6 @@ enum bt_plugin_status bt_plugin_add_component_class( { enum bt_plugin_status status = BT_PLUGIN_STATUS_OK; struct bt_component_class *comp_class_dup = NULL; - int ret; int comp_class_index = -1; if (!plugin || !comp_class || plugin->frozen) { @@ -580,10 +579,7 @@ enum bt_plugin_status bt_plugin_add_component_class( /* Special case for a shared object plugin */ if (plugin->type == BT_PLUGIN_TYPE_SO) { - ret = bt_plugin_so_on_add_component_class(plugin, comp_class); - if (ret) { - goto error; - } + bt_plugin_so_on_add_component_class(plugin, comp_class); } goto end; diff --git a/tests/plugins/test-utils-muxer.c b/tests/plugins/test-utils-muxer.c index 8b0e5209..9c369b39 100644 --- a/tests/plugins/test-utils-muxer.c +++ b/tests/plugins/test-utils-muxer.c @@ -55,12 +55,11 @@ #include "tap/tap.h" -#define NR_TESTS 14 +#define NR_TESTS 12 enum test { TEST_NO_TS, TEST_NO_UPSTREAM_CONNECTION, - TEST_EVENT_NOTIF_VS_THE_REST, TEST_SIMPLE_4_PORTS, TEST_4_PORTS_WITH_RETRIES, TEST_SINGLE_END_THEN_MULTIPLE_FULL, @@ -73,6 +72,8 @@ enum test_event_type { TEST_EV_TYPE_NOTIF_INACTIVITY, TEST_EV_TYPE_NOTIF_PACKET_BEGIN, TEST_EV_TYPE_NOTIF_PACKET_END, + TEST_EV_TYPE_NOTIF_STREAM_BEGIN, + TEST_EV_TYPE_NOTIF_STREAM_END, TEST_EV_TYPE_AGAIN, TEST_EV_TYPE_END, TEST_EV_TYPE_SENTINEL, @@ -104,7 +105,10 @@ static struct bt_clock_class_priority_map *src_empty_cc_prio_map; static struct bt_ctf_clock_class *src_clock_class; static struct bt_ctf_stream_class *src_stream_class; static struct bt_ctf_event_class *src_event_class; -static struct bt_ctf_packet *src_packet; +static struct bt_ctf_packet *src_packet0; +static struct bt_ctf_packet *src_packet1; +static struct bt_ctf_packet *src_packet2; +static struct bt_ctf_packet *src_packet3; enum { SEQ_END = -1, @@ -117,6 +121,7 @@ struct src_iter_user_data { size_t iter_index; int64_t *seq; size_t at; + struct bt_ctf_packet *packet; }; struct sink_user_data { @@ -173,14 +178,6 @@ static int64_t seq5[] = { 1, 4, 189, 1001, SEQ_END, }; -static int64_t seq6[] = { - 1, 2, 12, SEQ_PACKET_BEGIN, 14, 19, SEQ_END, -}; - -static int64_t seq7[] = { - 8, 9, SEQ_PACKET_BEGIN, 10, 13, SEQ_PACKET_END, 22, SEQ_END, -}; - static void clear_test_events(void) { @@ -208,6 +205,12 @@ void print_test_event(FILE *fp, const struct test_event *event) case TEST_EV_TYPE_NOTIF_PACKET_END: fprintf(fp, "TEST_EV_TYPE_NOTIF_PACKET_END"); break; + case TEST_EV_TYPE_NOTIF_STREAM_BEGIN: + fprintf(fp, "TEST_EV_TYPE_NOTIF_STREAM_BEGIN"); + break; + case TEST_EV_TYPE_NOTIF_STREAM_END: + fprintf(fp, "TEST_EV_TYPE_NOTIF_STREAM_END"); + break; case TEST_EV_TYPE_AGAIN: fprintf(fp, "TEST_EV_TYPE_AGAIN"); break; @@ -363,13 +366,28 @@ void init_static_data(void) assert(ret == 0); ret = bt_ctf_trace_add_stream_class(trace, src_stream_class); assert(ret == 0); - stream = bt_ctf_stream_create(src_stream_class, "my-stream"); + stream = bt_ctf_stream_create(src_stream_class, "stream0"); + assert(stream); + src_packet0 = bt_ctf_packet_create(stream); + assert(src_packet0); + bt_put(stream); + stream = bt_ctf_stream_create(src_stream_class, "stream1"); assert(stream); - src_packet = bt_ctf_packet_create(stream); - assert(src_packet); + src_packet1 = bt_ctf_packet_create(stream); + assert(src_packet0); + bt_put(stream); + stream = bt_ctf_stream_create(src_stream_class, "stream2"); + assert(stream); + src_packet2 = bt_ctf_packet_create(stream); + assert(src_packet0); + bt_put(stream); + stream = bt_ctf_stream_create(src_stream_class, "stream3"); + assert(stream); + src_packet3 = bt_ctf_packet_create(stream); + assert(src_packet0); + bt_put(stream); bt_put(trace); - bt_put(stream); bt_put(empty_struct_ft); } @@ -385,6 +403,10 @@ void fini_static_data(void) bt_put(src_clock_class); bt_put(src_stream_class); bt_put(src_event_class); + bt_put(src_packet0); + bt_put(src_packet1); + bt_put(src_packet2); + bt_put(src_packet3); } static @@ -421,19 +443,29 @@ enum bt_notification_iterator_status src_iter_init( user_data->iter_index = port_name[3] - '0'; bt_put(port); + switch (user_data->iter_index) { + case 0: + user_data->packet = src_packet0; + break; + case 1: + user_data->packet = src_packet1; + break; + case 2: + user_data->packet = src_packet2; + break; + case 3: + user_data->packet = src_packet3; + break; + default: + assert(false); + } + switch (current_test) { case TEST_NO_TS: if (user_data->iter_index == 1) { user_data->seq = seq5; } break; - case TEST_EVENT_NOTIF_VS_THE_REST: - if (user_data->iter_index == 0) { - user_data->seq = seq6; - } else { - user_data->seq = seq7; - } - break; case TEST_SIMPLE_4_PORTS: if (user_data->iter_index == 0) { user_data->seq = seq1; @@ -474,13 +506,14 @@ enum bt_notification_iterator_status src_iter_init( } static -struct bt_ctf_event *src_create_event(int64_t ts_ns) +struct bt_ctf_event *src_create_event(struct bt_ctf_packet *packet, + int64_t ts_ns) { struct bt_ctf_event *event = bt_ctf_event_create(src_event_class); int ret; assert(event); - ret = bt_ctf_event_set_packet(event, src_packet); + ret = bt_ctf_event_set_packet(event, packet); assert(ret == 0); if (ts_ns != -1) { @@ -519,17 +552,18 @@ struct bt_notification_iterator_next_return src_iter_next_seq( break; case SEQ_PACKET_BEGIN: next_return.notification = - bt_notification_packet_begin_create(src_packet); + bt_notification_packet_begin_create(user_data->packet); assert(next_return.notification); break; case SEQ_PACKET_END: next_return.notification = - bt_notification_packet_end_create(src_packet); + bt_notification_packet_end_create(user_data->packet); assert(next_return.notification); break; default: { - struct bt_ctf_event *event = src_create_event(cur_ts_ns); + struct bt_ctf_event *event = src_create_event( + user_data->packet, cur_ts_ns); assert(event); next_return.notification = bt_notification_event_create(event, @@ -568,10 +602,12 @@ struct bt_notification_iterator_next_return src_iter_next( if (user_data->iter_index == 0) { if (user_data->at == 0) { next_return.notification = - bt_notification_packet_begin_create(src_packet); + bt_notification_packet_begin_create( + user_data->packet); assert(next_return.notification); } else if (user_data->at < 6) { - struct bt_ctf_event *event = src_create_event(-1); + struct bt_ctf_event *event = src_create_event( + user_data->packet, -1); assert(event); next_return.notification = @@ -589,7 +625,6 @@ struct bt_notification_iterator_next_return src_iter_next( next_return = src_iter_next_seq(user_data); } break; - case TEST_EVENT_NOTIF_VS_THE_REST: case TEST_SIMPLE_4_PORTS: case TEST_4_PORTS_WITH_RETRIES: next_return = src_iter_next_seq(user_data); @@ -660,7 +695,6 @@ enum bt_component_status src_init( switch (current_test) { case TEST_NO_TS: - case TEST_EVENT_NOTIF_VS_THE_REST: nb_ports = 2; break; case TEST_SINGLE_END_THEN_MULTIPLE_FULL: @@ -822,6 +856,12 @@ enum bt_component_status sink_consume( case BT_NOTIFICATION_TYPE_PACKET_END: test_event.type = TEST_EV_TYPE_NOTIF_PACKET_END; break; + case BT_NOTIFICATION_TYPE_STREAM_BEGIN: + test_event.type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN; + break; + case BT_NOTIFICATION_TYPE_STREAM_END: + test_event.type = TEST_EV_TYPE_NOTIF_STREAM_END; + break; default: test_event.type = TEST_EV_TYPE_NOTIF_UNEXPECTED; break; @@ -1008,16 +1048,23 @@ static void test_no_ts(void) { const struct test_event expected_test_events[] = { + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = -1, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = -1, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = -1, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = -1, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = -1, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 1, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 4, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 189, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 1001, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_END, }, { .type = TEST_EV_TYPE_SENTINEL, }, }; @@ -1039,35 +1086,17 @@ void test_no_upstream_connection(void) } static -void test_event_notif_vs_the_rest(void) +void test_simple_4_ports(void) { const struct test_event expected_test_events[] = { - { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 1, }, - { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 2, }, - { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 8, }, - { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 9, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, - { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 10, }, - { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 12, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, - { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 13, }, - { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, - { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 14, }, - { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 19, }, - { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 22, }, - { .type = TEST_EV_TYPE_END, }, - { .type = TEST_EV_TYPE_SENTINEL, }, - }; - - do_std_test(TEST_EVENT_NOTIF_VS_THE_REST, "event notifications vs. the rest", - expected_test_events, true); -} - - -static -void test_simple_4_ports(void) -{ - const struct test_event expected_test_events[] = { { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 8 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 24 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 41 }, @@ -1146,6 +1175,8 @@ void test_simple_4_ports(void) { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 820 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 825 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 836 }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 850 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 852 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 857 }, @@ -1162,8 +1193,14 @@ void test_simple_4_ports(void) { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 956 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 985 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 996 }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 998 }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 999 }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_END, }, { .type = TEST_EV_TYPE_SENTINEL, }, }; @@ -1177,6 +1214,14 @@ void test_4_ports_with_retries(void) { const struct test_event expected_test_events[] = { { .type = TEST_EV_TYPE_AGAIN, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 8 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 24 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 41 }, @@ -1258,6 +1303,8 @@ void test_4_ports_with_retries(void) { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 825 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 836 }, { .type = TEST_EV_TYPE_AGAIN, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 850 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 852 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 857 }, @@ -1274,8 +1321,14 @@ void test_4_ports_with_retries(void) { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 956 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 985 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 996 }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 998 }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 999 }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_END, }, { .type = TEST_EV_TYPE_SENTINEL, }, }; @@ -1356,6 +1409,10 @@ void test_single_end_then_multiple_full(void) enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK; struct graph_listener_data graph_listener_data; const struct test_event expected_test_events[] = { + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 8 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 51 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 59 }, @@ -1396,11 +1453,15 @@ void test_single_end_then_multiple_full(void) { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 819 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 820 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 836 }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 857 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 892 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 903 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 944 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 998 }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_END, }, { .type = TEST_EV_TYPE_SENTINEL, }, }; @@ -1477,6 +1538,10 @@ void test_single_again_end_then_multiple_full(void) struct graph_listener_data graph_listener_data; const struct test_event expected_test_events[] = { { .type = TEST_EV_TYPE_AGAIN, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_BEGIN, }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_BEGIN, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 8 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 51 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 59 }, @@ -1517,11 +1582,15 @@ void test_single_again_end_then_multiple_full(void) { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 819 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 820 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 836 }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 857 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 892 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 903 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 944 }, { .type = TEST_EV_TYPE_NOTIF_EVENT, .ts_ns = 998 }, + { .type = TEST_EV_TYPE_NOTIF_PACKET_END, }, + { .type = TEST_EV_TYPE_NOTIF_STREAM_END, }, { .type = TEST_EV_TYPE_END, }, { .type = TEST_EV_TYPE_SENTINEL, }, }; @@ -1593,7 +1662,6 @@ int main(int argc, char **argv) init_static_data(); test_no_ts(); test_no_upstream_connection(); - test_event_notif_vs_the_rest(); test_simple_4_ports(); test_4_ports_with_retries(); test_single_end_then_multiple_full(); -- 2.34.1