Re-organize sources
[babeltrace.git] / src / lib / graph / graph.c
1 /*
2 * Copyright 2017-2018 Philippe Proulx <pproulx@efficios.com>
3 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #define BT_LOG_TAG "GRAPH"
25 #include "lib/lib-logging.h"
26
27 #include "common/assert.h"
28 #include "lib/assert-pre.h"
29 #include <babeltrace2/graph/graph.h>
30 #include <babeltrace2/graph/graph-const.h>
31 #include <babeltrace2/graph/component-source-const.h>
32 #include <babeltrace2/graph/component-filter-const.h>
33 #include <babeltrace2/graph/port-const.h>
34 #include "lib/graph/message/message.h"
35 #include "compat/compiler.h"
36 #include "common/common.h"
37 #include <babeltrace2/types.h>
38 #include <babeltrace2/value.h>
39 #include <babeltrace2/value-const.h>
40 #include "lib/value.h"
41 #include <unistd.h>
42 #include <glib.h>
43
44 #include "component.h"
45 #include "component-sink.h"
46 #include "connection.h"
47 #include "graph.h"
48 #include "message/event.h"
49 #include "message/packet.h"
50
/*
 * Generic signature under which every "port added" listener function is
 * stored in a `struct bt_graph_listener_port_added`. The public,
 * strongly typed listener function pointers are cast to this type when
 * a listener is registered.
 *
 * NOTE(review): the two `const void *` parameters are presumably the
 * component and its new port, and the `void *` the user data; confirm
 * against the call site, which is not in this part of the file.
 */
typedef enum bt_graph_listener_status (*port_added_func_t)(
		const void *, const void *, void *);
53
/*
 * Generic signature under which every "ports connected" listener
 * function is stored in a `struct bt_graph_listener_ports_connected`.
 * The public, strongly typed listener function pointers are cast to
 * this type when a listener is registered.
 *
 * NOTE(review): the four `const void *` parameters are presumably the
 * upstream/downstream components and ports, and the `void *` the user
 * data; confirm against the call site, which is not in this part of
 * the file.
 */
typedef enum bt_graph_listener_status (*ports_connected_func_t)(
		const void *, const void *, const void *, const void *, void *);
56
/*
 * Generic signature of a function returning a self component status
 * and taking two `const void *` arguments and one `void *` argument.
 *
 * NOTE(review): presumably the type-erased form of a component class's
 * initialization method; it is not referenced in this part of the
 * file, so confirm the meaning of each parameter at its use site.
 */
typedef enum bt_self_component_status (*comp_init_method_t)(const void *,
		const void *, void *);
59
/*
 * Part common to all the graph listener kinds: it is embedded as the
 * `base` member of the specialized listener structures.
 */
struct bt_graph_listener {
	/*
	 * Optional (can be NULL) function which the graph's destructor
	 * calls with `data` for each registered listener.
	 */
	bt_graph_listener_removed_func removed;

	/* User data given when the listener was added */
	void *data;
};
64
/*
 * "Port added" listener, as stored (by value) in the graph's
 * `*_port_added` listener arrays.
 */
struct bt_graph_listener_port_added {
	/* Common part ("removed" function and user data) */
	struct bt_graph_listener base;

	/* User's listener function, cast to the generic signature */
	port_added_func_t func;
};
69
/*
 * "Ports connected" listener, as stored (by value) in the graph's
 * `*_ports_connected` listener arrays.
 */
struct bt_graph_listener_ports_connected {
	/* Common part ("removed" function and user data) */
	struct bt_graph_listener base;

	/* User's listener function, cast to the generic signature */
	ports_connected_func_t func;
};
74
/*
 * Allocates an empty GArray of `_type` elements and stores it in
 * `_listeners`, which must be a modifiable lvalue. The array is not
 * zero-terminated and its new elements are cleared when allocated
 * (g_array_new() arguments).
 *
 * Logs an error if the allocation fails: the caller must check
 * `_listeners` after using this macro.
 *
 * `_listeners` is parenthesized at each use so that any lvalue
 * expression can be passed safely.
 */
#define INIT_LISTENERS_ARRAY(_type, _listeners)				\
	do {								\
		(_listeners) = g_array_new(FALSE, TRUE, sizeof(_type));	\
		if (!(_listeners)) {					\
			BT_LOGE_STR("Failed to allocate one GArray.");	\
		}							\
	} while (0)
82
/*
 * Calls the "removed" function, if any, of each listener contained in
 * the GArray `_listeners`, whose elements have the type `_type` (a
 * structure having a `struct bt_graph_listener` member named `base`).
 *
 * Does nothing if `_listeners` is NULL.
 *
 * `_listeners` is parenthesized at each use, and the macro's own local
 * variables have specific names so that they cannot capture an
 * identifier used within the `_listeners` expression (for example a
 * caller's own `i` or `listener` variable).
 */
#define CALL_REMOVE_LISTENERS(_type, _listeners)			\
	do {								\
		size_t remove_listener_i_;				\
									\
		if (!(_listeners)) {					\
			break;						\
		}							\
		for (remove_listener_i_ = 0;				\
				remove_listener_i_ < (_listeners)->len;	\
				remove_listener_i_++) {			\
			_type *remove_listener_ =			\
				&g_array_index((_listeners), _type,	\
					remove_listener_i_);		\
									\
			if (remove_listener_->base.removed) {		\
				remove_listener_->base.removed(		\
					remove_listener_->base.data);	\
			}						\
		}							\
	} while (0)
