4 * Babeltrace Plugin Component Graph
6 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29 #include <babeltrace/component/component-internal.h>
30 #include <babeltrace/component/graph-internal.h>
31 #include <babeltrace/component/connection-internal.h>
32 #include <babeltrace/component/component-sink-internal.h>
33 #include <babeltrace/component/component-source.h>
34 #include <babeltrace/component/component-filter.h>
35 #include <babeltrace/component/port.h>
36 #include <babeltrace/compiler.h>
40 struct bt_graph_listener
{
46 void bt_graph_destroy(struct bt_object
*obj
)
48 struct bt_graph
*graph
= container_of(obj
,
49 struct bt_graph
, base
);
51 if (graph
->components
) {
52 g_ptr_array_free(graph
->components
, TRUE
);
54 if (graph
->connections
) {
55 g_ptr_array_free(graph
->connections
, TRUE
);
57 if (graph
->sinks_to_consume
) {
58 g_queue_free(graph
->sinks_to_consume
);
61 if (graph
->listeners
.port_added
) {
62 g_array_free(graph
->listeners
.port_added
, TRUE
);
65 if (graph
->listeners
.port_removed
) {
66 g_array_free(graph
->listeners
.port_removed
, TRUE
);
69 if (graph
->listeners
.port_connected
) {
70 g_array_free(graph
->listeners
.port_connected
, TRUE
);
73 if (graph
->listeners
.port_disconnected
) {
74 g_array_free(graph
->listeners
.port_disconnected
, TRUE
);
81 int init_listeners_array(GArray
**listeners
)
86 *listeners
= g_array_new(FALSE
, TRUE
, sizeof(struct bt_graph_listener
));
96 struct bt_graph
*bt_graph_create(void)
98 struct bt_graph
*graph
;
101 graph
= g_new0(struct bt_graph
, 1);
106 bt_object_init(graph
, bt_graph_destroy
);
108 graph
->connections
= g_ptr_array_new_with_free_func(bt_object_release
);
109 if (!graph
->connections
) {
112 graph
->components
= g_ptr_array_new_with_free_func(bt_object_release
);
113 if (!graph
->components
) {
116 graph
->sinks_to_consume
= g_queue_new();
117 if (!graph
->sinks_to_consume
) {
121 ret
= init_listeners_array(&graph
->listeners
.port_added
);
126 ret
= init_listeners_array(&graph
->listeners
.port_removed
);
131 ret
= init_listeners_array(&graph
->listeners
.port_connected
);
136 ret
= init_listeners_array(&graph
->listeners
.port_disconnected
);
148 struct bt_connection
*bt_graph_connect(struct bt_graph
*graph
,
149 struct bt_port
*upstream_port
,
150 struct bt_port
*downstream_port
)
152 struct bt_connection
*connection
= NULL
;
153 struct bt_graph
*upstream_graph
= NULL
;
154 struct bt_graph
*downstream_graph
= NULL
;
155 struct bt_component
*upstream_component
= NULL
;
156 struct bt_component
*downstream_component
= NULL
;
157 struct bt_connection
*existing_conn
= NULL
;
158 enum bt_component_status component_status
;
159 bool upstream_was_already_in_graph
;
160 bool downstream_was_already_in_graph
;
161 int components_to_remove
= 0;
164 if (!graph
|| !upstream_port
|| !downstream_port
) {
168 /* Ensure appropriate types for upstream and downstream ports. */
169 if (bt_port_get_type(upstream_port
) != BT_PORT_TYPE_OUTPUT
) {
172 if (bt_port_get_type(downstream_port
) != BT_PORT_TYPE_INPUT
) {
176 /* Ensure that both ports are currently unconnected. */
177 existing_conn
= bt_port_get_connection(upstream_port
);
178 bt_put(existing_conn
);
180 fprintf(stderr
, "Upstream port is already connected\n");
184 existing_conn
= bt_port_get_connection(downstream_port
);
185 bt_put(existing_conn
);
187 fprintf(stderr
, "Downstream port is already connected\n");
192 * Ensure that both ports are still attached to their creating
195 upstream_component
= bt_port_get_component(upstream_port
);
196 if (!upstream_component
) {
197 fprintf(stderr
, "Upstream port does not belong to a component\n");
201 downstream_component
= bt_port_get_component(downstream_port
);
202 if (!downstream_component
) {
203 fprintf(stderr
, "Downstream port does not belong to a component\n");
207 /* Ensure the components are not already part of another graph. */
208 upstream_graph
= bt_component_get_graph(upstream_component
);
209 if (upstream_graph
&& (graph
!= upstream_graph
)) {
210 fprintf(stderr
, "Upstream component is already part of another graph\n");
213 upstream_was_already_in_graph
= (graph
== upstream_graph
);
214 downstream_graph
= bt_component_get_graph(downstream_component
);
215 if (downstream_graph
&& (graph
!= downstream_graph
)) {
216 fprintf(stderr
, "Downstream component is already part of another graph\n");
219 downstream_was_already_in_graph
= (graph
== downstream_graph
);
221 connection
= bt_connection_create(graph
, upstream_port
,
228 * Ownership of upstream_component/downstream_component and of
229 * the connection object is transferred to the graph.
231 g_ptr_array_add(graph
->connections
, connection
);
233 if (!upstream_was_already_in_graph
) {
234 g_ptr_array_add(graph
->components
, upstream_component
);
235 bt_component_set_graph(upstream_component
, graph
);
237 if (!downstream_was_already_in_graph
) {
238 g_ptr_array_add(graph
->components
, downstream_component
);
239 bt_component_set_graph(downstream_component
, graph
);
240 if (bt_component_get_class_type(downstream_component
) ==
241 BT_COMPONENT_CLASS_TYPE_SINK
) {
242 g_queue_push_tail(graph
->sinks_to_consume
,
243 downstream_component
);
248 * The graph is now the parent of these components which guarantees their
249 * existence for the duration of the graph's lifetime.
253 * The components and connection are added to the graph before
254 * invoking the `accept_port_connection` method in order to make
255 * them visible to the components during the method's
258 component_status
= bt_component_accept_port_connection(
259 upstream_component
, upstream_port
);
260 if (component_status
!= BT_COMPONENT_STATUS_OK
) {
263 component_status
= bt_component_accept_port_connection(
264 downstream_component
, downstream_port
);
265 if (component_status
!= BT_COMPONENT_STATUS_OK
) {
270 * Both components accepted the connection. Notify the graph's
271 * creator that both ports are connected.
273 bt_graph_notify_port_connected(graph
, upstream_port
);
274 bt_graph_notify_port_connected(graph
, downstream_port
);
277 bt_put(upstream_graph
);
278 bt_put(downstream_graph
);
279 bt_put(upstream_component
);
280 bt_put(downstream_component
);
284 * Remove newly-added components from the graph, being careful
285 * not to remove a component that was already present in the graph
286 * and is connected to other components.
288 components_to_remove
+= upstream_was_already_in_graph
? 0 : 1;
289 components_to_remove
+= downstream_was_already_in_graph
? 0 : 1;
291 if (!downstream_was_already_in_graph
) {
292 if (bt_component_get_class_type(downstream_component
) ==
293 BT_COMPONENT_CLASS_TYPE_SINK
) {
294 g_queue_pop_tail(graph
->sinks_to_consume
);
297 /* Remove newly created connection. */
298 g_ptr_array_set_size(graph
->connections
,
299 graph
->connections
->len
- 1);
302 * Remove newly added components.
304 * Note that this is a tricky situation. The graph, being the parent
305 * of the components, does not hold a reference to them. Normally,
306 * components are destroyed right away when the graph is released since
307 * the graph, being their parent, bounds their lifetime
308 * (see doc/ref-counting.md).
310 * In this particular case, we must take a number of steps:
311 * 1) unset the components' parent to roll back to the initial state of
312 * the components being connected.
313 * Note that the reference taken by the component on its graph is
314 * released by the set_parent call.
315 * 2) set the pointer in the components array to NULL so that the
316 * destruction function called on the array's resize is invoked on
319 * NOTE: Point #1 assumes that *something* holds a reference to both
320 * components being connected. The fact that a reference is being
321 * held to a component means that it must hold a reference to its
322 * parent to prevent the parent from being destroyed (again, refer
323 * to doc/ref-counting.md). This reference to a component is
324 * most likely being held *transitively* by the caller which holds
325 * a reference to both ports (a port has its component as a
328 * This assumes that a graph is not connecting components by
329 * itself while not holding a reference to the ports/components
330 * being connected (i.e. "cheating" by using internal APIs).
332 for (i
= 0; i
< components_to_remove
; i
++) {
333 struct bt_component
*component
= g_ptr_array_index(
334 graph
->components
, graph
->components
->len
- 1);
336 bt_component_set_graph(component
, NULL
);
337 g_ptr_array_index(graph
->components
,
338 graph
->components
->len
- 1) = NULL
;
339 g_ptr_array_set_size(graph
->components
,
340 graph
->components
->len
- 1);
342 /* NOTE: Resizing the ptr_arrays invokes the destruction of the elements. */
345 BT_PUT(upstream_component
);
346 BT_PUT(downstream_component
);
351 enum bt_component_status
get_component_port_counts(
352 struct bt_component
*component
, uint64_t *input_count
,
353 uint64_t *output_count
)
355 enum bt_component_status ret
;
357 switch (bt_component_get_class_type(component
)) {
358 case BT_COMPONENT_CLASS_TYPE_SOURCE
:
359 ret
= bt_component_source_get_output_port_count(component
,
361 if (ret
!= BT_COMPONENT_STATUS_OK
) {
365 case BT_COMPONENT_CLASS_TYPE_FILTER
:
366 ret
= bt_component_filter_get_output_port_count(component
,
368 if (ret
!= BT_COMPONENT_STATUS_OK
) {
371 ret
= bt_component_filter_get_input_port_count(component
,
373 if (ret
!= BT_COMPONENT_STATUS_OK
) {
377 case BT_COMPONENT_CLASS_TYPE_SINK
:
378 ret
= bt_component_sink_get_input_port_count(component
,
380 if (ret
!= BT_COMPONENT_STATUS_OK
) {
388 ret
= BT_COMPONENT_STATUS_OK
;
393 enum bt_graph_status
bt_graph_add_component_as_sibling(struct bt_graph
*graph
,
394 struct bt_component
*origin
,
395 struct bt_component
*new_component
)
397 uint64_t origin_input_port_count
= 0;
398 uint64_t origin_output_port_count
= 0;
399 uint64_t new_input_port_count
= 0;
400 uint64_t new_output_port_count
= 0;
401 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
402 struct bt_graph
*origin_graph
= NULL
;
403 struct bt_graph
*new_graph
= NULL
;
404 struct bt_port
*origin_port
= NULL
;
405 struct bt_port
*new_port
= NULL
;
406 struct bt_port
*upstream_port
= NULL
;
407 struct bt_port
*downstream_port
= NULL
;
408 struct bt_connection
*origin_connection
= NULL
;
409 struct bt_connection
*new_connection
= NULL
;
412 if (!graph
|| !origin
|| !new_component
) {
413 status
= BT_GRAPH_STATUS_INVALID
;
417 if (bt_component_get_class_type(origin
) !=
418 bt_component_get_class_type(new_component
)) {
419 status
= BT_GRAPH_STATUS_INVALID
;
423 origin_graph
= bt_component_get_graph(origin
);
424 if (!origin_graph
|| (origin_graph
!= graph
)) {
425 status
= BT_GRAPH_STATUS_INVALID
;
429 new_graph
= bt_component_get_graph(new_component
);
431 status
= BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH
;
435 if (get_component_port_counts(origin
, &origin_input_port_count
,
436 &origin_output_port_count
) != BT_COMPONENT_STATUS_OK
) {
437 status
= BT_GRAPH_STATUS_INVALID
;
440 if (get_component_port_counts(new_component
, &new_input_port_count
,
441 &new_output_port_count
) != BT_COMPONENT_STATUS_OK
) {
442 status
= BT_GRAPH_STATUS_INVALID
;
446 if (origin_input_port_count
!= new_input_port_count
||
447 origin_output_port_count
!= new_output_port_count
) {
448 status
= BT_GRAPH_STATUS_INVALID
;
452 /* Replicate input connections. */
453 for (port_index
= 0; port_index
< origin_input_port_count
; port_index
++) {
454 origin_port
= bt_component_get_input_port_at_index(origin
,
457 status
= BT_GRAPH_STATUS_ERROR
;
458 goto error_disconnect
;
461 new_port
= bt_component_get_input_port_at_index(new_component
,
464 status
= BT_GRAPH_STATUS_ERROR
;
465 goto error_disconnect
;
468 origin_connection
= bt_port_get_connection(origin_port
);
469 if (origin_connection
) {
470 upstream_port
= bt_connection_get_upstream_port(
472 if (!upstream_port
) {
473 goto error_disconnect
;
476 new_connection
= bt_graph_connect(graph
, upstream_port
,
478 if (!new_connection
) {
479 goto error_disconnect
;
483 BT_PUT(upstream_port
);
484 BT_PUT(origin_connection
);
485 BT_PUT(new_connection
);
490 /* Replicate output connections. */
491 for (port_index
= 0; port_index
< origin_output_port_count
; port_index
++) {
492 origin_port
= bt_component_get_output_port_at_index(origin
,
495 status
= BT_GRAPH_STATUS_ERROR
;
496 goto error_disconnect
;
498 new_port
= bt_component_get_output_port_at_index(new_component
,
501 status
= BT_GRAPH_STATUS_ERROR
;
502 goto error_disconnect
;
505 origin_connection
= bt_port_get_connection(origin_port
);
506 if (origin_connection
) {
507 downstream_port
= bt_connection_get_downstream_port(
509 if (!downstream_port
) {
510 goto error_disconnect
;
513 new_connection
= bt_graph_connect(graph
, new_port
,
515 if (!new_connection
) {
516 goto error_disconnect
;
520 BT_PUT(downstream_port
);
521 BT_PUT(origin_connection
);
522 BT_PUT(new_connection
);
527 bt_put(origin_graph
);
531 bt_put(upstream_port
);
532 bt_put(downstream_port
);
533 bt_put(origin_connection
);
534 bt_put(new_connection
);
537 /* Destroy all connections of the new component. */
542 enum bt_graph_status
bt_graph_consume(struct bt_graph
*graph
)
544 struct bt_component
*sink
;
545 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
546 enum bt_component_status comp_status
;
550 status
= BT_GRAPH_STATUS_INVALID
;
554 if (g_queue_is_empty(graph
->sinks_to_consume
)) {
555 status
= BT_GRAPH_STATUS_END
;
559 current_node
= g_queue_pop_head_link(graph
->sinks_to_consume
);
560 sink
= current_node
->data
;
561 comp_status
= bt_component_sink_consume(sink
);
562 switch (comp_status
) {
563 case BT_COMPONENT_STATUS_OK
:
565 case BT_COMPONENT_STATUS_END
:
566 status
= BT_GRAPH_STATUS_END
;
568 case BT_COMPONENT_STATUS_AGAIN
:
569 status
= BT_GRAPH_STATUS_AGAIN
;
571 case BT_COMPONENT_STATUS_INVALID
:
572 status
= BT_GRAPH_STATUS_INVALID
;
575 status
= BT_GRAPH_STATUS_ERROR
;
579 if (status
!= BT_GRAPH_STATUS_END
) {
580 g_queue_push_tail_link(graph
->sinks_to_consume
, current_node
);
584 /* End reached, the node is not added back to the queue and free'd. */
585 g_queue_delete_link(graph
->sinks_to_consume
, current_node
);
587 /* Don't forward an END status if there are sinks left to consume. */
588 if (!g_queue_is_empty(graph
->sinks_to_consume
)) {
589 status
= BT_GRAPH_STATUS_OK
;
596 enum bt_graph_status
bt_graph_run(struct bt_graph
*graph
)
598 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
601 status
= BT_GRAPH_STATUS_INVALID
;
606 status
= bt_graph_consume(graph
);
607 if (status
== BT_GRAPH_STATUS_AGAIN
) {
609 * If AGAIN is received and there are multiple sinks,
610 * go ahead and consume from the next sink.
612 * However, in the case where a single sink is left,
613 * the caller can decide to busy-wait and call
614 * bt_graph_run continuously until the source is ready
615 * or it can decide to sleep for an arbitrary amount of
618 if (graph
->sinks_to_consume
->length
> 1) {
619 status
= BT_GRAPH_STATUS_OK
;
622 } while (status
== BT_GRAPH_STATUS_OK
);
624 if (g_queue_is_empty(graph
->sinks_to_consume
)) {
625 status
= BT_GRAPH_STATUS_END
;
632 void add_listener(GArray
*listeners
, void *func
, void *data
)
634 struct bt_graph_listener listener
= {
639 g_array_append_val(listeners
, listener
);
642 enum bt_graph_status
bt_graph_add_port_added_listener(
643 struct bt_graph
*graph
,
644 bt_graph_port_added_listener listener
, void *data
)
646 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
648 if (!graph
|| !listener
) {
649 status
= BT_GRAPH_STATUS_INVALID
;
653 add_listener(graph
->listeners
.port_added
, listener
, data
);
659 enum bt_graph_status
bt_graph_add_port_removed_listener(
660 struct bt_graph
*graph
,
661 bt_graph_port_removed_listener listener
, void *data
)
663 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
665 if (!graph
|| !listener
) {
666 status
= BT_GRAPH_STATUS_INVALID
;
670 add_listener(graph
->listeners
.port_removed
, listener
, data
);
676 enum bt_graph_status
bt_graph_add_port_connected_listener(
677 struct bt_graph
*graph
,
678 bt_graph_port_connected_listener listener
, void *data
)
680 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
682 if (!graph
|| !listener
) {
683 status
= BT_GRAPH_STATUS_INVALID
;
687 add_listener(graph
->listeners
.port_connected
, listener
, data
);
693 enum bt_graph_status
bt_graph_add_port_disconnected_listener(
694 struct bt_graph
*graph
,
695 bt_graph_port_disconnected_listener listener
, void *data
)
697 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
699 if (!graph
|| !listener
) {
700 status
= BT_GRAPH_STATUS_INVALID
;
704 add_listener(graph
->listeners
.port_disconnected
, listener
, data
);
711 void bt_graph_notify_port_added(struct bt_graph
*graph
, struct bt_port
*port
)
715 for (i
= 0; i
< graph
->listeners
.port_added
->len
; i
++) {
716 struct bt_graph_listener listener
=
717 g_array_index(graph
->listeners
.port_added
,
718 struct bt_graph_listener
, i
);
719 bt_graph_port_added_listener func
= listener
.func
;
722 func(port
, listener
.data
);
727 void bt_graph_notify_port_removed(struct bt_graph
*graph
,
728 struct bt_component
*comp
, struct bt_port
*port
)
732 for (i
= 0; i
< graph
->listeners
.port_removed
->len
; i
++) {
733 struct bt_graph_listener listener
=
734 g_array_index(graph
->listeners
.port_removed
,
735 struct bt_graph_listener
, i
);
736 bt_graph_port_removed_listener func
= listener
.func
;
739 func(comp
, port
, listener
.data
);
744 void bt_graph_notify_port_connected(struct bt_graph
*graph
,
745 struct bt_port
*port
)
749 for (i
= 0; i
< graph
->listeners
.port_connected
->len
; i
++) {
750 struct bt_graph_listener listener
=
751 g_array_index(graph
->listeners
.port_connected
,
752 struct bt_graph_listener
, i
);
753 bt_graph_port_connected_listener func
= listener
.func
;
756 func(port
, listener
.data
);
761 void bt_graph_notify_port_disconnected(struct bt_graph
*graph
,
762 struct bt_component
*comp
, struct bt_port
*port
)
766 for (i
= 0; i
< graph
->listeners
.port_disconnected
->len
; i
++) {
767 struct bt_graph_listener listener
=
768 g_array_index(graph
->listeners
.port_disconnected
,
769 struct bt_graph_listener
, i
);
770 bt_graph_port_disconnected_listener func
= listener
.func
;
773 func(comp
, port
, listener
.data
);