lib: add output port notification iterator
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Sat, 2 Sep 2017 01:38:26 +0000 (21:38 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 15 Sep 2017 18:48:43 +0000 (14:48 -0400)
As of this patch, you can create an output port notification iterator
with a component's output port with
bt_output_port_notification_iterator_create(). This function, on
success:

1. Creates a notification iterator object, which contains a
   notification pointer.

2. Adds (creates) a colander sink component, passing this notification
   pointer's address to its initialization method data, along with
   a list of notification types to subscribe to received as a
   parameter.

3. Connects the passed output port to the colander component's input
   port.

4. Makes the targeted graph nonconsumable. This means, at this point,
   only bt_notification_iterator_next() can consume the graph, not the
   user.

5. Returns the created notification iterator.

When you call bt_notification_iterator_next() with this new notification
iterator, it puts its current notification and consumes its specific
colander sink within the graph to make it store the next notification,
forwarding any exceptional graph status as a notification iterator
status.

More than one output port notification iterators on a single graph are
supported: multiple colander sinks are added to the graph and each one
makes its own colander sink consume.

To avoid immediate or eventual component name clashes,
bt_output_port_notification_iterator_create() accepts an optional
colander name parameter. If you use NULL, then `colander` is used.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
include/Makefile.am
include/babeltrace/babeltrace.h
include/babeltrace/graph/component-class-sink-colander-internal.h [new file with mode: 0644]
include/babeltrace/graph/component-class-sink-colander.h [deleted file]
include/babeltrace/graph/graph-internal.h
include/babeltrace/graph/notification-iterator-internal.h
include/babeltrace/graph/output-port-notification-iterator.h [new file with mode: 0644]
lib/graph/component-class-sink-colander.c
lib/graph/graph.c
lib/graph/iterator.c

index 43a7adb1c8cd7ac344f7f7c9879e8e9e06a64221..cd03f56ac35260252c2c6cb078ed8aab71f30f7f 100644 (file)
@@ -51,7 +51,6 @@ babeltracegraphincludedir = "$(includedir)/babeltrace/graph"
 babeltracegraphinclude_HEADERS = \
        babeltrace/graph/clock-class-priority-map.h \
        babeltrace/graph/component-class-filter.h \
-       babeltrace/graph/component-class-sink-colander.h \
        babeltrace/graph/component-class-sink.h \
        babeltrace/graph/component-class-source.h \
        babeltrace/graph/component-class.h \
@@ -71,6 +70,7 @@ babeltracegraphinclude_HEADERS = \
        babeltrace/graph/notification.h \
        babeltrace/graph/notification-packet.h \
        babeltrace/graph/notification-stream.h \
+       babeltrace/graph/output-port-notification-iterator.h \
        babeltrace/graph/port.h \
        babeltrace/graph/private-component-filter.h \
        babeltrace/graph/private-component-sink.h \
@@ -122,6 +122,7 @@ noinst_HEADERS = \
        babeltrace/endian-internal.h \
        babeltrace/graph/clock-class-priority-map-internal.h \
        babeltrace/graph/component-class-internal.h \
+       babeltrace/graph/component-class-sink-colander-internal.h \
        babeltrace/graph/component-filter-internal.h \
        babeltrace/graph/component-internal.h \
        babeltrace/graph/component-sink-internal.h \
index 859cb41b98c7eca0fbb227c704eca0eee0bd02ab..45dac14d44a20d84bbab97f8f280babc690b4b8e 100644 (file)
@@ -65,7 +65,6 @@
 /* Graph, component, and notification API */
 #include <babeltrace/graph/clock-class-priority-map.h>
 #include <babeltrace/graph/component-class-filter.h>
-#include <babeltrace/graph/component-class-sink-colander.h>
 #include <babeltrace/graph/component-class-sink.h>
 #include <babeltrace/graph/component-class-source.h>
 #include <babeltrace/graph/component-class.h>
@@ -85,6 +84,7 @@
 #include <babeltrace/graph/notification-packet.h>
 #include <babeltrace/graph/notification-stream.h>
 #include <babeltrace/graph/notification.h>
+#include <babeltrace/graph/output-port-notification-iterator.h>
 #include <babeltrace/graph/port.h>
 #include <babeltrace/graph/private-component-filter.h>
 #include <babeltrace/graph/private-component-sink.h>
diff --git a/include/babeltrace/graph/component-class-sink-colander-internal.h b/include/babeltrace/graph/component-class-sink-colander-internal.h
new file mode 100644 (file)
index 0000000..3616111
--- /dev/null
@@ -0,0 +1,46 @@
+#ifndef BABELTRACE_GRAPH_COMPONENT_CLASS_SINK_COLANDER_H
+#define BABELTRACE_GRAPH_COMPONENT_CLASS_SINK_COLANDER_H
+
+/*
+ * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <babeltrace/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct bt_component_class;
+struct bt_notification;
+
+struct bt_component_class_sink_colander_data {
+       struct bt_notification **notification;
+       const enum bt_notification_type *notification_types;
+};
+
+extern struct bt_component_class *bt_component_class_sink_colander_get(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* BABELTRACE_GRAPH_COMPONENT_CLASS_SINK_COLANDER_H */
diff --git a/include/babeltrace/graph/component-class-sink-colander.h b/include/babeltrace/graph/component-class-sink-colander.h
deleted file mode 100644 (file)
index 2a21603..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-#ifndef BABELTRACE_GRAPH_COMPONENT_CLASS_SINK_COLANDER_H
-#define BABELTRACE_GRAPH_COMPONENT_CLASS_SINK_COLANDER_H
-
-/*
- * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-#include <babeltrace/types.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-struct bt_component_class;
-
-extern struct bt_component_class *bt_component_class_sink_colander_get(void);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /* BABELTRACE_GRAPH_COMPONENT_CLASS_SINK_COLANDER_H */
index 7fdca46ae8cdfb105ad7b30cd3188c95f9c36045..09c8b111acf3bae2d465bc4658517dfcf5fbf9c4 100644 (file)
@@ -32,6 +32,7 @@
 #include <babeltrace/babeltrace-internal.h>
 #include <babeltrace/object-internal.h>
 #include <stdlib.h>
+#include <assert.h>
 #include <glib.h>
 
 struct bt_component;
@@ -60,6 +61,17 @@ struct bt_graph {
        bt_bool canceled;
        bt_bool in_remove_listener;
        bt_bool has_sink;
+
+       /*
+        * If this is BT_FALSE, then the public API's consuming
+        * functions (bt_graph_consume() and bt_graph_run()) return
+        * BT_GRAPH_STATUS_CANNOT_CONSUME. The internal "no check"
+        * functions always work.
+        *
+        * In bt_output_port_notification_iterator_create(), on success,
+        * this flag is cleared so that the iterator remains the only
+        * consumer for the graph's lifetime.
+        */
        bt_bool can_consume;
 
        struct {
@@ -70,6 +82,20 @@ struct bt_graph {
        } listeners;
 };
 
+static inline
+void bt_graph_set_can_consume(struct bt_graph *graph, bt_bool can_consume)
+{
+       assert(graph);
+       graph->can_consume = can_consume;
+}
+
+BT_HIDDEN
+enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph);
+
+BT_HIDDEN
+enum bt_graph_status bt_graph_consume_sink_no_check(struct bt_graph *graph,
+               struct bt_component *sink);
+
 BT_HIDDEN
 void bt_graph_notify_port_added(struct bt_graph *graph, struct bt_port *port);
 
@@ -92,6 +118,18 @@ BT_HIDDEN
 void bt_graph_remove_connection(struct bt_graph *graph,
                struct bt_connection *connection);
 
+/*
+ * This only works with a component which is not connected at this
+ * point.
+ *
+ * Also the reference count of `component` should be 0 when you call
+ * this function, which means only `graph` owns the component, so it
+ * is safe to destroy.
+ */
+BT_HIDDEN
+int bt_graph_remove_unconnected_component(struct bt_graph *graph,
+               struct bt_component *component);
+
 static inline
 const char *bt_graph_status_string(enum bt_graph_status status)
 {
index d69ea117c13647c4ffc80ae4779605f89d854c95..fd3dd4f30ec0134f140ca1491e8967d6ca288947 100644 (file)
 #include <babeltrace/graph/private-connection-private-notification-iterator.h>
 #include <babeltrace/types.h>
 #include <stdbool.h>
+#include <assert.h>
 
 struct bt_port;
+struct bt_graph;
 
 enum bt_notification_iterator_type {
        BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION,
+       BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
 };
 
 enum bt_private_connection_notification_iterator_notif_type {
@@ -84,6 +87,7 @@ enum bt_private_connection_notification_iterator_state {
 struct bt_notification_iterator {
        struct bt_object base;
        enum bt_notification_iterator_type type;
+       struct bt_notification *current_notification; /* owned by this */
 };
 
 struct bt_notification_iterator_private_connection {
@@ -91,7 +95,6 @@ struct bt_notification_iterator_private_connection {
        struct bt_component *upstream_component; /* Weak */
        struct bt_port *upstream_port; /* Weak */
        struct bt_connection *connection; /* Weak */
-       struct bt_notification *current_notification; /* owned by this */
        GQueue *queue; /* struct bt_notification * (owned by this) */
 
        /*
@@ -133,6 +136,31 @@ struct bt_notification_iterator_private_connection {
        void *user_data;
 };
 
+struct bt_notification_iterator_output_port {
+       struct bt_notification_iterator base;
+       struct bt_graph *graph; /* Owned by this */
+       struct bt_component *colander; /* Owned by this */
+       struct bt_port *output_port; /* Owned by this */
+};
+
+static inline
+struct bt_notification *bt_notification_iterator_borrow_current_notification(
+               struct bt_notification_iterator *iterator)
+{
+       assert(iterator);
+       return iterator->current_notification;
+}
+
+static inline
+void bt_notification_iterator_replace_current_notification(
+               struct bt_notification_iterator *iterator,
+               struct bt_notification *notification)
+{
+       assert(iterator);
+       bt_put(iterator->current_notification);
+       iterator->current_notification = bt_get(notification);
+}
+
 static inline
 struct bt_notification_iterator_private_connection *
 bt_private_connection_notification_iterator_from_private(
diff --git a/include/babeltrace/graph/output-port-notification-iterator.h b/include/babeltrace/graph/output-port-notification-iterator.h
new file mode 100644 (file)
index 0000000..4253f9d
--- /dev/null
@@ -0,0 +1,41 @@
+#ifndef BABELTRACE_GRAPH_OUTPUT_PORT_NOTIFICATION_ITERATOR_H
+#define BABELTRACE_GRAPH_OUTPUT_PORT_NOTIFICATION_ITERATOR_H
+
+/*
+ * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct bt_port;
+struct bt_notification_iterator;
+
+extern struct bt_notification_iterator *bt_output_port_notification_iterator_create(
+               struct bt_port *port, const char *colander_component_name,
+               const enum bt_notification_type *notification_types);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* BABELTRACE_GRAPH_OUTPUT_PORT_NOTIFICATION_ITERATOR_H */
index 0d83e5d3be22aed42802e5f99640eb3add7d0cda..0df9ed4e6016a0bf3a6c0e5abc783e3301a0a857 100644 (file)
@@ -27,6 +27,7 @@
 #include <babeltrace/graph/private-component-sink.h>
 #include <babeltrace/graph/private-port.h>
 #include <babeltrace/graph/private-connection.h>
+#include <babeltrace/graph/component-class-sink-colander-internal.h>
 #include <glib.h>
 #include <assert.h>
 
@@ -35,6 +36,7 @@ struct bt_component_class *colander_comp_cls;
 
 struct colander_data {
        struct bt_notification **user_notif;
+       enum bt_notification_type *notif_types;
        struct bt_notification_iterator *notif_iter;
 };
 
@@ -45,6 +47,9 @@ enum bt_component_status colander_init(
 {
        enum bt_component_status status = BT_COMPONENT_STATUS_OK;
        struct colander_data *colander_data = NULL;
+       struct bt_component_class_sink_colander_data *user_provided_data =
+               init_method_data;
+       const enum bt_notification_type *notif_type;
 
        if (!init_method_data) {
                BT_LOGW_STR("Component initialization method data is NULL.");
@@ -59,7 +64,31 @@ enum bt_component_status colander_init(
                goto end;
        }
 
-       colander_data->user_notif = init_method_data;
+       colander_data->user_notif = user_provided_data->notification;
+
+       if (user_provided_data->notification_types) {
+               notif_type = user_provided_data->notification_types;
+               unsigned long count;
+
+               while (*notif_type != BT_NOTIFICATION_TYPE_SENTINEL) {
+                       notif_type++;
+               }
+
+               count = notif_type - user_provided_data->notification_types + 1;
+
+               colander_data->notif_types =
+                       g_new0(enum bt_notification_type, count);
+               if (!colander_data->notif_types) {
+                       BT_LOGE_STR("Failed to allocate an array of notification types.");
+                       status = BT_COMPONENT_STATUS_NOMEM;
+                       goto end;
+               }
+
+               memcpy(colander_data->notif_types,
+                       user_provided_data->notification_types,
+                       count * sizeof(enum bt_notification_type));
+       }
+
        status = bt_private_component_sink_add_input_private_port(
                priv_comp, "in", NULL, NULL);
        if (status != BT_COMPONENT_STATUS_OK) {
@@ -87,6 +116,7 @@ void colander_finalize(struct bt_private_component *priv_comp)
                bt_put(colander_data->notif_iter);
        }
 
+       g_free(colander_data->notif_types);
        g_free(colander_data);
 }
 
@@ -105,7 +135,8 @@ void colander_port_connected(struct bt_private_component *priv_comp,
        assert(colander_data);
        BT_PUT(colander_data->notif_iter);
        conn_status = bt_private_connection_create_notification_iterator(
-               priv_conn, NULL, &colander_data->notif_iter);
+               priv_conn, colander_data->notif_types,
+               &colander_data->notif_iter);
        if (conn_status) {
                BT_LOGE("Cannot create notification iterator from connection: "
                        "comp-addr=%p, conn-addr=%p", priv_comp, priv_conn);
index b6460f74e228bd684c6ad9f59b8fa139f440475a..4c78ea1983c98a6d7c12e2a24611252d3572af98 100644 (file)
@@ -415,34 +415,17 @@ end:
 }
 
 static
-enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph)
+enum bt_graph_status consume_graph_sink(struct bt_component *sink)
 {
-       struct bt_component *sink;
        enum bt_graph_status status = BT_GRAPH_STATUS_OK;
        enum bt_component_status comp_status;
-       GList *current_node;
-
-       BT_LOGV("Making next sink consume: addr=%p", graph);
 
-       if (!graph->has_sink) {
-               BT_LOGW_STR("Graph has no sink component.");
-               status = BT_GRAPH_STATUS_NO_SINK;
-               goto end;
-       }
-
-       if (g_queue_is_empty(graph->sinks_to_consume)) {
-               BT_LOGV_STR("Graph's sink queue is empty: end of graph.");
-               status = BT_GRAPH_STATUS_END;
-               goto end;
-       }
-
-       current_node = g_queue_pop_head_link(graph->sinks_to_consume);
-       sink = current_node->data;
-       BT_LOGV("Chose next sink to consume: comp-addr=%p, comp-name=\"%s\"",
-               sink, bt_component_get_name(sink));
+       assert(sink);
        comp_status = bt_component_sink_consume(sink);
-       BT_LOGV("Consumed from sink: status=%s",
+       BT_LOGV("Consumed from sink: addr=%p, name=\"%s\", status=%s",
+               sink, bt_component_get_name(sink),
                bt_component_status_string(comp_status));
+
        switch (comp_status) {
        case BT_COMPONENT_STATUS_OK:
                break;
@@ -460,21 +443,105 @@ enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph)
                break;
        }
 
+       return status;
+}
+
+/*
+ * `node` is removed from the queue of sinks to consume when passed to
+ * this function. This function adds it back to the queue if there's
+ * still something to consume afterwards.
+ */
+static
+enum bt_graph_status consume_sink_node(struct bt_graph *graph,
+               GList *node)
+{
+       enum bt_graph_status status;
+       struct bt_component *sink;
+
+       sink = node->data;
+       status = consume_graph_sink(sink);
        if (status != BT_GRAPH_STATUS_END) {
-               g_queue_push_tail_link(graph->sinks_to_consume, current_node);
+               g_queue_push_tail_link(graph->sinks_to_consume, node);
                goto end;
        }
 
        /* End reached, the node is not added back to the queue and free'd. */
-       g_queue_delete_link(graph->sinks_to_consume, current_node);
+       g_queue_delete_link(graph->sinks_to_consume, node);
 
        /* Don't forward an END status if there are sinks left to consume. */
        if (!g_queue_is_empty(graph->sinks_to_consume)) {
                status = BT_GRAPH_STATUS_OK;
                goto end;
        }
+
+end:
+       BT_LOGV("Consumed sink node: status=%s", bt_graph_status_string(status));
+       return status;
+}
+
+BT_HIDDEN
+enum bt_graph_status bt_graph_consume_sink_no_check(struct bt_graph *graph,
+               struct bt_component *sink)
+{
+       enum bt_graph_status status;
+       GList *sink_node;
+       int index;
+
+       BT_LOGV("Making specific sink consume: addr=%p, "
+               "comp-addr=%p, comp-name=\"%s\"",
+               graph, sink, bt_component_get_name(sink));
+
+       assert(bt_component_borrow_graph(sink) == graph);
+
+       if (g_queue_is_empty(graph->sinks_to_consume)) {
+               BT_LOGV_STR("Graph's sink queue is empty: end of graph.");
+               status = BT_GRAPH_STATUS_END;
+               goto end;
+       }
+
+       index = g_queue_index(graph->sinks_to_consume, sink);
+       if (index < 0) {
+               BT_LOGV_STR("Sink is not marked as consumable: sink is ended.");
+               status = BT_GRAPH_STATUS_END;
+               goto end;
+       }
+
+       sink_node = g_queue_pop_nth_link(graph->sinks_to_consume, index);
+       assert(sink_node);
+       status = consume_sink_node(graph, sink_node);
+
+end:
+       return status;
+}
+
+BT_HIDDEN
+enum bt_graph_status bt_graph_consume_no_check(struct bt_graph *graph)
+{
+       enum bt_graph_status status = BT_GRAPH_STATUS_OK;
+       struct bt_component *sink;
+       GList *current_node;
+
+       BT_LOGV("Making next sink consume: addr=%p", graph);
+
+       if (!graph->has_sink) {
+               BT_LOGW_STR("Graph has no sink component.");
+               status = BT_GRAPH_STATUS_NO_SINK;
+               goto end;
+       }
+
+       if (g_queue_is_empty(graph->sinks_to_consume)) {
+               BT_LOGV_STR("Graph's sink queue is empty: end of graph.");
+               status = BT_GRAPH_STATUS_END;
+               goto end;
+       }
+
+       current_node = g_queue_pop_head_link(graph->sinks_to_consume);
+       sink = current_node->data;
+       BT_LOGV("Chose next sink to consume: comp-addr=%p, comp-name=\"%s\"",
+               sink, bt_component_get_name(sink));
+       status = consume_sink_node(graph, current_node);
+
 end:
-       BT_LOGV("Graph consumed: status=%s", bt_graph_status_string(status));
        return status;
 }
 
@@ -1047,3 +1114,86 @@ enum bt_graph_status bt_graph_add_component(
        return bt_graph_add_component_with_init_method_data(graph,
                component_class, name, params, NULL, component);
 }
+
+BT_HIDDEN
+int bt_graph_remove_unconnected_component(struct bt_graph *graph,
+               struct bt_component *component)
+{
+       const bt_bool init_can_consume = graph->can_consume;
+       int64_t count;
+       uint64_t i;
+       int ret = 0;
+
+       assert(graph);
+       assert(component);
+       assert(component->base.ref_count.count == 0);
+       assert(bt_component_borrow_graph(component) == graph);
+
+       count = bt_component_get_input_port_count(component);
+
+       for (i = 0; i < count; i++) {
+               struct bt_port *port =
+                       bt_component_get_input_port_by_index(component, i);
+
+               assert(port);
+               bt_put(port);
+
+               if (bt_port_is_connected(port)) {
+                       BT_LOGW("Cannot remove component from graph: "
+                               "an input port is connected: "
+                               "graph-addr=%p, comp-addr=%p, "
+                               "comp-name=\"%s\", connected-port-addr=%p, "
+                               "connected-port-name=\"%s\"",
+                               graph, component,
+                               bt_component_get_name(component),
+                               port, bt_port_get_name(port));
+                       goto error;
+               }
+       }
+
+       count = bt_component_get_output_port_count(component);
+
+       for (i = 0; i < count; i++) {
+               struct bt_port *port =
+                       bt_component_get_output_port_by_index(component, i);
+
+               assert(port);
+               bt_put(port);
+
+               if (bt_port_is_connected(port)) {
+                       BT_LOGW("Cannot remove component from graph: "
+                               "an output port is connected: "
+                               "graph-addr=%p, comp-addr=%p, "
+                               "comp-name=\"%s\", connected-port-addr=%p, "
+                               "connected-port-name=\"%s\"",
+                               graph, component,
+                               bt_component_get_name(component),
+                               port, bt_port_get_name(port));
+                       goto error;
+               }
+       }
+
+       graph->can_consume = BT_FALSE;
+
+       /* Possibly remove from sinks to consume */
+       (void) g_queue_remove(graph->sinks_to_consume, component);
+
+       if (graph->sinks_to_consume->length == 0) {
+               graph->has_sink = BT_FALSE;
+       }
+
+       /*
+        * This calls bt_object_release() on the component, and since
+        * its reference count is 0, its destructor is called. Its
+        * destructor calls the user's finalization method (if set).
+        */
+       g_ptr_array_remove(graph->components, component);
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       graph->can_consume = init_can_consume;
+       return ret;
+}
index f011373b0592918b739bd413647a11d03eee20d5..1979c69a8791ee8fa194b93b61f559a511aafac8 100644 (file)
@@ -41,6 +41,8 @@
 #include <babeltrace/graph/component.h>
 #include <babeltrace/graph/component-source-internal.h>
 #include <babeltrace/graph/component-class-internal.h>
+#include <babeltrace/graph/component-class-sink-colander-internal.h>
+#include <babeltrace/graph/component-sink.h>
 #include <babeltrace/graph/notification.h>
 #include <babeltrace/graph/notification-iterator.h>
 #include <babeltrace/graph/notification-iterator-internal.h>
@@ -53,6 +55,7 @@
 #include <babeltrace/graph/notification-stream-internal.h>
 #include <babeltrace/graph/notification-discarded-elements-internal.h>
 #include <babeltrace/graph/port.h>
+#include <babeltrace/graph/graph-internal.h>
 #include <babeltrace/types.h>
 #include <stdint.h>
 #include <inttypes.h>
@@ -357,6 +360,17 @@ end:
        return stream_state;
 }
 
+static
+void destroy_base_notification_iterator(struct bt_object *obj)
+{
+       struct bt_notification_iterator *iterator =
+               container_of(obj, struct bt_notification_iterator, base);
+
+       BT_LOGD_STR("Putting current notification.");
+       bt_put(iterator->current_notification);
+       g_free(iterator);
+}
+
 static
 void bt_private_connection_notification_iterator_destroy(struct bt_object *obj)
 {
@@ -376,7 +390,7 @@ void bt_private_connection_notification_iterator_destroy(struct bt_object *obj)
         */
        obj->ref_count.count++;
        iterator = (void *) container_of(obj, struct bt_notification_iterator, base);
-       BT_LOGD("Destroying notification iterator object: addr=%p",
+       BT_LOGD("Destroying private connection notification iterator object: addr=%p",
                iterator);
        bt_private_connection_notification_iterator_finalize(iterator);
 
@@ -429,9 +443,7 @@ void bt_private_connection_notification_iterator_destroy(struct bt_object *obj)
                bt_connection_remove_iterator(iterator->connection, iterator);
        }
 
-       BT_LOGD_STR("Putting current notification.");
-       bt_put(iterator->current_notification);
-       g_free(iterator);
+       destroy_base_notification_iterator(obj);
 }
 
 BT_HIDDEN
@@ -612,7 +624,7 @@ enum bt_connection_status bt_private_connection_notification_iterator_create(
        assert(notification_types);
        assert(bt_port_is_connected(upstream_port));
        assert(user_iterator);
-       BT_LOGD("Creating notification iterator: "
+       BT_LOGD("Creating notification iterator on private connection: "
                "upstream-comp-addr=%p, upstream-comp-name=\"%s\", "
                "upstream-port-addr=%p, upstream-port-name=\"%s\", "
                "conn-addr=%p",
@@ -624,7 +636,7 @@ enum bt_connection_status bt_private_connection_notification_iterator_create(
                type == BT_COMPONENT_CLASS_TYPE_FILTER);
        iterator = g_new0(struct bt_notification_iterator_private_connection, 1);
        if (!iterator) {
-               BT_LOGE_STR("Failed to allocate one notification iterator.");
+               BT_LOGE_STR("Failed to allocate one private connection notification iterator.");
                status = BT_CONNECTION_STATUS_NOMEM;
                goto end;
        }
@@ -726,20 +738,8 @@ struct bt_notification *bt_notification_iterator_get_notification(
                goto end;
        }
 
-       switch (iterator->type) {
-       case BT_NOTIFICATION_ITERATOR_TYPE_PRIVATE_CONNECTION:
-       {
-               struct bt_notification_iterator_private_connection *priv_conn_iter =
-                       (void *) iterator;
-
-               notification = bt_get(priv_conn_iter->current_notification);
-               break;
-       }
-       default:
-               BT_LOGF("Unknown notification iterator type: addr=%p, type=%d",
-                       iterator, iterator->type);
-               abort();
-       }
+       notification = bt_get(
+               bt_notification_iterator_borrow_current_notification(iterator));
 
 end:
        return notification;
@@ -2252,6 +2252,7 @@ bt_notification_iterator_next(struct bt_notification_iterator *iterator)
        {
                struct bt_notification_iterator_private_connection *priv_conn_iter =
                        (void *) iterator;
+               struct bt_notification *notif;
 
                /*
                 * Make sure that the iterator's queue contains at least
@@ -2267,10 +2268,66 @@ bt_notification_iterator_next(struct bt_notification_iterator *iterator)
                 * iterator's current notification.
                 */
                assert(priv_conn_iter->queue->length > 0);
-               bt_put(priv_conn_iter->current_notification);
-               priv_conn_iter->current_notification =
-                       g_queue_pop_tail(priv_conn_iter->queue);
-               assert(priv_conn_iter->current_notification);
+               notif = g_queue_pop_tail(priv_conn_iter->queue);
+               bt_notification_iterator_replace_current_notification(
+                       iterator, notif);
+               bt_put(notif);
+               break;
+       }
+       case BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT:
+       {
+               struct bt_notification_iterator_output_port *out_port_iter =
+                       (void *) iterator;
+
+               /*
+                * Keep current notification in case there's an error:
+                * restore this notification so that the current
+                * notification is not changed from the user's point of
+                * view.
+                */
+               struct bt_notification *old_notif =
+                       bt_get(bt_notification_iterator_borrow_current_notification(iterator));
+               enum bt_graph_status graph_status;
+
+               /*
+                * Put current notification since it's possibly
+                * about to be replaced by a new one by the
+                * colander sink.
+                */
+               bt_notification_iterator_replace_current_notification(
+                       iterator, NULL);
+               graph_status = bt_graph_consume_sink_no_check(
+                       out_port_iter->graph,
+                       out_port_iter->colander);
+               switch (graph_status) {
+               case BT_GRAPH_STATUS_CANCELED:
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_CANCELED;
+                       break;
+               case BT_GRAPH_STATUS_AGAIN:
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
+                       break;
+               case BT_GRAPH_STATUS_END:
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+                       break;
+               case BT_GRAPH_STATUS_NOMEM:
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
+                       break;
+               case BT_GRAPH_STATUS_OK:
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+                       assert(bt_notification_iterator_borrow_current_notification(iterator));
+                       break;
+               default:
+                       /* Other errors */
+                       status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               }
+
+               if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+                       /* Error/exception: restore old notification */
+                       bt_notification_iterator_replace_current_notification(
+                               iterator, old_notif);
+               }
+
+               bt_put(old_notif);
                break;
        }
        default:
@@ -2314,3 +2371,158 @@ bt_private_connection_private_notification_iterator_get_private_component(
                bt_private_connection_notification_iterator_get_component(
                        (void *) bt_private_connection_notification_iterator_from_private(private_iterator)));
 }
+
+static
+void bt_output_port_notification_iterator_destroy(struct bt_object *obj)
+{
+       struct bt_notification_iterator_output_port *iterator =
+               (void *) container_of(obj, struct bt_notification_iterator, base);
+
+       BT_LOGD("Destroying output port notification iterator object: addr=%p",
+               iterator);
+       BT_LOGD_STR("Putting graph.");
+       bt_put(iterator->graph);
+       BT_LOGD_STR("Putting output port.");
+       bt_put(iterator->output_port);
+       BT_LOGD_STR("Putting colander sink component.");
+       bt_put(iterator->colander);
+       destroy_base_notification_iterator(obj);
+}
+
+struct bt_notification_iterator *bt_output_port_notification_iterator_create(
+               struct bt_port *output_port,
+               const char *colander_component_name,
+               const enum bt_notification_type *notification_types)
+{
+       struct bt_notification_iterator *iterator_base = NULL;
+       struct bt_notification_iterator_output_port *iterator = NULL;
+       struct bt_component_class *colander_comp_cls = NULL;
+       struct bt_component *output_port_comp = NULL;
+       struct bt_component *colander_comp;
+       struct bt_graph *graph = NULL;
+       enum bt_graph_status graph_status;
+       const char *colander_comp_name;
+       struct bt_port *colander_in_port = NULL;
+       struct bt_component_class_sink_colander_data colander_data;
+
+       if (!output_port) {
+               BT_LOGW_STR("Invalid parameter: port is NULL.");
+               goto error;
+       }
+
+       if (bt_port_get_type(output_port) != BT_PORT_TYPE_OUTPUT) {
+               BT_LOGW_STR("Invalid parameter: port is not an output port.");
+               goto error;
+       }
+
+       output_port_comp = bt_port_get_component(output_port);
+       if (!output_port_comp) {
+               BT_LOGW("Cannot get output port's component: "
+                       "port-addr=%p, port-name=\"%s\"",
+                       output_port, bt_port_get_name(output_port));
+               goto error;
+       }
+
+       graph = bt_component_get_graph(output_port_comp);
+       assert(graph);
+
+       /* Create notification iterator */
+       BT_LOGD("Creating notification iterator on output port: "
+               "comp-addr=%p, comp-name\"%s\", port-addr=%p, port-name=\"%s\"",
+               output_port_comp, bt_component_get_name(output_port_comp),
+               output_port, bt_port_get_name(output_port));
+       iterator = g_new0(struct bt_notification_iterator_output_port, 1);
+       if (!iterator) {
+               BT_LOGE_STR("Failed to allocate one output port notification iterator.");
+               goto error;
+       }
+
+       init_notification_iterator((void *) iterator,
+               BT_NOTIFICATION_ITERATOR_TYPE_OUTPUT_PORT,
+               bt_output_port_notification_iterator_destroy);
+
+       /* Create colander component */
+       colander_comp_cls = bt_component_class_sink_colander_get();
+       if (!colander_comp_cls) {
+               BT_LOGW("Cannot get colander sink component class.");
+               goto error;
+       }
+
+       BT_MOVE(iterator->graph, graph);
+       iterator_base = (void *) iterator;
+       colander_comp_name =
+               colander_component_name ? colander_component_name : "colander";
+       colander_data.notification = &iterator_base->current_notification;
+       colander_data.notification_types = notification_types;
+       graph_status = bt_graph_add_component_with_init_method_data(
+               iterator->graph, colander_comp_cls, colander_comp_name,
+               NULL, &colander_data, &iterator->colander);
+       if (graph_status != BT_GRAPH_STATUS_OK) {
+               BT_LOGW("Cannot add colander sink component to graph: "
+                       "graph-addr=%p, name=\"%s\", graph-status=%s",
+                       iterator->graph, colander_comp_name,
+                       bt_graph_status_string(graph_status));
+               goto error;
+       }
+
+       /*
+        * Connect provided output port to the colander component's
+        * input port.
+        */
+       colander_in_port = bt_component_sink_get_input_port_by_index(
+               iterator->colander, 0);
+       assert(colander_in_port);
+       graph_status = bt_graph_connect_ports(iterator->graph,
+               output_port, colander_in_port, NULL);
+       if (graph_status != BT_GRAPH_STATUS_OK) {
+               BT_LOGW("Cannot add colander sink component to graph: "
+                       "graph-addr=%p, name=\"%s\", graph-status=%s",
+                       iterator->graph, colander_comp_name,
+                       bt_graph_status_string(graph_status));
+               goto error;
+       }
+
+       /*
+        * At this point everything went fine. Make the graph
+        * nonconsumable forever so that only this notification iterator
+        * can consume (thanks to bt_graph_consume_sink_no_check()).
+        * This avoids leaking the notification created by the colander
+        * sink and moved to the base notification iterator's current
+        * notification member.
+        */
+       bt_graph_set_can_consume(iterator->graph, BT_FALSE);
+       goto end;
+
+error:
+       if (iterator && iterator->graph && iterator->colander) {
+               int ret;
+
+               /* Remove created colander component from graph if any */
+               colander_comp = iterator->colander;
+               BT_PUT(iterator->colander);
+
+               /*
+                * At this point the colander component's reference
+                * count is 0 because iterator->colander was the only
+                * owner. We also know that it is not connected because
+                * this is the last operation before this function
+                * succeeds.
+                *
+                * Since we honor the preconditions here,
+                * bt_graph_remove_unconnected_component() always
+                * succeeds.
+                */
+               ret = bt_graph_remove_unconnected_component(iterator->graph,
+                       colander_comp);
+               assert(ret == 0);
+       }
+
+       BT_PUT(iterator);
+
+end:
+       bt_put(colander_in_port);
+       bt_put(colander_comp_cls);
+       bt_put(output_port_comp);
+       bt_put(graph);
+       return (void *) iterator;
+}
This page took 0.037572 seconds and 4 git commands to generate.