99
100 static
101 void destroy_graph(struct bt_object *obj)
102 {
103 struct bt_graph *graph = container_of(obj, struct bt_graph, base);
104
105 /*
106 * The graph's reference count is 0 if we're here. Increment
107 * it to avoid a double-destroy (possibly infinitely recursive)
108 * in this situation:
109 *
110 * 1. We put and destroy a connection.
111 * 2. This connection's destructor finalizes its active message
112 * iterators.
113 * 3. A message iterator's finalization function gets a new
114 * reference on its component (reference count goes from 0 to
115 * 1).
116 * 4. Since this component's reference count goes to 1, it takes
117 * a reference on its parent (this graph). This graph's
118 * reference count goes from 0 to 1.
119 * 5. The message iterator's finalization function puts its
120 * component reference (reference count goes from 1 to 0).
121 * 6. Since this component's reference count goes from 1 to 0,
122 * it puts its parent (this graph). This graph's reference
123 * count goes from 1 to 0.
124 * 7. Since this graph's reference count goes from 1 to 0, its
125 * destructor is called (this function).
126 *
127 * With the incrementation below, the graph's reference count at
128 * step 4 goes from 1 to 2, and from 2 to 1 at step 6. This
129 * ensures that this function is not called two times.
130 */
131 BT_LIB_LOGD("Destroying graph: %!+g", graph);
132 obj->ref_count++;
133
134 /*
135 * Cancel the graph to disallow some operations, like creating
136 * message iterators and adding ports to components.
137 */
138 (void) bt_graph_cancel((void *) graph);
139
140 /* Call all remove listeners */
141 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
142 graph->listeners.source_output_port_added);
143 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
144 graph->listeners.filter_output_port_added);
145 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
146 graph->listeners.filter_input_port_added);
147 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
148 graph->listeners.sink_input_port_added);
149 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_connected,
150 graph->listeners.source_filter_ports_connected);
151 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_connected,
152 graph->listeners.filter_filter_ports_connected);
153 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_connected,
154 graph->listeners.source_sink_ports_connected);
155 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_connected,
156 graph->listeners.filter_sink_ports_connected);
157
158 if (graph->messages) {
159 g_ptr_array_free(graph->messages, TRUE);
160 graph->messages = NULL;
161 }
162
163 if (graph->connections) {
164 BT_LOGD_STR("Destroying connections.");
165 g_ptr_array_free(graph->connections, TRUE);
166 graph->connections = NULL;
167 }
168
169 if (graph->components) {
170 BT_LOGD_STR("Destroying components.");
171 g_ptr_array_free(graph->components, TRUE);
172 graph->components = NULL;
173 }
174
175 if (graph->sinks_to_consume) {
176 g_queue_free(graph->sinks_to_consume);
177 graph->sinks_to_consume = NULL;
178 }
179
180 if (graph->listeners.source_output_port_added) {
181 g_array_free(graph->listeners.source_output_port_added, TRUE);
182 graph->listeners.source_output_port_added = NULL;
183 }
184
185 if (graph->listeners.filter_output_port_added) {
186 g_array_free(graph->listeners.filter_output_port_added, TRUE);
187 graph->listeners.filter_output_port_added = NULL;
188 }
189
190 if (graph->listeners.filter_input_port_added) {
191 g_array_free(graph->listeners.filter_input_port_added, TRUE);
192 graph->listeners.filter_input_port_added = NULL;
193 }
194
195 if (graph->listeners.sink_input_port_added) {
196 g_array_free(graph->listeners.sink_input_port_added, TRUE);
197 graph->listeners.sink_input_port_added = NULL;
198 }
199
200 if (graph->listeners.source_filter_ports_connected) {
201 g_array_free(graph->listeners.source_filter_ports_connected,
202 TRUE);
203 graph->listeners.source_filter_ports_connected = NULL;
204 }
205
206 if (graph->listeners.filter_filter_ports_connected) {
207 g_array_free(graph->listeners.filter_filter_ports_connected,
208 TRUE);
209 graph->listeners.filter_filter_ports_connected = NULL;
210 }
211
212 if (graph->listeners.source_sink_ports_connected) {
213 g_array_free(graph->listeners.source_sink_ports_connected,
214 TRUE);
215 graph->listeners.source_sink_ports_connected = NULL;
216 }
217
218 if (graph->listeners.filter_sink_ports_connected) {
219 g_array_free(graph->listeners.filter_sink_ports_connected,
220 TRUE);
221 graph->listeners.filter_sink_ports_connected = NULL;
222 }
223
224 bt_object_pool_finalize(&graph->event_msg_pool);
225 bt_object_pool_finalize(&graph->packet_begin_msg_pool);
226 bt_object_pool_finalize(&graph->packet_end_msg_pool);
227 g_free(graph);
228 }
229
/*
 * "Destroy object" function of the graph's event message pool:
 * destroys the pooled event message `msg`.
 *
 * `graph` is the pool's data; it is not needed here.
 */
static
void destroy_message_event(struct bt_message *msg,
		struct bt_graph *graph)
{
	(void) graph;
	bt_message_event_destroy(msg);
}
236
/*
 * "Destroy object" function of the graph's packet beginning message
 * pool: destroys the pooled packet message `msg`.
 *
 * `graph` is the pool's data; it is not needed here.
 */
static
void destroy_message_packet_begin(struct bt_message *msg,
		struct bt_graph *graph)
{
	(void) graph;
	bt_message_packet_destroy(msg);
}
243
/*
 * "Destroy object" function of the graph's packet end message pool:
 * destroys the pooled packet message `msg`.
 *
 * `graph` is the pool's data; it is not needed here.
 */
static
void destroy_message_packet_end(struct bt_message *msg,
		struct bt_graph *graph)
{
	(void) graph;
	bt_message_packet_destroy(msg);
}
250
/*
 * Element free function of the graph's `messages` pointer array (see
 * bt_graph_create()): called for each message of this array when the
 * array is freed, it unlinks `msg` from its graph with
 * bt_message_unlink_graph().
 */
