Do not use `bool` type; use new `bt_bool` instead
[babeltrace.git] / lib / graph / graph.c
1 /*
2 * graph.c
3 *
4 * Babeltrace Plugin Component Graph
5 *
6 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 *
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 */
28
29 #include <babeltrace/graph/component-internal.h>
30 #include <babeltrace/graph/graph-internal.h>
31 #include <babeltrace/graph/connection-internal.h>
32 #include <babeltrace/graph/component-sink-internal.h>
33 #include <babeltrace/graph/component-source.h>
34 #include <babeltrace/graph/component-filter.h>
35 #include <babeltrace/graph/port.h>
36 #include <babeltrace/compiler-internal.h>
37 #include <babeltrace/types.h>
38 #include <unistd.h>
39 #include <glib.h>
40
41 struct bt_graph_listener {
42 void *func;
43 void *data;
44 };
45
46 static
47 void bt_graph_destroy(struct bt_object *obj)
48 {
49 struct bt_graph *graph = container_of(obj,
50 struct bt_graph, base);
51
52 if (graph->components) {
53 g_ptr_array_free(graph->components, TRUE);
54 }
55 if (graph->connections) {
56 g_ptr_array_free(graph->connections, TRUE);
57 }
58 if (graph->sinks_to_consume) {
59 g_queue_free(graph->sinks_to_consume);
60 }
61
62 if (graph->listeners.port_added) {
63 g_array_free(graph->listeners.port_added, TRUE);
64 }
65
66 if (graph->listeners.port_removed) {
67 g_array_free(graph->listeners.port_removed, TRUE);
68 }
69
70 if (graph->listeners.ports_connected) {
71 g_array_free(graph->listeners.ports_connected, TRUE);
72 }
73
74 if (graph->listeners.ports_disconnected) {
75 g_array_free(graph->listeners.ports_disconnected, TRUE);
76 }
77
78 g_free(graph);
79 }
80
81 static
82 int init_listeners_array(GArray **listeners)
83 {
84 int ret = 0;
85
86 assert(listeners);
87 *listeners = g_array_new(FALSE, TRUE, sizeof(struct bt_graph_listener));
88 if (!*listeners) {
89 ret = -1;
90 goto end;
91 }
92
93 end:
94 return ret;
95 }
96
97 struct bt_graph *bt_graph_create(void)
98 {
99 struct bt_graph *graph;
100 int ret;
101
102 graph = g_new0(struct bt_graph, 1);
103 if (!graph) {
104 goto end;
105 }
106
107 bt_object_init(graph, bt_graph_destroy);
108
109 graph->connections = g_ptr_array_new_with_free_func(bt_object_release);
110 if (!graph->connections) {
111 goto error;
112 }
113 graph->components = g_ptr_array_new_with_free_func(bt_object_release);
114 if (!graph->components) {
115 goto error;
116 }
117 graph->sinks_to_consume = g_queue_new();
118 if (!graph->sinks_to_consume) {
119 goto error;
120 }
121
122 ret = init_listeners_array(&graph->listeners.port_added);
123 if (ret) {
124 goto error;
125 }
126
127 ret = init_listeners_array(&graph->listeners.port_removed);
128 if (ret) {
129 goto error;
130 }
131
132 ret = init_listeners_array(&graph->listeners.ports_connected);
133 if (ret) {
134 goto error;
135 }
136
137 ret = init_listeners_array(&graph->listeners.ports_disconnected);
138 if (ret) {
139 goto error;
140 }
141
142 end:
143 return graph;
144 error:
145 BT_PUT(graph);
146 goto end;
147 }
148
149 struct bt_connection *bt_graph_connect_ports(struct bt_graph *graph,
150 struct bt_port *upstream_port,
151 struct bt_port *downstream_port)
152 {
153 struct bt_connection *connection = NULL;
154 struct bt_graph *upstream_graph = NULL;
155 struct bt_graph *downstream_graph = NULL;
156 struct bt_component *upstream_component = NULL;
157 struct bt_component *downstream_component = NULL;
158 enum bt_component_status component_status;
159 bt_bool upstream_was_already_in_graph;
160 bt_bool downstream_was_already_in_graph;
161
162 if (!graph || !upstream_port || !downstream_port) {
163 goto end;
164 }
165
166 /* Ensure appropriate types for upstream and downstream ports. */
167 if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) {
168 goto end;
169 }
170 if (bt_port_get_type(downstream_port) != BT_PORT_TYPE_INPUT) {
171 goto end;
172 }
173
174 /* Ensure that both ports are currently unconnected. */
175 if (bt_port_is_connected(upstream_port)) {
176 fprintf(stderr, "Upstream port is already connected\n");
177 goto end;
178 }
179
180 if (bt_port_is_connected(downstream_port)) {
181 fprintf(stderr, "Downstream port is already connected\n");
182 goto end;
183 }
184
185 /*
186 * Ensure that both ports are still attached to their creating
187 * component.
188 */
189 upstream_component = bt_port_get_component(upstream_port);
190 if (!upstream_component) {
191 fprintf(stderr, "Upstream port does not belong to a component\n");
192 goto end;
193 }
194
195 downstream_component = bt_port_get_component(downstream_port);
196 if (!downstream_component) {
197 fprintf(stderr, "Downstream port does not belong to a component\n");
198 goto end;
199 }
200
201 /* Ensure the components are not already part of another graph. */
202 upstream_graph = bt_component_get_graph(upstream_component);
203 if (upstream_graph && (graph != upstream_graph)) {
204 fprintf(stderr, "Upstream component is already part of another graph\n");
205 goto error;
206 }
207 upstream_was_already_in_graph = (graph == upstream_graph);
208 downstream_graph = bt_component_get_graph(downstream_component);
209 if (downstream_graph && (graph != downstream_graph)) {
210 fprintf(stderr, "Downstream component is already part of another graph\n");
211 goto error;
212 }
213 downstream_was_already_in_graph = (graph == downstream_graph);
214
215 /*
216 * At this point the ports are not connected yet. Both
217 * components need to accept an eventual connection to their
218 * port by the other port before we continue.
219 */
220 component_status = bt_component_accept_port_connection(
221 upstream_component, upstream_port, downstream_port);
222 if (component_status != BT_COMPONENT_STATUS_OK) {
223 goto error;
224 }
225 component_status = bt_component_accept_port_connection(
226 downstream_component, downstream_port, upstream_port);
227 if (component_status != BT_COMPONENT_STATUS_OK) {
228 goto error;
229 }
230
231 connection = bt_connection_create(graph, upstream_port,
232 downstream_port);
233 if (!connection) {
234 goto error;
235 }
236
237 /*
238 * Ownership of upstream_component/downstream_component and of
239 * the connection object is transferred to the graph.
240 */
241 g_ptr_array_add(graph->connections, connection);
242
243 if (!upstream_was_already_in_graph) {
244 g_ptr_array_add(graph->components, upstream_component);
245 bt_component_set_graph(upstream_component, graph);
246 }
247 if (!downstream_was_already_in_graph) {
248 g_ptr_array_add(graph->components, downstream_component);
249 bt_component_set_graph(downstream_component, graph);
250 if (bt_component_get_class_type(downstream_component) ==
251 BT_COMPONENT_CLASS_TYPE_SINK) {
252 g_queue_push_tail(graph->sinks_to_consume,
253 downstream_component);
254 }
255 }
256
257 /*
258 * The graph is now the parent of these components which
259 * garantees their existence for the duration of the graph's
260 * lifetime.
261 */
262
263 /*
264 * Notify both components that their port is connected.
265 */
266 bt_component_port_connected(upstream_component, upstream_port,
267 downstream_port);
268 bt_component_port_connected(downstream_component, downstream_port,
269 upstream_port);
270
271 /*
272 * Notify the graph's creator that both ports are connected.
273 */
274 bt_graph_notify_ports_connected(graph, upstream_port, downstream_port);
275
276 end:
277 bt_put(upstream_graph);
278 bt_put(downstream_graph);
279 bt_put(upstream_component);
280 bt_put(downstream_component);
281 return connection;
282
283 error:
284 BT_PUT(upstream_component);
285 BT_PUT(downstream_component);
286 goto end;
287 }
288
289 static
290 enum bt_component_status get_component_port_counts(
291 struct bt_component *component, int64_t *input_count,
292 int64_t *output_count)
293 {
294 enum bt_component_status ret;
295
296 switch (bt_component_get_class_type(component)) {
297 case BT_COMPONENT_CLASS_TYPE_SOURCE:
298 *output_count =
299 bt_component_source_get_output_port_count(component);
300 if (*output_count < 0) {
301 ret = BT_COMPONENT_STATUS_ERROR;
302 goto end;
303 }
304 break;
305 case BT_COMPONENT_CLASS_TYPE_FILTER:
306 *output_count =
307 bt_component_filter_get_output_port_count(component);
308 if (*output_count < 0) {
309 ret = BT_COMPONENT_STATUS_ERROR;
310 goto end;
311 }
312 *input_count =
313 bt_component_filter_get_input_port_count(component);
314 if (*input_count < 0) {
315 ret = BT_COMPONENT_STATUS_ERROR;
316 goto end;
317 }
318 break;
319 case BT_COMPONENT_CLASS_TYPE_SINK:
320 *input_count =
321 bt_component_sink_get_input_port_count(component);
322 if (*input_count < 0) {
323 ret = BT_COMPONENT_STATUS_ERROR;
324 goto end;
325 }
326 break;
327 default:
328 assert(BT_FALSE);
329 break;
330 }
331 ret = BT_COMPONENT_STATUS_OK;
332 end:
333 return ret;
334 }
335
336 enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
337 struct bt_component *origin,
338 struct bt_component *new_component)
339 {
340 int64_t origin_input_port_count = 0;
341 int64_t origin_output_port_count = 0;
342 int64_t new_input_port_count = 0;
343 int64_t new_output_port_count = 0;
344 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
345 struct bt_graph *origin_graph = NULL;
346 struct bt_graph *new_graph = NULL;
347 struct bt_port *origin_port = NULL;
348 struct bt_port *new_port = NULL;
349 struct bt_port *upstream_port = NULL;
350 struct bt_port *downstream_port = NULL;
351 struct bt_connection *origin_connection = NULL;
352 struct bt_connection *new_connection = NULL;
353 int64_t port_index;
354
355 if (!graph || !origin || !new_component) {
356 status = BT_GRAPH_STATUS_INVALID;
357 goto end;
358 }
359
360 if (bt_component_get_class_type(origin) !=
361 bt_component_get_class_type(new_component)) {
362 status = BT_GRAPH_STATUS_INVALID;
363 goto end;
364 }
365
366 origin_graph = bt_component_get_graph(origin);
367 if (!origin_graph || (origin_graph != graph)) {
368 status = BT_GRAPH_STATUS_INVALID;
369 goto end;
370 }
371
372 new_graph = bt_component_get_graph(new_component);
373 if (new_graph) {
374 status = BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH;
375 goto end;
376 }
377
378 if (get_component_port_counts(origin, &origin_input_port_count,
379 &origin_output_port_count) != BT_COMPONENT_STATUS_OK) {
380 status = BT_GRAPH_STATUS_INVALID;
381 goto end;
382 }
383 if (get_component_port_counts(new_component, &new_input_port_count,
384 &new_output_port_count) != BT_COMPONENT_STATUS_OK) {
385 status = BT_GRAPH_STATUS_INVALID;
386 goto end;
387 }
388
389 if (origin_input_port_count != new_input_port_count ||
390 origin_output_port_count != new_output_port_count) {
391 status = BT_GRAPH_STATUS_INVALID;
392 goto end;
393 }
394
395 /* Replicate input connections. */
396 for (port_index = 0; port_index< origin_input_port_count; port_index++) {
397 origin_port = bt_component_get_input_port_by_index(origin,
398 port_index);
399 if (!origin_port) {
400 status = BT_GRAPH_STATUS_ERROR;
401 goto error_disconnect;
402 }
403
404 new_port = bt_component_get_input_port_by_index(new_component,
405 port_index);
406 if (!new_port) {
407 status = BT_GRAPH_STATUS_ERROR;
408 goto error_disconnect;
409 }
410
411 origin_connection = bt_port_get_connection(origin_port);
412 if (origin_connection) {
413 upstream_port = bt_connection_get_upstream_port(
414 origin_connection);
415 if (!upstream_port) {
416 goto error_disconnect;
417 }
418
419 new_connection = bt_graph_connect_ports(graph,
420 upstream_port, new_port);
421 if (!new_connection) {
422 goto error_disconnect;
423 }
424 }
425
426 BT_PUT(upstream_port);
427 BT_PUT(origin_connection);
428 BT_PUT(new_connection);
429 BT_PUT(origin_port);
430 BT_PUT(new_port);
431 }
432
433 /* Replicate output connections. */
434 for (port_index = 0; port_index < origin_output_port_count; port_index++) {
435 origin_port = bt_component_get_output_port_by_index(origin,
436 port_index);
437 if (!origin_port) {
438 status = BT_GRAPH_STATUS_ERROR;
439 goto error_disconnect;
440 }
441 new_port = bt_component_get_output_port_by_index(new_component,
442 port_index);
443 if (!new_port) {
444 status = BT_GRAPH_STATUS_ERROR;
445 goto error_disconnect;
446 }
447
448 origin_connection = bt_port_get_connection(origin_port);
449 if (origin_connection) {
450 downstream_port = bt_connection_get_downstream_port(
451 origin_connection);
452 if (!downstream_port) {
453 goto error_disconnect;
454 }
455
456 new_connection = bt_graph_connect_ports(graph,
457 new_port, downstream_port);
458 if (!new_connection) {
459 goto error_disconnect;
460 }
461 }
462
463 BT_PUT(downstream_port);
464 BT_PUT(origin_connection);
465 BT_PUT(new_connection);
466 BT_PUT(origin_port);
467 BT_PUT(new_port);
468 }
469 end:
470 bt_put(origin_graph);
471 bt_put(new_graph);
472 bt_put(origin_port);
473 bt_put(new_port);
474 bt_put(upstream_port);
475 bt_put(downstream_port);
476 bt_put(origin_connection);
477 bt_put(new_connection);
478 return status;
479 error_disconnect:
480 /* Destroy all connections of the new component. */
481 /* FIXME. */
482 goto end;
483 }
484
485 enum bt_graph_status bt_graph_consume(struct bt_graph *graph)
486 {
487 struct bt_component *sink;
488 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
489 enum bt_component_status comp_status;
490 GList *current_node;
491
492 if (!graph) {
493 status = BT_GRAPH_STATUS_INVALID;
494 goto end;
495 }
496
497 if (g_queue_is_empty(graph->sinks_to_consume)) {
498 status = BT_GRAPH_STATUS_END;
499 goto end;
500 }
501
502 current_node = g_queue_pop_head_link(graph->sinks_to_consume);
503 sink = current_node->data;
504 comp_status = bt_component_sink_consume(sink);
505 switch (comp_status) {
506 case BT_COMPONENT_STATUS_OK:
507 break;
508 case BT_COMPONENT_STATUS_END:
509 status = BT_GRAPH_STATUS_END;
510 break;
511 case BT_COMPONENT_STATUS_AGAIN:
512 status = BT_GRAPH_STATUS_AGAIN;
513 break;
514 case BT_COMPONENT_STATUS_INVALID:
515 status = BT_GRAPH_STATUS_INVALID;
516 break;
517 default:
518 status = BT_GRAPH_STATUS_ERROR;
519 break;
520 }
521
522 if (status != BT_GRAPH_STATUS_END) {
523 g_queue_push_tail_link(graph->sinks_to_consume, current_node);
524 goto end;
525 }
526
527 /* End reached, the node is not added back to the queue and free'd. */
528 g_queue_delete_link(graph->sinks_to_consume, current_node);
529
530 /* Don't forward an END status if there are sinks left to consume. */
531 if (!g_queue_is_empty(graph->sinks_to_consume)) {
532 status = BT_GRAPH_STATUS_OK;
533 goto end;
534 }
535 end:
536 return status;
537 }
538
539 enum bt_graph_status bt_graph_run(struct bt_graph *graph)
540 {
541 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
542
543 if (!graph) {
544 status = BT_GRAPH_STATUS_INVALID;
545 goto error;
546 }
547
548 do {
549 status = bt_graph_consume(graph);
550 if (status == BT_GRAPH_STATUS_AGAIN) {
551 /*
552 * If AGAIN is received and there are multiple sinks,
553 * go ahead and consume from the next sink.
554 *
555 * However, in the case where a single sink is left,
556 * the caller can decide to busy-wait and call
557 * bt_graph_run continuously until the source is ready
558 * or it can decide to sleep for an arbitrary amount of
559 * time.
560 */
561 if (graph->sinks_to_consume->length > 1) {
562 status = BT_GRAPH_STATUS_OK;
563 }
564 }
565 } while (status == BT_GRAPH_STATUS_OK);
566
567 if (g_queue_is_empty(graph->sinks_to_consume)) {
568 status = BT_GRAPH_STATUS_END;
569 }
570 error:
571 return status;
572 }
573
574 static
575 void add_listener(GArray *listeners, void *func, void *data)
576 {
577 struct bt_graph_listener listener = {
578 .func = func,
579 .data = data,
580 };
581
582 g_array_append_val(listeners, listener);
583 }
584
585 enum bt_graph_status bt_graph_add_port_added_listener(
586 struct bt_graph *graph,
587 bt_graph_port_added_listener listener, void *data)
588 {
589 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
590
591 if (!graph || !listener) {
592 status = BT_GRAPH_STATUS_INVALID;
593 goto end;
594 }
595
596 add_listener(graph->listeners.port_added, listener, data);
597
598 end:
599 return status;
600 }
601
602 enum bt_graph_status bt_graph_add_port_removed_listener(
603 struct bt_graph *graph,
604 bt_graph_port_removed_listener listener, void *data)
605 {
606 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
607
608 if (!graph || !listener) {
609 status = BT_GRAPH_STATUS_INVALID;
610 goto end;
611 }
612
613 add_listener(graph->listeners.port_removed, listener, data);
614
615 end:
616 return status;
617 }
618
619 enum bt_graph_status bt_graph_add_ports_connected_listener(
620 struct bt_graph *graph,
621 bt_graph_ports_connected_listener listener, void *data)
622 {
623 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
624
625 if (!graph || !listener) {
626 status = BT_GRAPH_STATUS_INVALID;
627 goto end;
628 }
629
630 add_listener(graph->listeners.ports_connected, listener, data);
631
632 end:
633 return status;
634 }
635
636 enum bt_graph_status bt_graph_add_ports_disconnected_listener(
637 struct bt_graph *graph,
638 bt_graph_ports_disconnected_listener listener, void *data)
639 {
640 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
641
642 if (!graph || !listener) {
643 status = BT_GRAPH_STATUS_INVALID;
644 goto end;
645 }
646
647 add_listener(graph->listeners.ports_disconnected, listener, data);
648
649 end:
650 return status;
651 }
652
653 BT_HIDDEN
654 void bt_graph_notify_port_added(struct bt_graph *graph, struct bt_port *port)
655 {
656 size_t i;
657
658 for (i = 0; i < graph->listeners.port_added->len; i++) {
659 struct bt_graph_listener listener =
660 g_array_index(graph->listeners.port_added,
661 struct bt_graph_listener, i);
662 bt_graph_port_added_listener func = listener.func;
663
664 assert(func);
665 func(port, listener.data);
666 }
667 }
668
669 BT_HIDDEN
670 void bt_graph_notify_port_removed(struct bt_graph *graph,
671 struct bt_component *comp, struct bt_port *port)
672 {
673 size_t i;
674
675 for (i = 0; i < graph->listeners.port_removed->len; i++) {
676 struct bt_graph_listener listener =
677 g_array_index(graph->listeners.port_removed,
678 struct bt_graph_listener, i);
679 bt_graph_port_removed_listener func = listener.func;
680
681 assert(func);
682 func(comp, port, listener.data);
683 }
684 }
685
686 BT_HIDDEN
687 void bt_graph_notify_ports_connected(struct bt_graph *graph,
688 struct bt_port *upstream_port, struct bt_port *downstream_port)
689 {
690 size_t i;
691
692 for (i = 0; i < graph->listeners.ports_connected->len; i++) {
693 struct bt_graph_listener listener =
694 g_array_index(graph->listeners.ports_connected,
695 struct bt_graph_listener, i);
696 bt_graph_ports_connected_listener func = listener.func;
697
698 assert(func);
699 func(upstream_port, downstream_port, listener.data);
700 }
701 }
702
703 BT_HIDDEN
704 void bt_graph_notify_ports_disconnected(struct bt_graph *graph,
705 struct bt_component *upstream_comp,
706 struct bt_component *downstream_comp,
707 struct bt_port *upstream_port, struct bt_port *downstream_port)
708 {
709 size_t i;
710
711 for (i = 0; i < graph->listeners.ports_disconnected->len; i++) {
712 struct bt_graph_listener listener =
713 g_array_index(graph->listeners.ports_disconnected,
714 struct bt_graph_listener, i);
715 bt_graph_ports_disconnected_listener func = listener.func;
716
717 assert(func);
718 func(upstream_comp, downstream_comp, upstream_port,
719 downstream_port, listener.data);
720 }
721 }
This page took 0.046736 seconds and 4 git commands to generate.