Add "port connected" component class method
[babeltrace.git] / lib / graph / graph.c
1 /*
2 * graph.c
3 *
4 * Babeltrace Plugin Component Graph
5 *
6 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 *
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 */
28
29 #include <babeltrace/graph/component-internal.h>
30 #include <babeltrace/graph/graph-internal.h>
31 #include <babeltrace/graph/connection-internal.h>
32 #include <babeltrace/graph/component-sink-internal.h>
33 #include <babeltrace/graph/component-source.h>
34 #include <babeltrace/graph/component-filter.h>
35 #include <babeltrace/graph/port.h>
36 #include <babeltrace/compiler-internal.h>
37 #include <unistd.h>
38 #include <glib.h>
39
/*
 * One registered graph listener.
 *
 * `func` holds the user callback untyped so that a single element type can
 * back every listener array of the graph; the notification functions
 * convert it back to the matching callback type before calling it.
 * `data` is the opaque user pointer handed to the callback as its last
 * argument.
 */
struct bt_graph_listener {
	void *func;
	void *data;
};
44
45 static
46 void bt_graph_destroy(struct bt_object *obj)
47 {
48 struct bt_graph *graph = container_of(obj,
49 struct bt_graph, base);
50
51 if (graph->components) {
52 g_ptr_array_free(graph->components, TRUE);
53 }
54 if (graph->connections) {
55 g_ptr_array_free(graph->connections, TRUE);
56 }
57 if (graph->sinks_to_consume) {
58 g_queue_free(graph->sinks_to_consume);
59 }
60
61 if (graph->listeners.port_added) {
62 g_array_free(graph->listeners.port_added, TRUE);
63 }
64
65 if (graph->listeners.port_removed) {
66 g_array_free(graph->listeners.port_removed, TRUE);
67 }
68
69 if (graph->listeners.ports_connected) {
70 g_array_free(graph->listeners.ports_connected, TRUE);
71 }
72
73 if (graph->listeners.ports_disconnected) {
74 g_array_free(graph->listeners.ports_disconnected, TRUE);
75 }
76
77 g_free(graph);
78 }
79
80 static
81 int init_listeners_array(GArray **listeners)
82 {
83 int ret = 0;
84
85 assert(listeners);
86 *listeners = g_array_new(FALSE, TRUE, sizeof(struct bt_graph_listener));
87 if (!*listeners) {
88 ret = -1;
89 goto end;
90 }
91
92 end:
93 return ret;
94 }
95
96 struct bt_graph *bt_graph_create(void)
97 {
98 struct bt_graph *graph;
99 int ret;
100
101 graph = g_new0(struct bt_graph, 1);
102 if (!graph) {
103 goto end;
104 }
105
106 bt_object_init(graph, bt_graph_destroy);
107
108 graph->connections = g_ptr_array_new_with_free_func(bt_object_release);
109 if (!graph->connections) {
110 goto error;
111 }
112 graph->components = g_ptr_array_new_with_free_func(bt_object_release);
113 if (!graph->components) {
114 goto error;
115 }
116 graph->sinks_to_consume = g_queue_new();
117 if (!graph->sinks_to_consume) {
118 goto error;
119 }
120
121 ret = init_listeners_array(&graph->listeners.port_added);
122 if (ret) {
123 goto error;
124 }
125
126 ret = init_listeners_array(&graph->listeners.port_removed);
127 if (ret) {
128 goto error;
129 }
130
131 ret = init_listeners_array(&graph->listeners.ports_connected);
132 if (ret) {
133 goto error;
134 }
135
136 ret = init_listeners_array(&graph->listeners.ports_disconnected);
137 if (ret) {
138 goto error;
139 }
140
141 end:
142 return graph;
143 error:
144 BT_PUT(graph);
145 goto end;
146 }
147
148 struct bt_connection *bt_graph_connect_ports(struct bt_graph *graph,
149 struct bt_port *upstream_port,
150 struct bt_port *downstream_port)
151 {
152 struct bt_connection *connection = NULL;
153 struct bt_graph *upstream_graph = NULL;
154 struct bt_graph *downstream_graph = NULL;
155 struct bt_component *upstream_component = NULL;
156 struct bt_component *downstream_component = NULL;
157 enum bt_component_status component_status;
158 bool upstream_was_already_in_graph;
159 bool downstream_was_already_in_graph;
160
161 if (!graph || !upstream_port || !downstream_port) {
162 goto end;
163 }
164
165 /* Ensure appropriate types for upstream and downstream ports. */
166 if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) {
167 goto end;
168 }
169 if (bt_port_get_type(downstream_port) != BT_PORT_TYPE_INPUT) {
170 goto end;
171 }
172
173 /* Ensure that both ports are currently unconnected. */
174 if (bt_port_is_connected(upstream_port)) {
175 fprintf(stderr, "Upstream port is already connected\n");
176 goto end;
177 }
178
179 if (bt_port_is_connected(downstream_port)) {
180 fprintf(stderr, "Downstream port is already connected\n");
181 goto end;
182 }
183
184 /*
185 * Ensure that both ports are still attached to their creating
186 * component.
187 */
188 upstream_component = bt_port_get_component(upstream_port);
189 if (!upstream_component) {
190 fprintf(stderr, "Upstream port does not belong to a component\n");
191 goto end;
192 }
193
194 downstream_component = bt_port_get_component(downstream_port);
195 if (!downstream_component) {
196 fprintf(stderr, "Downstream port does not belong to a component\n");
197 goto end;
198 }
199
200 /* Ensure the components are not already part of another graph. */
201 upstream_graph = bt_component_get_graph(upstream_component);
202 if (upstream_graph && (graph != upstream_graph)) {
203 fprintf(stderr, "Upstream component is already part of another graph\n");
204 goto error;
205 }
206 upstream_was_already_in_graph = (graph == upstream_graph);
207 downstream_graph = bt_component_get_graph(downstream_component);
208 if (downstream_graph && (graph != downstream_graph)) {
209 fprintf(stderr, "Downstream component is already part of another graph\n");
210 goto error;
211 }
212 downstream_was_already_in_graph = (graph == downstream_graph);
213
214 /*
215 * At this point the ports are not connected yet. Both
216 * components need to accept an eventual connection to their
217 * port by the other port before we continue.
218 */
219 component_status = bt_component_accept_port_connection(
220 upstream_component, upstream_port, downstream_port);
221 if (component_status != BT_COMPONENT_STATUS_OK) {
222 goto error;
223 }
224 component_status = bt_component_accept_port_connection(
225 downstream_component, downstream_port, upstream_port);
226 if (component_status != BT_COMPONENT_STATUS_OK) {
227 goto error;
228 }
229
230 connection = bt_connection_create(graph, upstream_port,
231 downstream_port);
232 if (!connection) {
233 goto error;
234 }
235
236 /*
237 * Ownership of upstream_component/downstream_component and of
238 * the connection object is transferred to the graph.
239 */
240 g_ptr_array_add(graph->connections, connection);
241
242 if (!upstream_was_already_in_graph) {
243 g_ptr_array_add(graph->components, upstream_component);
244 bt_component_set_graph(upstream_component, graph);
245 }
246 if (!downstream_was_already_in_graph) {
247 g_ptr_array_add(graph->components, downstream_component);
248 bt_component_set_graph(downstream_component, graph);
249 if (bt_component_get_class_type(downstream_component) ==
250 BT_COMPONENT_CLASS_TYPE_SINK) {
251 g_queue_push_tail(graph->sinks_to_consume,
252 downstream_component);
253 }
254 }
255
256 /*
257 * The graph is now the parent of these components which
258 * garantees their existence for the duration of the graph's
259 * lifetime.
260 */
261
262 /*
263 * Notify both components that their port is connected.
264 */
265 bt_component_port_connected(upstream_component, upstream_port,
266 downstream_port);
267 bt_component_port_connected(downstream_component, downstream_port,
268 upstream_port);
269
270 /*
271 * Notify the graph's creator that both ports are connected.
272 */
273 bt_graph_notify_ports_connected(graph, upstream_port, downstream_port);
274
275 end:
276 bt_put(upstream_graph);
277 bt_put(downstream_graph);
278 bt_put(upstream_component);
279 bt_put(downstream_component);
280 return connection;
281
282 error:
283 BT_PUT(upstream_component);
284 BT_PUT(downstream_component);
285 goto end;
286 }
287
288 static
289 enum bt_component_status get_component_port_counts(
290 struct bt_component *component, uint64_t *input_count,
291 uint64_t *output_count)
292 {
293 enum bt_component_status ret;
294
295 switch (bt_component_get_class_type(component)) {
296 case BT_COMPONENT_CLASS_TYPE_SOURCE:
297 ret = bt_component_source_get_output_port_count(component,
298 output_count);
299 if (ret != BT_COMPONENT_STATUS_OK) {
300 goto end;
301 }
302 break;
303 case BT_COMPONENT_CLASS_TYPE_FILTER:
304 ret = bt_component_filter_get_output_port_count(component,
305 output_count);
306 if (ret != BT_COMPONENT_STATUS_OK) {
307 goto end;
308 }
309 ret = bt_component_filter_get_input_port_count(component,
310 input_count);
311 if (ret != BT_COMPONENT_STATUS_OK) {
312 goto end;
313 }
314 break;
315 case BT_COMPONENT_CLASS_TYPE_SINK:
316 ret = bt_component_sink_get_input_port_count(component,
317 input_count);
318 if (ret != BT_COMPONENT_STATUS_OK) {
319 goto end;
320 }
321 break;
322 default:
323 assert(false);
324 break;
325 }
326 ret = BT_COMPONENT_STATUS_OK;
327 end:
328 return ret;
329 }
330
331 enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
332 struct bt_component *origin,
333 struct bt_component *new_component)
334 {
335 uint64_t origin_input_port_count = 0;
336 uint64_t origin_output_port_count = 0;
337 uint64_t new_input_port_count = 0;
338 uint64_t new_output_port_count = 0;
339 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
340 struct bt_graph *origin_graph = NULL;
341 struct bt_graph *new_graph = NULL;
342 struct bt_port *origin_port = NULL;
343 struct bt_port *new_port = NULL;
344 struct bt_port *upstream_port = NULL;
345 struct bt_port *downstream_port = NULL;
346 struct bt_connection *origin_connection = NULL;
347 struct bt_connection *new_connection = NULL;
348 int port_index;
349
350 if (!graph || !origin || !new_component) {
351 status = BT_GRAPH_STATUS_INVALID;
352 goto end;
353 }
354
355 if (bt_component_get_class_type(origin) !=
356 bt_component_get_class_type(new_component)) {
357 status = BT_GRAPH_STATUS_INVALID;
358 goto end;
359 }
360
361 origin_graph = bt_component_get_graph(origin);
362 if (!origin_graph || (origin_graph != graph)) {
363 status = BT_GRAPH_STATUS_INVALID;
364 goto end;
365 }
366
367 new_graph = bt_component_get_graph(new_component);
368 if (new_graph) {
369 status = BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH;
370 goto end;
371 }
372
373 if (get_component_port_counts(origin, &origin_input_port_count,
374 &origin_output_port_count) != BT_COMPONENT_STATUS_OK) {
375 status = BT_GRAPH_STATUS_INVALID;
376 goto end;
377 }
378 if (get_component_port_counts(new_component, &new_input_port_count,
379 &new_output_port_count) != BT_COMPONENT_STATUS_OK) {
380 status = BT_GRAPH_STATUS_INVALID;
381 goto end;
382 }
383
384 if (origin_input_port_count != new_input_port_count ||
385 origin_output_port_count != new_output_port_count) {
386 status = BT_GRAPH_STATUS_INVALID;
387 goto end;
388 }
389
390 /* Replicate input connections. */
391 for (port_index = 0; port_index< origin_input_port_count; port_index++) {
392 origin_port = bt_component_get_input_port_at_index(origin,
393 port_index);
394 if (!origin_port) {
395 status = BT_GRAPH_STATUS_ERROR;
396 goto error_disconnect;
397 }
398
399 new_port = bt_component_get_input_port_at_index(new_component,
400 port_index);
401 if (!new_port) {
402 status = BT_GRAPH_STATUS_ERROR;
403 goto error_disconnect;
404 }
405
406 origin_connection = bt_port_get_connection(origin_port);
407 if (origin_connection) {
408 upstream_port = bt_connection_get_upstream_port(
409 origin_connection);
410 if (!upstream_port) {
411 goto error_disconnect;
412 }
413
414 new_connection = bt_graph_connect_ports(graph,
415 upstream_port, new_port);
416 if (!new_connection) {
417 goto error_disconnect;
418 }
419 }
420
421 BT_PUT(upstream_port);
422 BT_PUT(origin_connection);
423 BT_PUT(new_connection);
424 BT_PUT(origin_port);
425 BT_PUT(new_port);
426 }
427
428 /* Replicate output connections. */
429 for (port_index = 0; port_index < origin_output_port_count; port_index++) {
430 origin_port = bt_component_get_output_port_at_index(origin,
431 port_index);
432 if (!origin_port) {
433 status = BT_GRAPH_STATUS_ERROR;
434 goto error_disconnect;
435 }
436 new_port = bt_component_get_output_port_at_index(new_component,
437 port_index);
438 if (!new_port) {
439 status = BT_GRAPH_STATUS_ERROR;
440 goto error_disconnect;
441 }
442
443 origin_connection = bt_port_get_connection(origin_port);
444 if (origin_connection) {
445 downstream_port = bt_connection_get_downstream_port(
446 origin_connection);
447 if (!downstream_port) {
448 goto error_disconnect;
449 }
450
451 new_connection = bt_graph_connect_ports(graph,
452 new_port, downstream_port);
453 if (!new_connection) {
454 goto error_disconnect;
455 }
456 }
457
458 BT_PUT(downstream_port);
459 BT_PUT(origin_connection);
460 BT_PUT(new_connection);
461 BT_PUT(origin_port);
462 BT_PUT(new_port);
463 }
464 end:
465 bt_put(origin_graph);
466 bt_put(new_graph);
467 bt_put(origin_port);
468 bt_put(new_port);
469 bt_put(upstream_port);
470 bt_put(downstream_port);
471 bt_put(origin_connection);
472 bt_put(new_connection);
473 return status;
474 error_disconnect:
475 /* Destroy all connections of the new component. */
476 /* FIXME. */
477 goto end;
478 }
479
480 enum bt_graph_status bt_graph_consume(struct bt_graph *graph)
481 {
482 struct bt_component *sink;
483 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
484 enum bt_component_status comp_status;
485 GList *current_node;
486
487 if (!graph) {
488 status = BT_GRAPH_STATUS_INVALID;
489 goto end;
490 }
491
492 if (g_queue_is_empty(graph->sinks_to_consume)) {
493 status = BT_GRAPH_STATUS_END;
494 goto end;
495 }
496
497 current_node = g_queue_pop_head_link(graph->sinks_to_consume);
498 sink = current_node->data;
499 comp_status = bt_component_sink_consume(sink);
500 switch (comp_status) {
501 case BT_COMPONENT_STATUS_OK:
502 break;
503 case BT_COMPONENT_STATUS_END:
504 status = BT_GRAPH_STATUS_END;
505 break;
506 case BT_COMPONENT_STATUS_AGAIN:
507 status = BT_GRAPH_STATUS_AGAIN;
508 break;
509 case BT_COMPONENT_STATUS_INVALID:
510 status = BT_GRAPH_STATUS_INVALID;
511 break;
512 default:
513 status = BT_GRAPH_STATUS_ERROR;
514 break;
515 }
516
517 if (status != BT_GRAPH_STATUS_END) {
518 g_queue_push_tail_link(graph->sinks_to_consume, current_node);
519 goto end;
520 }
521
522 /* End reached, the node is not added back to the queue and free'd. */
523 g_queue_delete_link(graph->sinks_to_consume, current_node);
524
525 /* Don't forward an END status if there are sinks left to consume. */
526 if (!g_queue_is_empty(graph->sinks_to_consume)) {
527 status = BT_GRAPH_STATUS_OK;
528 goto end;
529 }
530 end:
531 return status;
532 }
533
534 enum bt_graph_status bt_graph_run(struct bt_graph *graph)
535 {
536 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
537
538 if (!graph) {
539 status = BT_GRAPH_STATUS_INVALID;
540 goto error;
541 }
542
543 do {
544 status = bt_graph_consume(graph);
545 if (status == BT_GRAPH_STATUS_AGAIN) {
546 /*
547 * If AGAIN is received and there are multiple sinks,
548 * go ahead and consume from the next sink.
549 *
550 * However, in the case where a single sink is left,
551 * the caller can decide to busy-wait and call
552 * bt_graph_run continuously until the source is ready
553 * or it can decide to sleep for an arbitrary amount of
554 * time.
555 */
556 if (graph->sinks_to_consume->length > 1) {
557 status = BT_GRAPH_STATUS_OK;
558 }
559 }
560 } while (status == BT_GRAPH_STATUS_OK);
561
562 if (g_queue_is_empty(graph->sinks_to_consume)) {
563 status = BT_GRAPH_STATUS_END;
564 }
565 error:
566 return status;
567 }
568
569 static
570 void add_listener(GArray *listeners, void *func, void *data)
571 {
572 struct bt_graph_listener listener = {
573 .func = func,
574 .data = data,
575 };
576
577 g_array_append_val(listeners, listener);
578 }
579
580 enum bt_graph_status bt_graph_add_port_added_listener(
581 struct bt_graph *graph,
582 bt_graph_port_added_listener listener, void *data)
583 {
584 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
585
586 if (!graph || !listener) {
587 status = BT_GRAPH_STATUS_INVALID;
588 goto end;
589 }
590
591 add_listener(graph->listeners.port_added, listener, data);
592
593 end:
594 return status;
595 }
596
597 enum bt_graph_status bt_graph_add_port_removed_listener(
598 struct bt_graph *graph,
599 bt_graph_port_removed_listener listener, void *data)
600 {
601 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
602
603 if (!graph || !listener) {
604 status = BT_GRAPH_STATUS_INVALID;
605 goto end;
606 }
607
608 add_listener(graph->listeners.port_removed, listener, data);
609
610 end:
611 return status;
612 }
613
614 enum bt_graph_status bt_graph_add_ports_connected_listener(
615 struct bt_graph *graph,
616 bt_graph_ports_connected_listener listener, void *data)
617 {
618 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
619
620 if (!graph || !listener) {
621 status = BT_GRAPH_STATUS_INVALID;
622 goto end;
623 }
624
625 add_listener(graph->listeners.ports_connected, listener, data);
626
627 end:
628 return status;
629 }
630
631 enum bt_graph_status bt_graph_add_ports_disconnected_listener(
632 struct bt_graph *graph,
633 bt_graph_ports_disconnected_listener listener, void *data)
634 {
635 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
636
637 if (!graph || !listener) {
638 status = BT_GRAPH_STATUS_INVALID;
639 goto end;
640 }
641
642 add_listener(graph->listeners.ports_disconnected, listener, data);
643
644 end:
645 return status;
646 }
647
648 BT_HIDDEN
649 void bt_graph_notify_port_added(struct bt_graph *graph, struct bt_port *port)
650 {
651 size_t i;
652
653 for (i = 0; i < graph->listeners.port_added->len; i++) {
654 struct bt_graph_listener listener =
655 g_array_index(graph->listeners.port_added,
656 struct bt_graph_listener, i);
657 bt_graph_port_added_listener func = listener.func;
658
659 assert(func);
660 func(port, listener.data);
661 }
662 }
663
664 BT_HIDDEN
665 void bt_graph_notify_port_removed(struct bt_graph *graph,
666 struct bt_component *comp, struct bt_port *port)
667 {
668 size_t i;
669
670 for (i = 0; i < graph->listeners.port_removed->len; i++) {
671 struct bt_graph_listener listener =
672 g_array_index(graph->listeners.port_removed,
673 struct bt_graph_listener, i);
674 bt_graph_port_removed_listener func = listener.func;
675
676 assert(func);
677 func(comp, port, listener.data);
678 }
679 }
680
681 BT_HIDDEN
682 void bt_graph_notify_ports_connected(struct bt_graph *graph,
683 struct bt_port *upstream_port, struct bt_port *downstream_port)
684 {
685 size_t i;
686
687 for (i = 0; i < graph->listeners.ports_connected->len; i++) {
688 struct bt_graph_listener listener =
689 g_array_index(graph->listeners.ports_connected,
690 struct bt_graph_listener, i);
691 bt_graph_ports_connected_listener func = listener.func;
692
693 assert(func);
694 func(upstream_port, downstream_port, listener.data);
695 }
696 }
697
698 BT_HIDDEN
699 void bt_graph_notify_ports_disconnected(struct bt_graph *graph,
700 struct bt_component *upstream_comp,
701 struct bt_component *downstream_comp,
702 struct bt_port *upstream_port, struct bt_port *downstream_port)
703 {
704 size_t i;
705
706 for (i = 0; i < graph->listeners.ports_disconnected->len; i++) {
707 struct bt_graph_listener listener =
708 g_array_index(graph->listeners.ports_disconnected,
709 struct bt_graph_listener, i);
710 bt_graph_ports_disconnected_listener func = listener.func;
711
712 assert(func);
713 func(upstream_comp, downstream_comp, upstream_port,
714 downstream_port, listener.data);
715 }
716 }
This page took 0.043857 seconds and 4 git commands to generate.