static
void notify_message_graph_is_destroyed(struct bt_message *msg)
{
	bt_message_unlink_graph(msg);
}
256
257 struct bt_graph *bt_graph_create(void)
258 {
259 struct bt_graph *graph;
260 int ret;
261
262 BT_LOGD_STR("Creating graph object.");
263 graph = g_new0(struct bt_graph, 1);
264 if (!graph) {
265 BT_LOGE_STR("Failed to allocate one graph.");
266 goto end;
267 }
268
269 bt_object_init_shared(&graph->base, destroy_graph);
270 graph->connections = g_ptr_array_new_with_free_func(
271 (GDestroyNotify) bt_object_try_spec_release);
272 if (!graph->connections) {
273 BT_LOGE_STR("Failed to allocate one GPtrArray.");
274 goto error;
275 }
276 graph->components = g_ptr_array_new_with_free_func(
277 (GDestroyNotify) bt_object_try_spec_release);
278 if (!graph->components) {
279 BT_LOGE_STR("Failed to allocate one GPtrArray.");
280 goto error;
281 }
282 graph->sinks_to_consume = g_queue_new();
283 if (!graph->sinks_to_consume) {
284 BT_LOGE_STR("Failed to allocate one GQueue.");
285 goto error;
286 }
287
288 bt_graph_set_can_consume(graph, true);
289 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_added,
290 graph->listeners.source_output_port_added);
291
292 if (!graph->listeners.source_output_port_added) {
293 ret = -1;
294 goto error;
295 }
296
297 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_added,
298 graph->listeners.filter_output_port_added);
299
300 if (!graph->listeners.filter_output_port_added) {
301 ret = -1;
302 goto error;
303 }
304
305 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_added,
306 graph->listeners.filter_input_port_added);
307
308 if (!graph->listeners.filter_input_port_added) {
309 ret = -1;
310 goto error;
311 }
312
313 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_added,
314 graph->listeners.sink_input_port_added);
315
316 if (!graph->listeners.sink_input_port_added) {
317 ret = -1;
318 goto error;
319 }
320
321 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_connected,
322 graph->listeners.source_filter_ports_connected);
323
324 if (!graph->listeners.source_filter_ports_connected) {
325 ret = -1;
326 goto error;
327 }
328
329 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_connected,
330 graph->listeners.source_sink_ports_connected);
331
332 if (!graph->listeners.source_sink_ports_connected) {
333 ret = -1;
334 goto error;
335 }
336
337 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_connected,
338 graph->listeners.filter_filter_ports_connected);
339
340 if (!graph->listeners.filter_filter_ports_connected) {
341 ret = -1;
342 goto error;
343 }
344
345 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_connected,
346 graph->listeners.filter_sink_ports_connected);
347
348 if (!graph->listeners.filter_sink_ports_connected) {
349 ret = -1;
350 goto error;
351 }
352
353 ret = bt_object_pool_initialize(&graph->event_msg_pool,
354 (bt_object_pool_new_object_func) bt_message_event_new,
355 (bt_object_pool_destroy_object_func) destroy_message_event,
356 graph);
357 if (ret) {
358 BT_LOGE("Failed to initialize event message pool: ret=%d",
359 ret);
360 goto error;
361 }
362
363 ret = bt_object_pool_initialize(&graph->packet_begin_msg_pool,
364 (bt_object_pool_new_object_func) bt_message_packet_beginning_new,
365 (bt_object_pool_destroy_object_func) destroy_message_packet_begin,
366 graph);
367 if (ret) {
368 BT_LOGE("Failed to initialize packet beginning message pool: ret=%d",
369 ret);
370 goto error;
371 }
372
373 ret = bt_object_pool_initialize(&graph->packet_end_msg_pool,
374 (bt_object_pool_new_object_func) bt_message_packet_end_new,
375 (bt_object_pool_destroy_object_func) destroy_message_packet_end,
376 graph);
377 if (ret) {
378 BT_LOGE("Failed to initialize packet end message pool: ret=%d",
379 ret);
380 goto error;
381 }
382
383 graph->messages = g_ptr_array_new_with_free_func(
384 (GDestroyNotify) notify_message_graph_is_destroyed);
385 BT_LIB_LOGD("Created graph object: %!+g", graph);
386
387 end:
388 return (void *) graph;
389
390 error:
391 BT_OBJECT_PUT_REF_AND_RESET(graph);
392 goto end;
393 }
394
395 enum bt_graph_status bt_graph_connect_ports(
396 struct bt_graph *graph,
397 const struct bt_port_output *upstream_port_out,
398 const struct bt_port_input *downstream_port_in,
399 const struct bt_connection **user_connection)
400 {
401 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
402 enum bt_graph_listener_status listener_status;
403 struct bt_connection *connection = NULL;
404 struct bt_port *upstream_port = (void *) upstream_port_out;
405 struct bt_port *downstream_port = (void *) downstream_port_in;
406 struct bt_component *upstream_component = NULL;
407 struct bt_component *downstream_component = NULL;
408 enum bt_self_component_status component_status;
409 bool init_can_consume;
410
411 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
412 BT_ASSERT_PRE_NON_NULL(upstream_port, "Upstream port");
413 BT_ASSERT_PRE_NON_NULL(downstream_port, "Downstream port port");
414 BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
415 BT_ASSERT_PRE(
416 graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
417 "Graph is not in the \"configuring\" state: %!+g", graph);
418 BT_ASSERT_PRE(!bt_port_is_connected(upstream_port),
419 "Upstream port is already connected: %!+p", upstream_port);
420 BT_ASSERT_PRE(!bt_port_is_connected(downstream_port),
421 "Downstream port is already connected: %!+p", downstream_port);
422 BT_ASSERT_PRE(bt_port_borrow_component_inline((void *) upstream_port),
423 "Upstream port does not belong to a component: %!+p",
424 upstream_port);
425 BT_ASSERT_PRE(bt_port_borrow_component_inline((void *) downstream_port),
426 "Downstream port does not belong to a component: %!+p",
427 downstream_port);
428 init_can_consume = graph->can_consume;
429 BT_LIB_LOGD("Connecting component ports within graph: "
430 "%![graph-]+g, %![up-port-]+p, %![down-port-]+p",
431 graph, upstream_port, downstream_port);
432 bt_graph_set_can_consume(graph, false);
433 upstream_component = bt_port_borrow_component_inline(
434 (void *) upstream_port);
435 downstream_component = bt_port_borrow_component_inline(
436 (void *) downstream_port);
437
438 /*
439 * At this point the ports are not connected yet. Both
440 * components need to accept an eventual connection to their
441 * port by the other port before we continue.
442 */
443 BT_LIB_LOGD("Asking upstream component to accept the connection: "
444 "%![comp-]+c", upstream_component);
445 component_status = bt_component_accept_port_connection(
446 upstream_component, (void *) upstream_port,
447 (void *) downstream_port);
448 if (component_status != BT_SELF_COMPONENT_STATUS_OK) {
449 if (component_status == BT_SELF_COMPONENT_STATUS_REFUSE_PORT_CONNECTION) {
450 BT_LOGD_STR("Upstream component refused the connection.");
451 } else {
452 BT_LOGW("Cannot ask upstream component to accept the connection: "
453 "status=%s", bt_self_component_status_string(component_status));
454 }
455
456 status = (int) component_status;
457 goto end;
458 }
459
460 BT_LIB_LOGD("Asking downstream component to accept the connection: "
461 "%![comp-]+c", downstream_component);
462 component_status = bt_component_accept_port_connection(
463 downstream_component, (void *) downstream_port,
464 (void *) upstream_port);
465 if (component_status != BT_SELF_COMPONENT_STATUS_OK) {
466 if (component_status == BT_SELF_COMPONENT_STATUS_REFUSE_PORT_CONNECTION) {
467 BT_LOGD_STR("Downstream component refused the connection.");
468 } else {
469 BT_LOGW("Cannot ask downstream component to accept the connection: "
470 "status=%s", bt_self_component_status_string(component_status));
471 }
472
473 status = (int) component_status;
474 goto end;
475 }
476
477 BT_LOGD_STR("Creating connection.");
478 connection = bt_connection_create(graph, (void *) upstream_port,
479 (void *) downstream_port);
480 if (!connection) {
481 BT_LOGW("Cannot create connection object.");
482 status = BT_GRAPH_STATUS_NOMEM;
483 goto end;
484 }
485
486 BT_LIB_LOGD("Connection object created: %!+x", connection);
487
488 /*
489 * Ownership of upstream_component/downstream_component and of
490 * the connection object is transferred to the graph.
491 */
492 g_ptr_array_add(graph->connections, connection);
493
494 /*
495 * Notify both components that their port is connected.
496 */
497 BT_LIB_LOGD("Notifying upstream component that its port is connected: "
498 "%![comp-]+c, %![port-]+p", upstream_component, upstream_port);
499 component_status = bt_component_port_connected(upstream_component,
500 (void *) upstream_port, (void *) downstream_port);
501 if (component_status != BT_SELF_COMPONENT_STATUS_OK) {
502 BT_LIB_LOGW("Error while notifying upstream component that its port is connected: "
503 "status=%s, %![graph-]+g, %![up-comp-]+c, "
504 "%![down-comp-]+c, %![up-port-]+p, %![down-port-]+p",
505 bt_self_component_status_string(component_status),
506 graph, upstream_component, downstream_component,
507 upstream_port, downstream_port);
508 bt_connection_end(connection, true);
509 status = (int) component_status;
510 goto end;
511 }
512
513 connection->notified_upstream_port_connected = true;
514 BT_LIB_LOGD("Notifying downstream component that its port is connected: "
515 "%![comp-]+c, %![port-]+p", downstream_component,
516 downstream_port);
517 component_status = bt_component_port_connected(downstream_component,
518 (void *) downstream_port, (void *) upstream_port);
519 if (component_status != BT_SELF_COMPONENT_STATUS_OK) {
520 BT_LIB_LOGW("Error while notifying downstream component that its port is connected: "
521 "status=%s, %![graph-]+g, %![up-comp-]+c, "
522 "%![down-comp-]+c, %![up-port-]+p, %![down-port-]+p",
523 bt_self_component_status_string(component_status),
524 graph, upstream_component, downstream_component,
525 upstream_port, downstream_port);
526 bt_connection_end(connection, true);
527 status = (int) component_status;
528 goto end;
529 }
530
531 connection->notified_downstream_port_connected = true;
532
533 /*
534 * Notify the graph's creator that both ports are connected.
535 */
536 BT_LOGD_STR("Notifying graph's user that new component ports are connected.");
537 listener_status = bt_graph_notify_ports_connected(graph, upstream_port, downstream_port);
538 if (listener_status != BT_GRAPH_LISTENER_STATUS_OK) {
539 status = (int) listener_status;
540 goto end;
541 }
542
543 connection->notified_graph_ports_connected = true;
544 BT_LIB_LOGD("Connected component ports within graph: "
545 "%![graph-]+g, %![up-comp-]+c, %![down-comp-]+c, "
546 "%![up-port-]+p, %![down-port-]+p",
547 graph, upstream_component, downstream_component,
548 upstream_port, downstream_port);
549
550 if (user_connection) {
551 /* Move reference to user */
552 *user_connection = connection;
553 connection = NULL;
554 }
555
556 end:
557 if (status != BT_GRAPH_STATUS_OK) {
558 bt_graph_make_faulty(graph);
559 }
560
561 bt_object_put_ref(connection);
562 (void) init_can_consume;
563 bt_graph_set_can_consume(graph, init_can_consume);
564 return status;
565 }
566
567 static inline
568 enum bt_graph_status consume_graph_sink(struct bt_component_sink *comp)
569 {
570 enum bt_self_component_status comp_status;
571 struct bt_component_class_sink *sink_class = NULL;
572
573 BT_ASSERT(comp);
574 sink_class = (void *) comp->parent.class;
575 BT_ASSERT(sink_class->methods.consume);
576 BT_LIB_LOGD("Calling user's consume method: %!+c", comp);
577 comp_status = sink_class->methods.consume((void *) comp);
578 BT_LOGD("User method returned: status=%s",
579 bt_self_component_status_string(comp_status));
580 BT_ASSERT_PRE(comp_status == BT_SELF_COMPONENT_STATUS_OK ||
581 comp_status == BT_SELF_COMPONENT_STATUS_END ||
582 comp_status == BT_SELF_COMPONENT_STATUS_AGAIN ||
583 comp_status == BT_SELF_COMPONENT_STATUS_ERROR ||
584 comp_status == BT_SELF_COMPONENT_STATUS_NOMEM,
585 "Invalid component status returned by consuming method: "
586 "status=%s", bt_self_component_status_string(comp_status));
587 if (comp_status < 0) {
588 BT_LOGW_STR("Consume method failed.");
589 goto end;
590 }
591
592 BT_LIB_LOGV("Consumed from sink: %![comp-]+c, status=%s",
593 comp, bt_self_component_status_string(comp_status));
594
595 end:
596 return (int) comp_status;
597 }
598
599 /*
600 * `node` is removed from the queue of sinks to consume when passed to
601 * this function. This function adds it back to the queue if there's
602 * still something to consume afterwards.
603 */
604 static inline
605 enum bt_graph_status consume_sink_node(struct bt_graph *graph, GList *node)
606 {
607 enum bt_graph_status status;
608 struct bt_component_sink *sink;
609
610 sink = node->data;
611 status = consume_graph_sink(sink);
612 if (unlikely(status != BT_GRAPH_STATUS_END)) {
613 g_queue_push_tail_link(graph->sinks_to_consume, node);
614 goto end;
615 }
616
617 /* End reached, the node is not added back to the queue and free'd. */
618 g_queue_delete_link(graph->sinks_to_consume, node);
619
620 /* Don't forward an END status if there are sinks left to consume. */
621 if (!g_queue_is_empty(graph->sinks_to_consume)) {
622 status = BT_GRAPH_STATUS_OK;
623 goto end;
624 }
625
626 end:
627 BT_LIB_LOGV("Consumed sink node: %![comp-]+c, status=%s",
628 sink, bt_graph_status_string(status));
629 return status;
630 }
631
632 BT_HIDDEN
633 enum bt_graph_status bt_graph_consume_sink_no_check(struct bt_graph *graph,
634 struct bt_component_sink *sink)
635 {
636 enum bt_graph_status status;
637 GList *sink_node;
638 int index;
639
640 BT_LIB_LOGV("Making specific sink consume: %![comp-]+c", sink);
641 BT_ASSERT(bt_component_borrow_graph((void *) sink) == graph);
642
643 if (g_queue_is_empty(graph->sinks_to_consume)) {
644 BT_LOGV_STR("Graph's sink queue is empty: end of graph.");
645 status = BT_GRAPH_STATUS_END;
646 goto end;
647 }
648
649 index = g_queue_index(graph->sinks_to_consume, sink);
650 if (index < 0) {
651 BT_LOGV_STR("Sink is not marked as consumable: sink is ended.");
652 status = BT_GRAPH_STATUS_END;
653 goto end;
654 }
655
656 sink_node = g_queue_pop_nth_link(graph->sinks_to_consume, index);
657 BT_ASSERT(sink_node);
658 status = consume_sink_node(graph, sink_node);
659
660 end:
661 return status;
662 }
663
664 static inline
665 enum bt_graph_status consume_no_check(struct bt_graph *graph)
666 {
667 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
668 struct bt_component *sink;
669 GList *current_node;
670
671 BT_ASSERT_PRE(graph->has_sink,
672 "Graph has no sink component: %!+g", graph);
673 BT_LIB_LOGV("Making next sink consume: %![graph-]+g", graph);
674
675 if (unlikely(g_queue_is_empty(graph->sinks_to_consume))) {
676 BT_LOGV_STR("Graph's sink queue is empty: end of graph.");
677 status = BT_GRAPH_STATUS_END;
678 goto end;
679 }
680
681 current_node = g_queue_pop_head_link(graph->sinks_to_consume);
682 sink = current_node->data;
683 BT_LIB_LOGV("Chose next sink to consume: %!+c", sink);
684 status = consume_sink_node(graph, current_node);
685
686 end:
687 return status;
688 }
689
690 enum bt_graph_status bt_graph_consume(struct bt_graph *graph)
691 {
692 enum bt_graph_status status;
693
694 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
695 BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
696 BT_ASSERT_PRE(graph->can_consume,
697 "Cannot consume graph in its current state: %!+g", graph);
698 BT_ASSERT_PRE(graph->config_state != BT_GRAPH_CONFIGURATION_STATE_FAULTY,
699 "Graph is in a faulty state: %!+g", graph);
700 bt_graph_set_can_consume(graph, false);
701 status = bt_graph_configure(graph);
702 if (unlikely(status)) {
703 /* bt_graph_configure() logs errors */
704 goto end;
705 }
706
707 status = consume_no_check(graph);
708 bt_graph_set_can_consume(graph, true);
709
710 end:
711 return status;
712 }
713
714 enum bt_graph_status bt_graph_run(struct bt_graph *graph)
715 {
716 enum bt_graph_status status;
717
718 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
719 BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
720 BT_ASSERT_PRE(graph->can_consume,
721 "Cannot consume graph in its current state: %!+g", graph);
722 BT_ASSERT_PRE(graph->config_state != BT_GRAPH_CONFIGURATION_STATE_FAULTY,
723 "Graph is in a faulty state: %!+g", graph);
724 bt_graph_set_can_consume(graph, false);
725 status = bt_graph_configure(graph);
726 if (unlikely(status)) {
727 /* bt_graph_configure() logs errors */
728 goto end;
729 }
730
731 BT_LIB_LOGV("Running graph: %!+g", graph);
732
733 do {
734 /*
735 * Check if the graph is canceled at each iteration. If
736 * the graph was canceled by another thread or by a
737 * signal handler, this is not a warning nor an error,
738 * it was intentional: log with a DEBUG level only.
739 */
740 if (unlikely(graph->canceled)) {
741 BT_LIB_LOGD("Stopping the graph: graph is canceled: "
742 "%!+g", graph);
743 status = BT_GRAPH_STATUS_CANCELED;
744 goto end;
745 }
746
747 status = consume_no_check(graph);
748 if (unlikely(status == BT_GRAPH_STATUS_AGAIN)) {
749 /*
750 * If AGAIN is received and there are multiple
751 * sinks, go ahead and consume from the next
752 * sink.
753 *
754 * However, in the case where a single sink is
755 * left, the caller can decide to busy-wait and
756 * call bt_graph_run() continuously
757 * until the source is ready or it can decide to
758 * sleep for an arbitrary amount of time.
759 */
760 if (graph->sinks_to_consume->length > 1) {
761 status = BT_GRAPH_STATUS_OK;
762 }
763 }
764 } while (status == BT_GRAPH_STATUS_OK);
765
766 if (g_queue_is_empty(graph->sinks_to_consume)) {
767 status = BT_GRAPH_STATUS_END;
768 }
769
770 end:
771 BT_LIB_LOGV("Graph ran: %![graph-]+g, status=%s", graph,
772 bt_graph_status_string(status));
773 bt_graph_set_can_consume(graph, true);
774 return status;
775 }
776
777 enum bt_graph_status
778 bt_graph_add_source_component_output_port_added_listener(
779 struct bt_graph *graph,
780 bt_graph_source_component_output_port_added_listener_func func,
781 bt_graph_listener_removed_func listener_removed, void *data,
782 int *out_listener_id)
783 {
784 struct bt_graph_listener_port_added listener = {
785 .base = {
786 .removed = listener_removed,
787 .data = data,
788 },
789 .func = (port_added_func_t) func,
790 };
791 int listener_id;
792
793 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
794 BT_ASSERT_PRE_NON_NULL(func, "Listener");
795 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
796 BT_ASSERT_PRE(!graph->in_remove_listener,
797 "Graph currently executing a \"listener removed\" listener: "
798 "%!+g", graph);
799 g_array_append_val(graph->listeners.source_output_port_added, listener);
800 listener_id = graph->listeners.source_output_port_added->len - 1;
801 BT_LIB_LOGV("Added \"source component output port added\" listener to graph: "
802 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
803 listener_id);
804
805 if (listener_id) {
806 *out_listener_id = listener_id;
807 }
808
809 return BT_GRAPH_STATUS_OK;
810 }
811
812 enum bt_graph_status
813 bt_graph_add_filter_component_output_port_added_listener(
814 struct bt_graph *graph,
815 bt_graph_filter_component_output_port_added_listener_func func,
816 bt_graph_listener_removed_func listener_removed, void *data,
817 int *out_listener_id)
818 {
819 struct bt_graph_listener_port_added listener = {
820 .base = {
821 .removed = listener_removed,
822 .data = data,
823 },
824 .func = (port_added_func_t) func,
825 };
826 int listener_id;
827
828 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
829 BT_ASSERT_PRE_NON_NULL(func, "Listener");
830 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
831 BT_ASSERT_PRE(!graph->in_remove_listener,
832 "Graph currently executing a \"listener removed\" listener: "
833 "%!+g", graph);
834 g_array_append_val(graph->listeners.filter_output_port_added, listener);
835 listener_id = graph->listeners.filter_output_port_added->len - 1;
836 BT_LIB_LOGV("Added \"filter component output port added\" listener to graph: "
837 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
838 listener_id);
839
840 if (listener_id) {
841 *out_listener_id = listener_id;
842 }
843
844 return BT_GRAPH_STATUS_OK;
845 }
846
847 enum bt_graph_status
848 bt_graph_add_filter_component_input_port_added_listener(
849 struct bt_graph *graph,
850 bt_graph_filter_component_input_port_added_listener_func func,
851 bt_graph_listener_removed_func listener_removed, void *data,
852 int *out_listener_id)
853 {
854 struct bt_graph_listener_port_added listener = {
855 .base = {
856 .removed = listener_removed,
857 .data = data,
858 },
859 .func = (port_added_func_t) func,
860 };
861 int listener_id;
862
863 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
864 BT_ASSERT_PRE_NON_NULL(func, "Listener");
865 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
866 BT_ASSERT_PRE(!graph->in_remove_listener,
867 "Graph currently executing a \"listener removed\" listener: "
868 "%!+g", graph);
869 g_array_append_val(graph->listeners.filter_input_port_added, listener);
870 listener_id = graph->listeners.filter_input_port_added->len - 1;
871 BT_LIB_LOGV("Added \"filter component input port added\" listener to graph: "
872 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
873 listener_id);
874
875 if (listener_id) {
876 *out_listener_id = listener_id;
877 }
878
879 return BT_GRAPH_STATUS_OK;
880 }
881
882 enum bt_graph_status
883 bt_graph_add_sink_component_input_port_added_listener(
884 struct bt_graph *graph,
885 bt_graph_sink_component_input_port_added_listener_func func,
886 bt_graph_listener_removed_func listener_removed, void *data,
887 int *out_listener_id)
888 {
889 struct bt_graph_listener_port_added listener = {
890 .base = {
891 .removed = listener_removed,
892 .data = data,
893 },
894 .func = (port_added_func_t) func,
895 };
896 int listener_id;
897
898 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
899 BT_ASSERT_PRE_NON_NULL(func, "Listener");
900 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
901 BT_ASSERT_PRE(!graph->in_remove_listener,
902 "Graph currently executing a \"listener removed\" listener: "
903 "%!+g", graph);
904 g_array_append_val(graph->listeners.sink_input_port_added, listener);
905 listener_id = graph->listeners.sink_input_port_added->len - 1;
906 BT_LIB_LOGV("Added \"sink component input port added\" listener to graph: "
907 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
908 listener_id);
909
910 if (listener_id) {
911 *out_listener_id = listener_id;
912 }
913
914 return BT_GRAPH_STATUS_OK;
915 }
916
917 enum bt_graph_status
918 bt_graph_add_source_filter_component_ports_connected_listener(
919 struct bt_graph *graph,
920 bt_graph_source_filter_component_ports_connected_listener_func func,
921 bt_graph_listener_removed_func listener_removed, void *data,
922 int *out_listener_id)
923 {
924 struct bt_graph_listener_ports_connected listener = {
925 .base = {
926 .removed = listener_removed,
927 .data = data,
928 },
929 .func = (ports_connected_func_t) func,
930 };
931 int listener_id;
932
933 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
934 BT_ASSERT_PRE_NON_NULL(func, "Listener");
935 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
936 BT_ASSERT_PRE(!graph->in_remove_listener,
937 "Graph currently executing a \"listener removed\" listener: "
938 "%!+g", graph);
939 g_array_append_val(graph->listeners.source_filter_ports_connected,
940 listener);
941 listener_id = graph->listeners.source_filter_ports_connected->len - 1;
942 BT_LIB_LOGV("Added \"source to filter component ports connected\" listener to graph: "
943 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
944 listener_id);
945
946 if (listener_id) {
947 *out_listener_id = listener_id;
948 }
949
950 return BT_GRAPH_STATUS_OK;
951 }
952
953 enum bt_graph_status
954 bt_graph_add_source_sink_component_ports_connected_listener(
955 struct bt_graph *graph,
956 bt_graph_source_sink_component_ports_connected_listener_func func,
957 bt_graph_listener_removed_func listener_removed, void *data,
958 int *out_listener_id)
959 {
960 struct bt_graph_listener_ports_connected listener = {
961 .base = {
962 .removed = listener_removed,
963 .data = data,
964 },
965 .func = (ports_connected_func_t) func,
966 };
967 int listener_id;
968
969 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
970 BT_ASSERT_PRE_NON_NULL(func, "Listener");
971 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
972 BT_ASSERT_PRE(!graph->in_remove_listener,
973 "Graph currently executing a \"listener removed\" listener: "
974 "%!+g", graph);
975 g_array_append_val(graph->listeners.source_sink_ports_connected,
976 listener);
977 listener_id = graph->listeners.source_sink_ports_connected->len - 1;
978 BT_LIB_LOGV("Added \"source to sink component ports connected\" listener to graph: "
979 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
980 listener_id);
981
982 if (listener_id) {
983 *out_listener_id = listener_id;
984 }
985
986 return BT_GRAPH_STATUS_OK;
987 }
988
989 enum bt_graph_status
990 bt_graph_add_filter_filter_component_ports_connected_listener(
991 struct bt_graph *graph,
992 bt_graph_filter_filter_component_ports_connected_listener_func func,
993 bt_graph_listener_removed_func listener_removed, void *data,
994 int *out_listener_id)
995 {
996 struct bt_graph_listener_ports_connected listener = {
997 .base = {
998 .removed = listener_removed,
999 .data = data,
1000 },
1001 .func = (ports_connected_func_t) func,
1002 };
1003 int listener_id;
1004
1005 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1006 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1007 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1008 BT_ASSERT_PRE(!graph->in_remove_listener,
1009 "Graph currently executing a \"listener removed\" listener: "
1010 "%!+g", graph);
1011 g_array_append_val(graph->listeners.filter_filter_ports_connected,
1012 listener);
1013 listener_id = graph->listeners.filter_filter_ports_connected->len - 1;
1014 BT_LIB_LOGV("Added \"filter to filter component ports connected\" listener to graph: "
1015 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1016 listener_id);
1017
1018 if (listener_id) {
1019 *out_listener_id = listener_id;
1020 }
1021
1022 return BT_GRAPH_STATUS_OK;
1023 }
1024
1025 enum bt_graph_status
1026 bt_graph_add_filter_sink_component_ports_connected_listener(
1027 struct bt_graph *graph,
1028 bt_graph_filter_sink_component_ports_connected_listener_func func,
1029 bt_graph_listener_removed_func listener_removed, void *data,
1030 int *out_listener_id)
1031 {
1032 struct bt_graph_listener_ports_connected listener = {
1033 .base = {
1034 .removed = listener_removed,
1035 .data = data,
1036 },
1037 .func = (ports_connected_func_t) func,
1038 };
1039 int listener_id;
1040
1041 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1042 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1043 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1044 BT_ASSERT_PRE(!graph->in_remove_listener,
1045 "Graph currently executing a \"listener removed\" listener: "
1046 "%!+g", graph);
1047 g_array_append_val(graph->listeners.filter_sink_ports_connected,
1048 listener);
1049 listener_id = graph->listeners.filter_sink_ports_connected->len - 1;
1050 BT_LIB_LOGV("Added \"filter to sink component ports connected\" listener to graph: "
1051 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1052 listener_id);
1053
1054 if (listener_id) {
1055 *out_listener_id = listener_id;
1056 }
1057
1058 return BT_GRAPH_STATUS_OK;
1059 }
1060
1061 BT_HIDDEN
1062 enum bt_graph_listener_status bt_graph_notify_port_added(
1063 struct bt_graph *graph, struct bt_port *port)
1064 {
1065 uint64_t i;
1066 GArray *listeners;
1067 struct bt_component *comp;
1068 enum bt_graph_listener_status status = BT_GRAPH_LISTENER_STATUS_OK;
1069
1070 BT_ASSERT(graph);
1071 BT_ASSERT(port);
1072 BT_LIB_LOGV("Notifying graph listeners that a port was added: "
1073 "%![graph-]+g, %![port-]+p", graph, port);
1074 comp = bt_port_borrow_component_inline(port);
1075 BT_ASSERT(comp);
1076
1077 switch (comp->class->type) {
1078 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1079 {
1080 switch (port->type) {
1081 case BT_PORT_TYPE_OUTPUT:
1082 listeners = graph->listeners.source_output_port_added;
1083 break;
1084 default:
1085 abort();
1086 }
1087
1088 break;
1089 }
1090 case BT_COMPONENT_CLASS_TYPE_FILTER:
1091 {
1092 switch (port->type) {
1093 case BT_PORT_TYPE_INPUT:
1094 listeners = graph->listeners.filter_input_port_added;
1095 break;
1096 case BT_PORT_TYPE_OUTPUT:
1097 listeners = graph->listeners.filter_output_port_added;
1098 break;
1099 default:
1100 abort();
1101 }
1102
1103 break;
1104 }
1105 case BT_COMPONENT_CLASS_TYPE_SINK:
1106 {
1107 switch (port->type) {
1108 case BT_PORT_TYPE_INPUT:
1109 listeners = graph->listeners.sink_input_port_added;
1110 break;
1111 default:
1112 abort();
1113 }
1114
1115 break;
1116 }
1117 default:
1118 abort();
1119 }
1120
1121 for (i = 0; i < listeners->len; i++) {
1122 struct bt_graph_listener_port_added *listener =
1123 &g_array_index(listeners,
1124 struct bt_graph_listener_port_added, i);
1125
1126
1127 BT_ASSERT(listener->func);
1128 status = listener->func(comp, port, listener->base.data);
1129 if (status != BT_GRAPH_LISTENER_STATUS_OK) {
1130 goto end;
1131 }
1132 }
1133
1134 end:
1135 return status;
1136 }
1137
1138 BT_HIDDEN
1139 enum bt_graph_listener_status bt_graph_notify_ports_connected(
1140 struct bt_graph *graph, struct bt_port *upstream_port,
1141 struct bt_port *downstream_port)
1142 {
1143 uint64_t i;
1144 GArray *listeners;
1145 struct bt_component *upstream_comp;
1146 struct bt_component *downstream_comp;
1147 enum bt_graph_listener_status status = BT_GRAPH_LISTENER_STATUS_OK;
1148
1149 BT_ASSERT(graph);
1150 BT_ASSERT(upstream_port);
1151 BT_ASSERT(downstream_port);
1152 BT_LIB_LOGV("Notifying graph listeners that ports were connected: "
1153 "%![graph-]+g, %![up-port-]+p, %![down-port-]+p",
1154 graph, upstream_port, downstream_port);
1155 upstream_comp = bt_port_borrow_component_inline(upstream_port);
1156 BT_ASSERT(upstream_comp);
1157 downstream_comp = bt_port_borrow_component_inline(downstream_port);
1158 BT_ASSERT(downstream_comp);
1159
1160 switch (upstream_comp->class->type) {
1161 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1162 {
1163 switch (downstream_comp->class->type) {
1164 case BT_COMPONENT_CLASS_TYPE_FILTER:
1165 listeners =
1166 graph->listeners.source_filter_ports_connected;
1167 break;
1168 case BT_COMPONENT_CLASS_TYPE_SINK:
1169 listeners =
1170 graph->listeners.source_sink_ports_connected;
1171 break;
1172 default:
1173 abort();
1174 }
1175
1176 break;
1177 }
1178 case BT_COMPONENT_CLASS_TYPE_FILTER:
1179 {
1180 switch (downstream_comp->class->type) {
1181 case BT_COMPONENT_CLASS_TYPE_FILTER:
1182 listeners =
1183 graph->listeners.filter_filter_ports_connected;
1184 break;
1185 case BT_COMPONENT_CLASS_TYPE_SINK:
1186 listeners =
1187 graph->listeners.filter_sink_ports_connected;
1188 break;
1189 default:
1190 abort();
1191 }
1192
1193 break;
1194 }
1195 default:
1196 abort();
1197 }
1198
1199 for (i = 0; i < listeners->len; i++) {
1200 struct bt_graph_listener_ports_connected *listener =
1201 &g_array_index(listeners,
1202 struct bt_graph_listener_ports_connected, i);
1203
1204 BT_ASSERT(listener->func);
1205 status = listener->func(upstream_comp, downstream_comp,
1206 upstream_port, downstream_port, listener->base.data);
1207 if (status != BT_GRAPH_LISTENER_STATUS_OK) {
1208 goto end;
1209 }
1210 }
1211
1212 end:
1213 return status;
1214 }
1215
1216 enum bt_graph_status bt_graph_cancel(struct bt_graph *graph)
1217 {
1218
1219 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1220 graph->canceled = true;
1221 BT_LIB_LOGV("Canceled graph: %!+i", graph);
1222 return BT_GRAPH_STATUS_OK;
1223 }
1224
1225 bt_bool bt_graph_is_canceled(const struct bt_graph *graph)
1226 {
1227 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1228 return graph->canceled ? BT_TRUE : BT_FALSE;
1229 }
1230
1231 BT_HIDDEN
1232 void bt_graph_remove_connection(struct bt_graph *graph,
1233 struct bt_connection *connection)
1234 {
1235 BT_ASSERT(graph);
1236 BT_ASSERT(connection);
1237 BT_LIB_LOGV("Removing graph's connection: %![graph-]+g, %![conn-]+x",
1238 graph, connection);
1239 g_ptr_array_remove(graph->connections, connection);
1240 }
1241
1242 BT_ASSERT_PRE_FUNC
1243 static inline
1244 bool component_name_exists(struct bt_graph *graph, const char *name)
1245 {
1246 bool exists = false;
1247 uint64_t i;
1248
1249 for (i = 0; i < graph->components->len; i++) {
1250 struct bt_component *other_comp = graph->components->pdata[i];
1251
1252 if (strcmp(name, bt_component_get_name(other_comp)) == 0) {
1253 BT_ASSERT_PRE_MSG("Another component with the same name already exists in the graph: "
1254 "%![other-comp-]+c, name=\"%s\"",
1255 other_comp, name);
1256 exists = true;
1257 goto end;
1258 }
1259 }
1260
1261 end:
1262 return exists;
1263 }
1264
1265 static
1266 enum bt_graph_status add_component_with_init_method_data(
1267 struct bt_graph *graph,
1268 struct bt_component_class *comp_cls,
1269 comp_init_method_t init_method,
1270 const char *name, const struct bt_value *params,
1271 void *init_method_data, struct bt_component **user_component)
1272 {
1273 enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK;
1274 enum bt_self_component_status comp_status;
1275 struct bt_component *component = NULL;
1276 int ret;
1277 bool init_can_consume;
1278 struct bt_value *new_params = NULL;
1279
1280 BT_ASSERT(comp_cls);
1281 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1282 BT_ASSERT_PRE_NON_NULL(name, "Name");
1283 BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
1284 BT_ASSERT_PRE(
1285 graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
1286 "Graph is not in the \"configuring\" state: %!+g", graph);
1287 BT_ASSERT_PRE(!component_name_exists(graph, name),
1288 "Duplicate component name: %!+g, name=\"%s\"", graph, name);
1289 BT_ASSERT_PRE(!params || bt_value_is_map(params),
1290 "Parameter value is not a map value: %!+v", params);
1291 init_can_consume = graph->can_consume;
1292 bt_graph_set_can_consume(graph, false);
1293 BT_LIB_LOGD("Adding component to graph: "
1294 "%![graph-]+g, %![cc-]+C, name=\"%s\", %![params-]+v, "
1295 "init-method-data-addr=%p",
1296 graph, comp_cls, name, params, init_method_data);
1297
1298 if (!params) {
1299 new_params = bt_value_map_create();
1300 if (!new_params) {
1301 BT_LOGE_STR("Cannot create map value object.");
1302 graph_status = BT_GRAPH_STATUS_NOMEM;
1303 goto end;
1304 }
1305
1306 params = new_params;
1307 }
1308
1309 ret = bt_component_create(comp_cls, name, &component);
1310 if (ret) {
1311 BT_LOGE("Cannot create empty component object: ret=%d",
1312 ret);
1313 graph_status = BT_GRAPH_STATUS_NOMEM;
1314 goto end;
1315 }
1316
1317 /*
1318 * The user's initialization method needs to see that this
1319 * component is part of the graph. If the user method fails, we
1320 * immediately remove the component from the graph's components.
1321 */
1322 g_ptr_array_add(graph->components, component);
1323 bt_component_set_graph(component, graph);
1324 bt_value_freeze(params);
1325
1326 if (init_method) {
1327 BT_LOGD_STR("Calling user's initialization method.");
1328 comp_status = init_method(component, params, init_method_data);
1329 BT_LOGD("User method returned: status=%s",
1330 bt_self_component_status_string(comp_status));
1331 if (comp_status != BT_SELF_COMPONENT_STATUS_OK) {
1332 BT_LOGW_STR("Initialization method failed.");
1333 graph_status = (int) comp_status;
1334 bt_component_set_graph(component, NULL);
1335 g_ptr_array_remove_fast(graph->components, component);
1336 goto end;
1337 }
1338 }
1339
1340 /*
1341 * Mark the component as initialized so that its finalization
1342 * method is called when it is destroyed.
1343 */
1344 component->initialized = true;
1345
1346 /*
1347 * If it's a sink component, it needs to be part of the graph's
1348 * sink queue to be consumed by bt_graph_consume().
1349 */
1350 if (bt_component_is_sink(component)) {
1351 graph->has_sink = true;
1352 g_queue_push_tail(graph->sinks_to_consume, component);
1353 }
1354
1355 /*
1356 * Freeze the component class now that it's instantiated at
1357 * least once.
1358 */
1359 BT_LOGD_STR("Freezing component class.");
1360 bt_component_class_freeze(comp_cls);
1361 BT_LIB_LOGD("Added component to graph: "
1362 "%![graph-]+g, %![cc-]+C, name=\"%s\", %![params-]+v, "
1363 "init-method-data-addr=%p, %![comp-]+c",
1364 graph, comp_cls, name, params, init_method_data, component);
1365
1366 if (user_component) {
1367 /* Move reference to user */
1368 *user_component = component;
1369 component = NULL;
1370 }
1371
1372 end:
1373 if (graph_status != BT_GRAPH_STATUS_OK) {
1374 bt_graph_make_faulty(graph);
1375 }
1376
1377 bt_object_put_ref(component);
1378 bt_object_put_ref(new_params);
1379 (void) init_can_consume;
1380 bt_graph_set_can_consume(graph, init_can_consume);
1381 return graph_status;
1382 }
1383
1384 enum bt_graph_status
1385 bt_graph_add_source_component_with_init_method_data(
1386 struct bt_graph *graph,
1387 const struct bt_component_class_source *comp_cls,
1388 const char *name, const struct bt_value *params,
1389 void *init_method_data,
1390 const struct bt_component_source **component)
1391 {
1392 BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class");
1393 return add_component_with_init_method_data(graph,
1394 (void *) comp_cls, (comp_init_method_t) comp_cls->methods.init,
1395 name, params, init_method_data, (void *) component);
1396 }
1397
1398 enum bt_graph_status bt_graph_add_source_component(
1399 struct bt_graph *graph,
1400 const struct bt_component_class_source *comp_cls,
1401 const char *name, const struct bt_value *params,
1402 const struct bt_component_source **component)
1403 {
1404 return bt_graph_add_source_component_with_init_method_data(
1405 graph, comp_cls, name, params, NULL, component);
1406 }
1407
1408 enum bt_graph_status
1409 bt_graph_add_filter_component_with_init_method_data(
1410 struct bt_graph *graph,
1411 const struct bt_component_class_filter *comp_cls,
1412 const char *name, const struct bt_value *params,
1413 void *init_method_data,
1414 const struct bt_component_filter **component)
1415 {
1416 BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class");
1417 return add_component_with_init_method_data(graph,
1418 (void *) comp_cls, (comp_init_method_t) comp_cls->methods.init,
1419 name, params, init_method_data, (void *) component);
1420 }
1421
1422 enum bt_graph_status bt_graph_add_filter_component(
1423 struct bt_graph *graph,
1424 const struct bt_component_class_filter *comp_cls,
1425 const char *name, const struct bt_value *params,
1426 const struct bt_component_filter **component)
1427 {
1428 return bt_graph_add_filter_component_with_init_method_data(
1429 graph, comp_cls, name, params, NULL, component);
1430 }
1431
1432 enum bt_graph_status
1433 bt_graph_add_sink_component_with_init_method_data(
1434 struct bt_graph *graph,
1435 const struct bt_component_class_sink *comp_cls,
1436 const char *name, const struct bt_value *params,
1437 void *init_method_data,
1438 const struct bt_component_sink **component)
1439 {
1440 BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class");
1441 return add_component_with_init_method_data(graph,
1442 (void *) comp_cls, (comp_init_method_t) comp_cls->methods.init,
1443 name, params, init_method_data, (void *) component);
1444 }
1445
1446 enum bt_graph_status bt_graph_add_sink_component(
1447 struct bt_graph *graph,
1448 const struct bt_component_class_sink *comp_cls,
1449 const char *name, const struct bt_value *params,
1450 const struct bt_component_sink **component)
1451 {
1452 return bt_graph_add_sink_component_with_init_method_data(
1453 graph, comp_cls, name, params, NULL, component);
1454 }
1455
1456 BT_HIDDEN
1457 int bt_graph_remove_unconnected_component(struct bt_graph *graph,
1458 struct bt_component *component)
1459 {
1460 bool init_can_consume;
1461 uint64_t count;
1462 uint64_t i;
1463 int ret = 0;
1464
1465 BT_ASSERT(graph);
1466 BT_ASSERT(component);
1467 BT_ASSERT(component->base.ref_count == 0);
1468 BT_ASSERT(bt_component_borrow_graph(component) == graph);
1469
1470 init_can_consume = graph->can_consume;
1471 count = bt_component_get_input_port_count(component);
1472
1473 for (i = 0; i < count; i++) {
1474 struct bt_port *port = (void *)
1475 bt_component_borrow_input_port_by_index(component, i);
1476
1477 BT_ASSERT(port);
1478
1479 if (bt_port_is_connected(port)) {
1480 BT_LIB_LOGW("Cannot remove component from graph: "
1481 "an input port is connected: "
1482 "%![graph-]+g, %![comp-]+c, %![port-]+p",
1483 graph, component, port);
1484 goto error;
1485 }
1486 }
1487
1488 count = bt_component_get_output_port_count(component);
1489
1490 for (i = 0; i < count; i++) {
1491 struct bt_port *port = (void *)
1492 bt_component_borrow_output_port_by_index(component, i);
1493
1494 BT_ASSERT(port);
1495
1496 if (bt_port_is_connected(port)) {
1497 BT_LIB_LOGW("Cannot remove component from graph: "
1498 "an output port is connected: "
1499 "%![graph-]+g, %![comp-]+c, %![port-]+p",
1500 graph, component, port);
1501 goto error;
1502 }
1503 }
1504
1505 bt_graph_set_can_consume(graph, false);
1506
1507 /* Possibly remove from sinks to consume */
1508 (void) g_queue_remove(graph->sinks_to_consume, component);
1509
1510 if (graph->sinks_to_consume->length == 0) {
1511 graph->has_sink = false;
1512 }
1513
1514 /*
1515 * This calls bt_object_try_spec_release() on the component, and
1516 * since its reference count is 0, its destructor is called. Its
1517 * destructor calls the user's finalization method (if set).
1518 */
1519 g_ptr_array_remove(graph->components, component);
1520 goto end;
1521
1522 error:
1523 ret = -1;
1524
1525 end:
1526 (void) init_can_consume;
1527 bt_graph_set_can_consume(graph, init_can_consume);
1528 return ret;
1529 }
1530
1531 BT_HIDDEN
1532 void bt_graph_add_message(struct bt_graph *graph,
1533 struct bt_message *msg)
1534 {
1535 BT_ASSERT(graph);
1536 BT_ASSERT(msg);
1537
1538 /*
1539 * It's okay not to take a reference because, when a
1540 * message's reference count drops to 0, either:
1541 *
1542 * * It is recycled back to one of this graph's pool.
1543 * * It is destroyed because it doesn't have any link to any
1544 * graph, which means the original graph is already destroyed.
1545 */
1546 g_ptr_array_add(graph->messages, msg);
1547 }
1548
/*
 * Forwards `graph` to bt_object_get_ref().
 */
void bt_graph_get_ref(const struct bt_graph *graph)
{
	bt_object_get_ref(graph);
	return;
}
1553
/*
 * Forwards `graph` to bt_object_put_ref().
 */
void bt_graph_put_ref(const struct bt_graph *graph)
{
	bt_object_put_ref(graph);
	return;
}
This page took 0.10737 seconds and 4 git commands to generate.