/*
 * Babeltrace Plugin Component Graph
 *
 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
29 #include <babeltrace/graph/component-internal.h>
30 #include <babeltrace/graph/graph-internal.h>
31 #include <babeltrace/graph/connection-internal.h>
32 #include <babeltrace/graph/component-sink-internal.h>
33 #include <babeltrace/graph/component-source.h>
34 #include <babeltrace/graph/component-filter.h>
35 #include <babeltrace/graph/port.h>
36 #include <babeltrace/compiler-internal.h>
/*
 * One listener registered on a graph: a type-erased callback and the
 * user pointer handed back to it on every notification.
 *
 * NOTE(review): the member list is not visible in this view of the file;
 * it is restored from how the structure is used (`listener.func`,
 * `listener.data`, and add_listener()'s `void *` parameters).
 */
struct bt_graph_listener {
	/* Callback; converted to the event-specific listener type on use. */
	void *func;
	/* Caller-supplied pointer passed as the callback's last argument. */
	void *data;
};
46 void bt_graph_destroy(struct bt_object
*obj
)
48 struct bt_graph
*graph
= container_of(obj
,
49 struct bt_graph
, base
);
51 if (graph
->components
) {
52 g_ptr_array_free(graph
->components
, TRUE
);
54 if (graph
->connections
) {
55 g_ptr_array_free(graph
->connections
, TRUE
);
57 if (graph
->sinks_to_consume
) {
58 g_queue_free(graph
->sinks_to_consume
);
61 if (graph
->listeners
.port_added
) {
62 g_array_free(graph
->listeners
.port_added
, TRUE
);
65 if (graph
->listeners
.port_removed
) {
66 g_array_free(graph
->listeners
.port_removed
, TRUE
);
69 if (graph
->listeners
.ports_connected
) {
70 g_array_free(graph
->listeners
.ports_connected
, TRUE
);
73 if (graph
->listeners
.ports_disconnected
) {
74 g_array_free(graph
->listeners
.ports_disconnected
, TRUE
);
81 int init_listeners_array(GArray
**listeners
)
86 *listeners
= g_array_new(FALSE
, TRUE
, sizeof(struct bt_graph_listener
));
96 struct bt_graph
*bt_graph_create(void)
98 struct bt_graph
*graph
;
101 graph
= g_new0(struct bt_graph
, 1);
106 bt_object_init(graph
, bt_graph_destroy
);
108 graph
->connections
= g_ptr_array_new_with_free_func(bt_object_release
);
109 if (!graph
->connections
) {
112 graph
->components
= g_ptr_array_new_with_free_func(bt_object_release
);
113 if (!graph
->components
) {
116 graph
->sinks_to_consume
= g_queue_new();
117 if (!graph
->sinks_to_consume
) {
121 ret
= init_listeners_array(&graph
->listeners
.port_added
);
126 ret
= init_listeners_array(&graph
->listeners
.port_removed
);
131 ret
= init_listeners_array(&graph
->listeners
.ports_connected
);
136 ret
= init_listeners_array(&graph
->listeners
.ports_disconnected
);
148 struct bt_connection
*bt_graph_connect_ports(struct bt_graph
*graph
,
149 struct bt_port
*upstream_port
,
150 struct bt_port
*downstream_port
)
152 struct bt_connection
*connection
= NULL
;
153 struct bt_graph
*upstream_graph
= NULL
;
154 struct bt_graph
*downstream_graph
= NULL
;
155 struct bt_component
*upstream_component
= NULL
;
156 struct bt_component
*downstream_component
= NULL
;
157 enum bt_component_status component_status
;
158 bool upstream_was_already_in_graph
;
159 bool downstream_was_already_in_graph
;
161 if (!graph
|| !upstream_port
|| !downstream_port
) {
165 /* Ensure appropriate types for upstream and downstream ports. */
166 if (bt_port_get_type(upstream_port
) != BT_PORT_TYPE_OUTPUT
) {
169 if (bt_port_get_type(downstream_port
) != BT_PORT_TYPE_INPUT
) {
173 /* Ensure that both ports are currently unconnected. */
174 if (bt_port_is_connected(upstream_port
)) {
175 fprintf(stderr
, "Upstream port is already connected\n");
179 if (bt_port_is_connected(downstream_port
)) {
180 fprintf(stderr
, "Downstream port is already connected\n");
185 * Ensure that both ports are still attached to their creating
188 upstream_component
= bt_port_get_component(upstream_port
);
189 if (!upstream_component
) {
190 fprintf(stderr
, "Upstream port does not belong to a component\n");
194 downstream_component
= bt_port_get_component(downstream_port
);
195 if (!downstream_component
) {
196 fprintf(stderr
, "Downstream port does not belong to a component\n");
200 /* Ensure the components are not already part of another graph. */
201 upstream_graph
= bt_component_get_graph(upstream_component
);
202 if (upstream_graph
&& (graph
!= upstream_graph
)) {
203 fprintf(stderr
, "Upstream component is already part of another graph\n");
206 upstream_was_already_in_graph
= (graph
== upstream_graph
);
207 downstream_graph
= bt_component_get_graph(downstream_component
);
208 if (downstream_graph
&& (graph
!= downstream_graph
)) {
209 fprintf(stderr
, "Downstream component is already part of another graph\n");
212 downstream_was_already_in_graph
= (graph
== downstream_graph
);
215 * At this point the ports are not connected yet. Both
216 * components need to accept an eventual connection to their
217 * port by the other port before we continue.
219 component_status
= bt_component_accept_port_connection(
220 upstream_component
, upstream_port
, downstream_port
);
221 if (component_status
!= BT_COMPONENT_STATUS_OK
) {
224 component_status
= bt_component_accept_port_connection(
225 downstream_component
, downstream_port
, upstream_port
);
226 if (component_status
!= BT_COMPONENT_STATUS_OK
) {
230 connection
= bt_connection_create(graph
, upstream_port
,
237 * Ownership of upstream_component/downstream_component and of
238 * the connection object is transferred to the graph.
240 g_ptr_array_add(graph
->connections
, connection
);
242 if (!upstream_was_already_in_graph
) {
243 g_ptr_array_add(graph
->components
, upstream_component
);
244 bt_component_set_graph(upstream_component
, graph
);
246 if (!downstream_was_already_in_graph
) {
247 g_ptr_array_add(graph
->components
, downstream_component
);
248 bt_component_set_graph(downstream_component
, graph
);
249 if (bt_component_get_class_type(downstream_component
) ==
250 BT_COMPONENT_CLASS_TYPE_SINK
) {
251 g_queue_push_tail(graph
->sinks_to_consume
,
252 downstream_component
);
257 * The graph is now the parent of these components which
258 * garantees their existence for the duration of the graph's
263 * Notify both components that their port is connected.
265 bt_component_port_connected(upstream_component
, upstream_port
,
267 bt_component_port_connected(downstream_component
, downstream_port
,
271 * Notify the graph's creator that both ports are connected.
273 bt_graph_notify_ports_connected(graph
, upstream_port
, downstream_port
);
276 bt_put(upstream_graph
);
277 bt_put(downstream_graph
);
278 bt_put(upstream_component
);
279 bt_put(downstream_component
);
283 BT_PUT(upstream_component
);
284 BT_PUT(downstream_component
);
289 enum bt_component_status
get_component_port_counts(
290 struct bt_component
*component
, int64_t *input_count
,
291 int64_t *output_count
)
293 enum bt_component_status ret
;
295 switch (bt_component_get_class_type(component
)) {
296 case BT_COMPONENT_CLASS_TYPE_SOURCE
:
298 bt_component_source_get_output_port_count(component
);
299 if (*output_count
< 0) {
300 ret
= BT_COMPONENT_STATUS_ERROR
;
304 case BT_COMPONENT_CLASS_TYPE_FILTER
:
306 bt_component_filter_get_output_port_count(component
);
307 if (*output_count
< 0) {
308 ret
= BT_COMPONENT_STATUS_ERROR
;
312 bt_component_filter_get_input_port_count(component
);
313 if (*input_count
< 0) {
314 ret
= BT_COMPONENT_STATUS_ERROR
;
318 case BT_COMPONENT_CLASS_TYPE_SINK
:
320 bt_component_sink_get_input_port_count(component
);
321 if (*input_count
< 0) {
322 ret
= BT_COMPONENT_STATUS_ERROR
;
330 ret
= BT_COMPONENT_STATUS_OK
;
335 enum bt_graph_status
bt_graph_add_component_as_sibling(struct bt_graph
*graph
,
336 struct bt_component
*origin
,
337 struct bt_component
*new_component
)
339 int64_t origin_input_port_count
= 0;
340 int64_t origin_output_port_count
= 0;
341 int64_t new_input_port_count
= 0;
342 int64_t new_output_port_count
= 0;
343 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
344 struct bt_graph
*origin_graph
= NULL
;
345 struct bt_graph
*new_graph
= NULL
;
346 struct bt_port
*origin_port
= NULL
;
347 struct bt_port
*new_port
= NULL
;
348 struct bt_port
*upstream_port
= NULL
;
349 struct bt_port
*downstream_port
= NULL
;
350 struct bt_connection
*origin_connection
= NULL
;
351 struct bt_connection
*new_connection
= NULL
;
354 if (!graph
|| !origin
|| !new_component
) {
355 status
= BT_GRAPH_STATUS_INVALID
;
359 if (bt_component_get_class_type(origin
) !=
360 bt_component_get_class_type(new_component
)) {
361 status
= BT_GRAPH_STATUS_INVALID
;
365 origin_graph
= bt_component_get_graph(origin
);
366 if (!origin_graph
|| (origin_graph
!= graph
)) {
367 status
= BT_GRAPH_STATUS_INVALID
;
371 new_graph
= bt_component_get_graph(new_component
);
373 status
= BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH
;
377 if (get_component_port_counts(origin
, &origin_input_port_count
,
378 &origin_output_port_count
) != BT_COMPONENT_STATUS_OK
) {
379 status
= BT_GRAPH_STATUS_INVALID
;
382 if (get_component_port_counts(new_component
, &new_input_port_count
,
383 &new_output_port_count
) != BT_COMPONENT_STATUS_OK
) {
384 status
= BT_GRAPH_STATUS_INVALID
;
388 if (origin_input_port_count
!= new_input_port_count
||
389 origin_output_port_count
!= new_output_port_count
) {
390 status
= BT_GRAPH_STATUS_INVALID
;
394 /* Replicate input connections. */
395 for (port_index
= 0; port_index
< origin_input_port_count
; port_index
++) {
396 origin_port
= bt_component_get_input_port_by_index(origin
,
399 status
= BT_GRAPH_STATUS_ERROR
;
400 goto error_disconnect
;
403 new_port
= bt_component_get_input_port_by_index(new_component
,
406 status
= BT_GRAPH_STATUS_ERROR
;
407 goto error_disconnect
;
410 origin_connection
= bt_port_get_connection(origin_port
);
411 if (origin_connection
) {
412 upstream_port
= bt_connection_get_upstream_port(
414 if (!upstream_port
) {
415 goto error_disconnect
;
418 new_connection
= bt_graph_connect_ports(graph
,
419 upstream_port
, new_port
);
420 if (!new_connection
) {
421 goto error_disconnect
;
425 BT_PUT(upstream_port
);
426 BT_PUT(origin_connection
);
427 BT_PUT(new_connection
);
432 /* Replicate output connections. */
433 for (port_index
= 0; port_index
< origin_output_port_count
; port_index
++) {
434 origin_port
= bt_component_get_output_port_by_index(origin
,
437 status
= BT_GRAPH_STATUS_ERROR
;
438 goto error_disconnect
;
440 new_port
= bt_component_get_output_port_by_index(new_component
,
443 status
= BT_GRAPH_STATUS_ERROR
;
444 goto error_disconnect
;
447 origin_connection
= bt_port_get_connection(origin_port
);
448 if (origin_connection
) {
449 downstream_port
= bt_connection_get_downstream_port(
451 if (!downstream_port
) {
452 goto error_disconnect
;
455 new_connection
= bt_graph_connect_ports(graph
,
456 new_port
, downstream_port
);
457 if (!new_connection
) {
458 goto error_disconnect
;
462 BT_PUT(downstream_port
);
463 BT_PUT(origin_connection
);
464 BT_PUT(new_connection
);
469 bt_put(origin_graph
);
473 bt_put(upstream_port
);
474 bt_put(downstream_port
);
475 bt_put(origin_connection
);
476 bt_put(new_connection
);
479 /* Destroy all connections of the new component. */
484 enum bt_graph_status
bt_graph_consume(struct bt_graph
*graph
)
486 struct bt_component
*sink
;
487 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
488 enum bt_component_status comp_status
;
492 status
= BT_GRAPH_STATUS_INVALID
;
496 if (g_queue_is_empty(graph
->sinks_to_consume
)) {
497 status
= BT_GRAPH_STATUS_END
;
501 current_node
= g_queue_pop_head_link(graph
->sinks_to_consume
);
502 sink
= current_node
->data
;
503 comp_status
= bt_component_sink_consume(sink
);
504 switch (comp_status
) {
505 case BT_COMPONENT_STATUS_OK
:
507 case BT_COMPONENT_STATUS_END
:
508 status
= BT_GRAPH_STATUS_END
;
510 case BT_COMPONENT_STATUS_AGAIN
:
511 status
= BT_GRAPH_STATUS_AGAIN
;
513 case BT_COMPONENT_STATUS_INVALID
:
514 status
= BT_GRAPH_STATUS_INVALID
;
517 status
= BT_GRAPH_STATUS_ERROR
;
521 if (status
!= BT_GRAPH_STATUS_END
) {
522 g_queue_push_tail_link(graph
->sinks_to_consume
, current_node
);
526 /* End reached, the node is not added back to the queue and free'd. */
527 g_queue_delete_link(graph
->sinks_to_consume
, current_node
);
529 /* Don't forward an END status if there are sinks left to consume. */
530 if (!g_queue_is_empty(graph
->sinks_to_consume
)) {
531 status
= BT_GRAPH_STATUS_OK
;
538 enum bt_graph_status
bt_graph_run(struct bt_graph
*graph
)
540 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
543 status
= BT_GRAPH_STATUS_INVALID
;
548 status
= bt_graph_consume(graph
);
549 if (status
== BT_GRAPH_STATUS_AGAIN
) {
551 * If AGAIN is received and there are multiple sinks,
552 * go ahead and consume from the next sink.
554 * However, in the case where a single sink is left,
555 * the caller can decide to busy-wait and call
556 * bt_graph_run continuously until the source is ready
557 * or it can decide to sleep for an arbitrary amount of
560 if (graph
->sinks_to_consume
->length
> 1) {
561 status
= BT_GRAPH_STATUS_OK
;
564 } while (status
== BT_GRAPH_STATUS_OK
);
566 if (g_queue_is_empty(graph
->sinks_to_consume
)) {
567 status
= BT_GRAPH_STATUS_END
;
574 void add_listener(GArray
*listeners
, void *func
, void *data
)
576 struct bt_graph_listener listener
= {
581 g_array_append_val(listeners
, listener
);
584 enum bt_graph_status
bt_graph_add_port_added_listener(
585 struct bt_graph
*graph
,
586 bt_graph_port_added_listener listener
, void *data
)
588 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
590 if (!graph
|| !listener
) {
591 status
= BT_GRAPH_STATUS_INVALID
;
595 add_listener(graph
->listeners
.port_added
, listener
, data
);
601 enum bt_graph_status
bt_graph_add_port_removed_listener(
602 struct bt_graph
*graph
,
603 bt_graph_port_removed_listener listener
, void *data
)
605 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
607 if (!graph
|| !listener
) {
608 status
= BT_GRAPH_STATUS_INVALID
;
612 add_listener(graph
->listeners
.port_removed
, listener
, data
);
618 enum bt_graph_status
bt_graph_add_ports_connected_listener(
619 struct bt_graph
*graph
,
620 bt_graph_ports_connected_listener listener
, void *data
)
622 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
624 if (!graph
|| !listener
) {
625 status
= BT_GRAPH_STATUS_INVALID
;
629 add_listener(graph
->listeners
.ports_connected
, listener
, data
);
635 enum bt_graph_status
bt_graph_add_ports_disconnected_listener(
636 struct bt_graph
*graph
,
637 bt_graph_ports_disconnected_listener listener
, void *data
)
639 enum bt_graph_status status
= BT_GRAPH_STATUS_OK
;
641 if (!graph
|| !listener
) {
642 status
= BT_GRAPH_STATUS_INVALID
;
646 add_listener(graph
->listeners
.ports_disconnected
, listener
, data
);
653 void bt_graph_notify_port_added(struct bt_graph
*graph
, struct bt_port
*port
)
657 for (i
= 0; i
< graph
->listeners
.port_added
->len
; i
++) {
658 struct bt_graph_listener listener
=
659 g_array_index(graph
->listeners
.port_added
,
660 struct bt_graph_listener
, i
);
661 bt_graph_port_added_listener func
= listener
.func
;
664 func(port
, listener
.data
);
669 void bt_graph_notify_port_removed(struct bt_graph
*graph
,
670 struct bt_component
*comp
, struct bt_port
*port
)
674 for (i
= 0; i
< graph
->listeners
.port_removed
->len
; i
++) {
675 struct bt_graph_listener listener
=
676 g_array_index(graph
->listeners
.port_removed
,
677 struct bt_graph_listener
, i
);
678 bt_graph_port_removed_listener func
= listener
.func
;
681 func(comp
, port
, listener
.data
);
686 void bt_graph_notify_ports_connected(struct bt_graph
*graph
,
687 struct bt_port
*upstream_port
, struct bt_port
*downstream_port
)
691 for (i
= 0; i
< graph
->listeners
.ports_connected
->len
; i
++) {
692 struct bt_graph_listener listener
=
693 g_array_index(graph
->listeners
.ports_connected
,
694 struct bt_graph_listener
, i
);
695 bt_graph_ports_connected_listener func
= listener
.func
;
698 func(upstream_port
, downstream_port
, listener
.data
);
703 void bt_graph_notify_ports_disconnected(struct bt_graph
*graph
,
704 struct bt_component
*upstream_comp
,
705 struct bt_component
*downstream_comp
,
706 struct bt_port
*upstream_port
, struct bt_port
*downstream_port
)
710 for (i
= 0; i
< graph
->listeners
.ports_disconnected
->len
; i
++) {
711 struct bt_graph_listener listener
=
712 g_array_index(graph
->listeners
.ports_disconnected
,
713 struct bt_graph_listener
, i
);
714 bt_graph_ports_disconnected_listener func
= listener
.func
;
717 func(upstream_comp
, downstream_comp
, upstream_port
,
718 downstream_port
, listener
.data
);