cc27a6998e48ef3e911050b4b92e1c6e9068875a
[babeltrace.git] / lib / graph / graph.c
1 /*
2 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #define BT_LOG_TAG "GRAPH"
25 #include <babeltrace/lib-logging-internal.h>
26
27 #include <babeltrace/graph/component-internal.h>
28 #include <babeltrace/graph/private-graph.h>
29 #include <babeltrace/graph/graph-internal.h>
30 #include <babeltrace/graph/connection-internal.h>
31 #include <babeltrace/graph/component-sink-internal.h>
32 #include <babeltrace/graph/component-source.h>
33 #include <babeltrace/graph/component-filter.h>
34 #include <babeltrace/graph/port.h>
35 #include <babeltrace/graph/notification-internal.h>
36 #include <babeltrace/graph/notification-event-internal.h>
37 #include <babeltrace/graph/notification-packet-internal.h>
38 #include <babeltrace/compiler-internal.h>
39 #include <babeltrace/common-internal.h>
40 #include <babeltrace/types.h>
41 #include <babeltrace/values.h>
42 #include <babeltrace/private-values.h>
43 #include <babeltrace/values-internal.h>
44 #include <babeltrace/object.h>
45 #include <babeltrace/assert-internal.h>
46 #include <babeltrace/assert-pre-internal.h>
47 #include <unistd.h>
48 #include <glib.h>
49
50 typedef void (*port_added_func_t)(void *, void *, void *);
51 typedef void (*port_removed_func_t)(void *, void *, void *);
52 typedef void (*ports_connected_func_t)(void *, void *, void *, void *, void *);
53 typedef void (*ports_disconnected_func_t)(void *, void *, void *, void *, void *);
54 typedef enum bt_self_component_status (*comp_init_method_t)(void *, void *, void *);
55
56 struct bt_graph_listener {
57 bt_private_graph_listener_removed removed;
58 void *data;
59 };
60
61 struct bt_graph_listener_port_added {
62 struct bt_graph_listener base;
63 port_added_func_t func;
64 };
65
66 struct bt_graph_listener_port_removed {
67 struct bt_graph_listener base;
68 port_removed_func_t func;
69 };
70
71 struct bt_graph_listener_ports_connected {
72 struct bt_graph_listener base;
73 ports_connected_func_t func;
74 };
75
76 struct bt_graph_listener_ports_disconnected {
77 struct bt_graph_listener base;
78 ports_disconnected_func_t func;
79 };
80
81 #define INIT_LISTENERS_ARRAY(_type, _listeners) \
82 do { \
83 _listeners = g_array_new(FALSE, TRUE, sizeof(_type)); \
84 if (!(_listeners)) { \
85 BT_LOGE_STR("Failed to allocate one GArray."); \
86 } \
87 } while (0)
88
89 #define CALL_REMOVE_LISTENERS(_type, _listeners) \
90 do { \
91 size_t i; \
92 \
93 for (i = 0; i < (_listeners)->len; i++) { \
94 _type *listener = \
95 &g_array_index((_listeners), _type, i); \
96 \
97 if (listener->base.removed) { \
98 listener->base.removed(listener->base.data); \
99 } \
100 } \
101 } while (0)
102
103 static
104 void destroy_graph(struct bt_object *obj)
105 {
106 struct bt_graph *graph = container_of(obj,
107 struct bt_graph, base);
108
109 /*
110 * The graph's reference count is 0 if we're here. Increment
111 * it to avoid a double-destroy (possibly infinitely recursive)
112 * in this situation:
113 *
114 * 1. We put and destroy a connection.
115 * 2. This connection's destructor finalizes its active
116 * notification iterators.
117 * 3. A notification iterator's finalization function gets a
118 * new reference on its component (reference count goes from
119 * 0 to 1).
120 * 4. Since this component's reference count goes to 1, it takes
121 * a reference on its parent (this graph). This graph's
122 * reference count goes from 0 to 1.
123 * 5. The notification iterator's finalization function puts its
124 * component reference (reference count goes from 1 to 0).
125 * 6. Since this component's reference count goes from 1 to 0,
126 * it puts its parent (this graph). This graph's reference
127 * count goes from 1 to 0.
128 * 7. Since this graph's reference count goes from 1 to 0,
129 * its destructor is called (this function).
130 *
131 * With the incrementation below, the graph's reference count at
132 * step 4 goes from 1 to 2, and from 2 to 1 at step 6. This
133 * ensures that this function is not called two times.
134 */
135 BT_LIB_LOGD("Destroying graph: %!+g", graph);
136 obj->ref_count++;
137
138 /*
139 * Cancel the graph to disallow some operations, like creating
140 * notification iterators and adding ports to components.
141 */
142 (void) bt_private_graph_cancel((void *) graph);
143
144 /* Call all remove listeners */
145 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
146 graph->listeners.source_output_port_added);
147 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
148 graph->listeners.filter_output_port_added);
149 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
150 graph->listeners.filter_input_port_added);
151 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
152 graph->listeners.sink_input_port_added);
153 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_removed,
154 graph->listeners.source_output_port_removed);
155 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_removed,
156 graph->listeners.filter_output_port_removed);
157 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_removed,
158 graph->listeners.filter_input_port_removed);
159 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_removed,
160 graph->listeners.sink_input_port_removed);
161 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_connected,
162 graph->listeners.source_filter_ports_connected);
163 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_connected,
164 graph->listeners.source_sink_ports_connected);
165 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_connected,
166 graph->listeners.filter_sink_ports_connected);
167 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_disconnected,
168 graph->listeners.source_filter_ports_disconnected);
169 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_disconnected,
170 graph->listeners.source_sink_ports_disconnected);
171 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_disconnected,
172 graph->listeners.filter_sink_ports_disconnected);
173
174 if (graph->notifications) {
175 g_ptr_array_free(graph->notifications, TRUE);
176 graph->notifications = NULL;
177 }
178
179 if (graph->connections) {
180 BT_LOGD_STR("Destroying connections.");
181 g_ptr_array_free(graph->connections, TRUE);
182 graph->connections = NULL;
183 }
184
185 if (graph->components) {
186 BT_LOGD_STR("Destroying components.");
187 g_ptr_array_free(graph->components, TRUE);
188 graph->components = NULL;
189 }
190
191 if (graph->sinks_to_consume) {
192 g_queue_free(graph->sinks_to_consume);
193 graph->sinks_to_consume = NULL;
194 }
195
196 if (graph->listeners.source_output_port_added) {
197 g_array_free(graph->listeners.source_output_port_added, TRUE);
198 graph->listeners.source_output_port_added = NULL;
199 }
200
201 if (graph->listeners.filter_output_port_added) {
202 g_array_free(graph->listeners.filter_output_port_added, TRUE);
203 graph->listeners.filter_output_port_added = NULL;
204 }
205
206 if (graph->listeners.filter_input_port_added) {
207 g_array_free(graph->listeners.filter_input_port_added, TRUE);
208 graph->listeners.filter_input_port_added = NULL;
209 }
210
211 if (graph->listeners.sink_input_port_added) {
212 g_array_free(graph->listeners.sink_input_port_added, TRUE);
213 graph->listeners.sink_input_port_added = NULL;
214 }
215
216 if (graph->listeners.source_output_port_removed) {
217 g_array_free(graph->listeners.source_output_port_removed, TRUE);
218 graph->listeners.source_output_port_removed = NULL;
219 }
220
221 if (graph->listeners.filter_output_port_removed) {
222 g_array_free(graph->listeners.filter_output_port_removed, TRUE);
223 graph->listeners.filter_output_port_removed = NULL;
224 }
225
226 if (graph->listeners.filter_input_port_removed) {
227 g_array_free(graph->listeners.filter_input_port_removed, TRUE);
228 graph->listeners.filter_input_port_removed = NULL;
229 }
230
231 if (graph->listeners.sink_input_port_removed) {
232 g_array_free(graph->listeners.sink_input_port_removed, TRUE);
233 graph->listeners.sink_input_port_removed = NULL;
234 }
235
236 if (graph->listeners.source_filter_ports_connected) {
237 g_array_free(graph->listeners.source_filter_ports_connected,
238 TRUE);
239 graph->listeners.source_filter_ports_connected = NULL;
240 }
241
242 if (graph->listeners.source_sink_ports_connected) {
243 g_array_free(graph->listeners.source_sink_ports_connected,
244 TRUE);
245 graph->listeners.source_sink_ports_connected = NULL;
246 }
247
248 if (graph->listeners.filter_sink_ports_connected) {
249 g_array_free(graph->listeners.filter_sink_ports_connected,
250 TRUE);
251 graph->listeners.filter_sink_ports_connected = NULL;
252 }
253
254 if (graph->listeners.source_filter_ports_disconnected) {
255 g_array_free(graph->listeners.source_filter_ports_disconnected,
256 TRUE);
257 graph->listeners.source_filter_ports_disconnected = NULL;
258 }
259
260 if (graph->listeners.source_sink_ports_disconnected) {
261 g_array_free(graph->listeners.source_sink_ports_disconnected,
262 TRUE);
263 graph->listeners.source_sink_ports_disconnected = NULL;
264 }
265
266 if (graph->listeners.filter_sink_ports_disconnected) {
267 g_array_free(graph->listeners.filter_sink_ports_disconnected,
268 TRUE);
269 graph->listeners.filter_sink_ports_disconnected = NULL;
270 }
271
272 bt_object_pool_finalize(&graph->event_notif_pool);
273 bt_object_pool_finalize(&graph->packet_begin_notif_pool);
274 bt_object_pool_finalize(&graph->packet_end_notif_pool);
275 g_free(graph);
276 }
277
278 static
279 void destroy_notification_event(struct bt_notification *notif,
280 struct bt_graph *graph)
281 {
282 bt_notification_event_destroy(notif);
283 }
284
285 static
286 void destroy_notification_packet_begin(struct bt_notification *notif,
287 struct bt_graph *graph)
288 {
289 bt_notification_packet_begin_destroy(notif);
290 }
291
292 static
293 void destroy_notification_packet_end(struct bt_notification *notif,
294 struct bt_graph *graph)
295 {
296 bt_notification_packet_end_destroy(notif);
297 }
298
299 static
300 void notify_notification_graph_is_destroyed(struct bt_notification *notif)
301 {
302 bt_notification_unlink_graph(notif);
303 }
304
305 struct bt_private_graph *bt_private_graph_create(void)
306 {
307 struct bt_graph *graph;
308 int ret;
309
310 BT_LOGD_STR("Creating graph object.");
311 graph = g_new0(struct bt_graph, 1);
312 if (!graph) {
313 BT_LOGE_STR("Failed to allocate one graph.");
314 goto end;
315 }
316
317 bt_object_init_shared(&graph->base, destroy_graph);
318 graph->connections = g_ptr_array_new_with_free_func(
319 (GDestroyNotify) bt_object_try_spec_release);
320 if (!graph->connections) {
321 BT_LOGE_STR("Failed to allocate one GPtrArray.");
322 goto error;
323 }
324 graph->components = g_ptr_array_new_with_free_func(
325 (GDestroyNotify) bt_object_try_spec_release);
326 if (!graph->components) {
327 BT_LOGE_STR("Failed to allocate one GPtrArray.");
328 goto error;
329 }
330 graph->sinks_to_consume = g_queue_new();
331 if (!graph->sinks_to_consume) {
332 BT_LOGE_STR("Failed to allocate one GQueue.");
333 goto error;
334 }
335
336 bt_graph_set_can_consume(graph, true);
337 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_added,
338 graph->listeners.source_output_port_added);
339
340 if (!graph->listeners.source_output_port_added) {
341 ret = -1;
342 goto error;
343 }
344
345 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_added,
346 graph->listeners.filter_output_port_added);
347
348 if (!graph->listeners.filter_output_port_added) {
349 ret = -1;
350 goto error;
351 }
352
353 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_added,
354 graph->listeners.filter_input_port_added);
355
356 if (!graph->listeners.filter_input_port_added) {
357 ret = -1;
358 goto error;
359 }
360
361 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_added,
362 graph->listeners.sink_input_port_added);
363
364 if (!graph->listeners.sink_input_port_added) {
365 ret = -1;
366 goto error;
367 }
368
369 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_removed,
370 graph->listeners.source_output_port_removed);
371
372 if (!graph->listeners.source_output_port_removed) {
373 ret = -1;
374 goto error;
375 }
376
377 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_removed,
378 graph->listeners.filter_output_port_removed);
379
380 if (!graph->listeners.filter_output_port_removed) {
381 ret = -1;
382 goto error;
383 }
384
385 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_removed,
386 graph->listeners.filter_input_port_removed);
387
388 if (!graph->listeners.filter_input_port_removed) {
389 ret = -1;
390 goto error;
391 }
392
393 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_removed,
394 graph->listeners.sink_input_port_removed);
395
396 if (!graph->listeners.sink_input_port_removed) {
397 ret = -1;
398 goto error;
399 }
400
401 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_connected,
402 graph->listeners.source_filter_ports_connected);
403
404 if (!graph->listeners.source_filter_ports_connected) {
405 ret = -1;
406 goto error;
407 }
408
409 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_connected,
410 graph->listeners.source_sink_ports_connected);
411
412 if (!graph->listeners.source_sink_ports_connected) {
413 ret = -1;
414 goto error;
415 }
416
417 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_connected,
418 graph->listeners.filter_sink_ports_connected);
419
420 if (!graph->listeners.filter_sink_ports_connected) {
421 ret = -1;
422 goto error;
423 }
424
425 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_disconnected,
426 graph->listeners.source_filter_ports_disconnected);
427
428 if (!graph->listeners.source_filter_ports_disconnected) {
429 ret = -1;
430 goto error;
431 }
432
433 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_disconnected,
434 graph->listeners.source_sink_ports_disconnected);
435
436 if (!graph->listeners.source_sink_ports_disconnected) {
437 ret = -1;
438 goto error;
439 }
440
441 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_disconnected,
442 graph->listeners.filter_sink_ports_disconnected);
443
444 if (!graph->listeners.filter_sink_ports_disconnected) {
445 ret = -1;
446 goto error;
447 }
448
449 ret = bt_object_pool_initialize(&graph->event_notif_pool,
450 (bt_object_pool_new_object_func) bt_notification_event_new,
451 (bt_object_pool_destroy_object_func) destroy_notification_event,
452 graph);
453 if (ret) {
454 BT_LOGE("Failed to initialize event notification pool: ret=%d",
455 ret);
456 goto error;
457 }
458
459 ret = bt_object_pool_initialize(&graph->packet_begin_notif_pool,
460 (bt_object_pool_new_object_func) bt_notification_packet_begin_new,
461 (bt_object_pool_destroy_object_func) destroy_notification_packet_begin,
462 graph);
463 if (ret) {
464 BT_LOGE("Failed to initialize packet beginning notification pool: ret=%d",
465 ret);
466 goto error;
467 }
468
469 ret = bt_object_pool_initialize(&graph->packet_end_notif_pool,
470 (bt_object_pool_new_object_func) bt_notification_packet_end_new,
471 (bt_object_pool_destroy_object_func) destroy_notification_packet_end,
472 graph);
473 if (ret) {
474 BT_LOGE("Failed to initialize packet end notification pool: ret=%d",
475 ret);
476 goto error;
477 }
478
479 graph->notifications = g_ptr_array_new_with_free_func(
480 (GDestroyNotify) notify_notification_graph_is_destroyed);
481 BT_LIB_LOGD("Created graph object: %!+g", graph);
482
483 end:
484 return (void *) graph;
485
486 error:
487 BT_OBJECT_PUT_REF_AND_RESET(graph);
488 goto end;
489 }
490
491 enum bt_graph_status bt_private_graph_connect_ports(
492 struct bt_private_graph *priv_graph,
493 struct bt_port_output *upstream_port_out,
494 struct bt_port_input *downstream_port_in,
495 struct bt_connection **user_connection)
496 {
497 struct bt_graph *graph = (void *) priv_graph;
498 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
499 struct bt_connection *connection = NULL;
500 struct bt_port *upstream_port = (void *) upstream_port_out;
501 struct bt_port *downstream_port = (void *) downstream_port_in;
502 struct bt_component *upstream_component = NULL;
503 struct bt_component *downstream_component = NULL;
504 enum bt_self_component_status component_status;
505 bool init_can_consume;
506
507 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
508 BT_ASSERT_PRE_NON_NULL(upstream_port, "Upstream port");
509 BT_ASSERT_PRE_NON_NULL(downstream_port, "Downstream port port");
510 BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
511 BT_ASSERT_PRE(!bt_port_is_connected(upstream_port),
512 "Upstream port is already connected: %!+p", upstream_port);
513 BT_ASSERT_PRE(!bt_port_is_connected(downstream_port),
514 "Downstream port is already connected: %!+p", downstream_port);
515 BT_ASSERT_PRE(bt_port_borrow_component((void *) upstream_port),
516 "Upstream port does not belong to a component: %!+p",
517 upstream_port);
518 BT_ASSERT_PRE(bt_port_borrow_component((void *) downstream_port),
519 "Downstream port does not belong to a component: %!+p",
520 downstream_port);
521 init_can_consume = graph->can_consume;
522 BT_LIB_LOGD("Connecting component ports within graph: "
523 "%![graph-]+g, %![up-port-]+p, %![down-port-]+p",
524 graph, upstream_port, downstream_port);
525 bt_graph_set_can_consume(graph, false);
526 upstream_component = bt_port_borrow_component(
527 (void *) upstream_port);
528 downstream_component = bt_port_borrow_component(
529 (void *) downstream_port);
530
531 /*
532 * At this point the ports are not connected yet. Both
533 * components need to accept an eventual connection to their
534 * port by the other port before we continue.
535 */
536 BT_LIB_LOGD("Asking upstream component to accept the connection: "
537 "%![comp-]+c", upstream_component);
538 component_status = bt_component_accept_port_connection(
539 upstream_component, (void *) upstream_port,
540 (void *) downstream_port);
541 if (component_status != BT_SELF_COMPONENT_STATUS_OK) {
542 if (component_status == BT_SELF_COMPONENT_STATUS_REFUSE_PORT_CONNECTION) {
543 BT_LOGD_STR("Upstream component refused the connection.");
544 } else {
545 BT_LOGW("Cannot ask upstream component to accept the connection: "
546 "status=%s", bt_self_component_status_string(component_status));
547 }
548
549 status = (int) component_status;
550 goto end;
551 }
552
553 BT_LIB_LOGD("Asking downstream component to accept the connection: "
554 "%![comp-]+c", downstream_component);
555 component_status = bt_component_accept_port_connection(
556 downstream_component, (void *) downstream_port,
557 (void *) upstream_port);
558 if (component_status != BT_SELF_COMPONENT_STATUS_OK) {
559 if (component_status == BT_SELF_COMPONENT_STATUS_REFUSE_PORT_CONNECTION) {
560 BT_LOGD_STR("Downstream component refused the connection.");
561 } else {
562 BT_LOGW("Cannot ask downstream component to accept the connection: "
563 "status=%s", bt_self_component_status_string(component_status));
564 }
565
566 status = (int) component_status;
567 goto end;
568 }
569
570 BT_LOGD_STR("Creating connection.");
571 connection = bt_connection_create(graph, (void *) upstream_port,
572 (void *) downstream_port);
573 if (!connection) {
574 BT_LOGW("Cannot create connection object.");
575 status = BT_GRAPH_STATUS_NOMEM;
576 goto end;
577 }
578
579 BT_LIB_LOGD("Connection object created: %!+x", connection);
580
581 /*
582 * Ownership of upstream_component/downstream_component and of
583 * the connection object is transferred to the graph.
584 */
585 g_ptr_array_add(graph->connections, connection);
586
587 /*
588 * Notify both components that their port is connected.
589 */
590 BT_LIB_LOGD("Notifying upstream component that its port is connected: "
591 "%![comp-]+c, %![port-]+p", upstream_component, upstream_port);
592 component_status = bt_component_port_connected(upstream_component,
593 (void *) upstream_port, (void *) downstream_port);
594 if (component_status != BT_SELF_COMPONENT_STATUS_OK) {
595 BT_LIB_LOGW("Error while notifying upstream component that its port is connected: "
596 "status=%s, %![graph-]+g, %![up-comp-]+c, "
597 "%![down-comp-]+c, %![up-port-]+p, %![down-port-]+p",
598 bt_self_component_status_string(component_status),
599 graph, upstream_component, downstream_component,
600 upstream_port, downstream_port);
601 bt_connection_end(connection, true);
602 status = (int) component_status;
603 goto end;
604 }
605
606 connection->notified_upstream_port_connected = true;
607 BT_LIB_LOGD("Notifying downstream component that its port is connected: "
608 "%![comp-]+c, %![port-]+p", downstream_component,
609 downstream_port);
610 component_status = bt_component_port_connected(downstream_component,
611 (void *) downstream_port, (void *) upstream_port);
612 if (component_status != BT_SELF_COMPONENT_STATUS_OK) {
613 BT_LIB_LOGW("Error while notifying downstream component that its port is connected: "
614 "status=%s, %![graph-]+g, %![up-comp-]+c, "
615 "%![down-comp-]+c, %![up-port-]+p, %![down-port-]+p",
616 bt_self_component_status_string(component_status),
617 graph, upstream_component, downstream_component,
618 upstream_port, downstream_port);
619 bt_connection_end(connection, true);
620 status = (int) component_status;
621 goto end;
622 }
623
624 connection->notified_downstream_port_connected = true;
625
626 /*
627 * Notify the graph's creator that both ports are connected.
628 */
629 BT_LOGD_STR("Notifying graph's user that new component ports are connected.");
630 bt_graph_notify_ports_connected(graph, upstream_port, downstream_port);
631 connection->notified_graph_ports_connected = true;
632 BT_LIB_LOGD("Connected component ports within graph: "
633 "%![graph-]+g, %![up-comp-]+c, %![down-comp-]+c, "
634 "%![up-port-]+p, %![down-port-]+p",
635 graph, upstream_component, downstream_component,
636 upstream_port, downstream_port);
637
638 if (user_connection) {
639 /* Move reference to user */
640 *user_connection = connection;
641 connection = NULL;
642 }
643
644 end:
645 bt_object_put_ref(connection);
646 (void) init_can_consume;
647 bt_graph_set_can_consume(graph, init_can_consume);
648 return status;
649 }
650
651 static inline
652 enum bt_graph_status consume_graph_sink(struct bt_component_sink *comp)
653 {
654 enum bt_self_component_status comp_status;
655 struct bt_component_class_sink *sink_class = NULL;
656
657 BT_ASSERT(comp);
658 sink_class = (void *) comp->parent.class;
659 BT_ASSERT(sink_class->methods.consume);
660 BT_LIB_LOGD("Calling user's consume method: %!+c", comp);
661 comp_status = sink_class->methods.consume((void *) comp);
662 BT_LOGD("User method returned: status=%s",
663 bt_self_component_status_string(comp_status));
664 BT_ASSERT_PRE(comp_status == BT_SELF_COMPONENT_STATUS_OK ||
665 comp_status == BT_SELF_COMPONENT_STATUS_END ||
666 comp_status == BT_SELF_COMPONENT_STATUS_AGAIN ||
667 comp_status == BT_SELF_COMPONENT_STATUS_ERROR ||
668 comp_status == BT_SELF_COMPONENT_STATUS_NOMEM,
669 "Invalid component status returned by consuming method: "
670 "status=%s", bt_self_component_status_string(comp_status));
671 if (comp_status < 0) {
672 BT_LOGW_STR("Consume method failed.");
673 goto end;
674 }
675
676 BT_LIB_LOGV("Consumed from sink: %![comp-]+c, status=%s",
677 comp, bt_self_component_status_string(comp_status));
678
679 end:
680 return (int) comp_status;
681 }
682
683 /*
684 * `node` is removed from the queue of sinks to consume when passed to
685 * this function. This function adds it back to the queue if there's
686 * still something to consume afterwards.
687 */
688 static inline
689 enum bt_graph_status consume_sink_node(struct bt_graph *graph, GList *node)
690 {
691 enum bt_graph_status status;
692 struct bt_component_sink *sink;
693
694 sink = node->data;
695 status = consume_graph_sink(sink);
696 if (unlikely(status != BT_GRAPH_STATUS_END)) {
697 g_queue_push_tail_link(graph->sinks_to_consume, node);
698 goto end;
699 }
700
701 /* End reached, the node is not added back to the queue and free'd. */
702 g_queue_delete_link(graph->sinks_to_consume, node);
703
704 /* Don't forward an END status if there are sinks left to consume. */
705 if (!g_queue_is_empty(graph->sinks_to_consume)) {
706 status = BT_GRAPH_STATUS_OK;
707 goto end;
708 }
709
710 end:
711 BT_LIB_LOGV("Consumed sink node: %![comp-]+c, status=%s",
712 sink, bt_graph_status_string(status));
713 return status;
714 }
715
716 BT_HIDDEN
717 enum bt_graph_status bt_graph_consume_sink_no_check(struct bt_graph *graph,
718 struct bt_component_sink *sink)
719 {
720 enum bt_graph_status status;
721 GList *sink_node;
722 int index;
723
724 BT_LIB_LOGV("Making specific sink consume: %![comp-]+c", sink);
725 BT_ASSERT(bt_component_borrow_graph((void *) sink) == graph);
726
727 if (g_queue_is_empty(graph->sinks_to_consume)) {
728 BT_LOGV_STR("Graph's sink queue is empty: end of graph.");
729 status = BT_GRAPH_STATUS_END;
730 goto end;
731 }
732
733 index = g_queue_index(graph->sinks_to_consume, sink);
734 if (index < 0) {
735 BT_LOGV_STR("Sink is not marked as consumable: sink is ended.");
736 status = BT_GRAPH_STATUS_END;
737 goto end;
738 }
739
740 sink_node = g_queue_pop_nth_link(graph->sinks_to_consume, index);
741 BT_ASSERT(sink_node);
742 status = consume_sink_node(graph, sink_node);
743
744 end:
745 return status;
746 }
747
748 static inline
749 enum bt_graph_status consume_no_check(struct bt_graph *graph)
750 {
751 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
752 struct bt_component *sink;
753 GList *current_node;
754
755 BT_ASSERT_PRE(graph->has_sink,
756 "Graph has no sink component: %!+g", graph);
757 BT_LIB_LOGV("Making next sink consume: %![graph-]+g", graph);
758
759 if (unlikely(g_queue_is_empty(graph->sinks_to_consume))) {
760 BT_LOGV_STR("Graph's sink queue is empty: end of graph.");
761 status = BT_GRAPH_STATUS_END;
762 goto end;
763 }
764
765 current_node = g_queue_pop_head_link(graph->sinks_to_consume);
766 sink = current_node->data;
767 BT_LIB_LOGV("Chose next sink to consume: %!+c", sink);
768 status = consume_sink_node(graph, current_node);
769
770 end:
771 return status;
772 }
773
774 enum bt_graph_status bt_private_graph_consume(
775 struct bt_private_graph *priv_graph)
776 {
777 struct bt_graph *graph = (void *) priv_graph;
778 enum bt_graph_status status;
779
780 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
781 BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
782 BT_ASSERT_PRE(graph->can_consume,
783 "Cannot consume graph in its current state: %!+g", graph);
784 bt_graph_set_can_consume(graph, BT_FALSE);
785 status = consume_no_check(graph);
786 bt_graph_set_can_consume(graph, BT_TRUE);
787 return status;
788 }
789
790 enum bt_graph_status bt_private_graph_run(struct bt_private_graph *priv_graph)
791 {
792 struct bt_graph *graph = (void *) priv_graph;
793 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
794
795 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
796 BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
797 BT_ASSERT_PRE(graph->can_consume,
798 "Cannot consume graph in its current state: %!+g", graph);
799 bt_graph_set_can_consume(graph, BT_FALSE);
800 BT_LIB_LOGV("Running graph: %!+g", graph);
801
802 do {
803 /*
804 * Check if the graph is canceled at each iteration. If
805 * the graph was canceled by another thread or by a
806 * signal handler, this is not a warning nor an error,
807 * it was intentional: log with a DEBUG level only.
808 */
809 if (unlikely(graph->canceled)) {
810 BT_LIB_LOGD("Stopping the graph: graph is canceled: "
811 "%!+g", graph);
812 status = BT_GRAPH_STATUS_CANCELED;
813 goto end;
814 }
815
816 status = consume_no_check(graph);
817 if (unlikely(status == BT_GRAPH_STATUS_AGAIN)) {
818 /*
819 * If AGAIN is received and there are multiple
820 * sinks, go ahead and consume from the next
821 * sink.
822 *
823 * However, in the case where a single sink is
824 * left, the caller can decide to busy-wait and
825 * call bt_private_graph_run() continuously
826 * until the source is ready or it can decide to
827 * sleep for an arbitrary amount of time.
828 */
829 if (graph->sinks_to_consume->length > 1) {
830 status = BT_GRAPH_STATUS_OK;
831 }
832 } else if (status == BT_GRAPH_STATUS_NO_SINK) {
833 goto end;
834 }
835 } while (status == BT_GRAPH_STATUS_OK);
836
837 if (g_queue_is_empty(graph->sinks_to_consume)) {
838 status = BT_GRAPH_STATUS_END;
839 }
840
841 end:
842 BT_LIB_LOGV("Graph ran: %![graph-]+g, status=%s", graph,
843 bt_graph_status_string(status));
844 bt_graph_set_can_consume(graph, BT_TRUE);
845 return status;
846 }
847
848 enum bt_graph_status
849 bt_private_graph_add_source_component_output_port_added_listener(
850 struct bt_private_graph *priv_graph,
851 bt_private_graph_source_component_output_port_added_listener func,
852 bt_private_graph_listener_removed listener_removed, void *data,
853 int *out_listener_id)
854 {
855 struct bt_graph *graph = (void *) priv_graph;
856 struct bt_graph_listener_port_added listener = {
857 .base = {
858 .removed = listener_removed,
859 .data = data,
860 },
861 .func = (port_added_func_t) func,
862 };
863 int listener_id;
864
865 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
866 BT_ASSERT_PRE_NON_NULL(func, "Listener");
867 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
868 BT_ASSERT_PRE(!graph->in_remove_listener,
869 "Graph currently executing a \"listener removed\" listener: "
870 "%!+g", graph);
871 g_array_append_val(graph->listeners.source_output_port_added, listener);
872 listener_id = graph->listeners.source_output_port_added->len - 1;
873 BT_LIB_LOGV("Added \"source component output port added\" listener to graph: "
874 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
875 listener_id);
876
877 if (listener_id) {
878 *out_listener_id = listener_id;
879 }
880
881 return BT_GRAPH_STATUS_OK;
882 }
883
884 enum bt_graph_status
885 bt_private_graph_add_filter_component_output_port_added_listener(
886 struct bt_private_graph *priv_graph,
887 bt_private_graph_filter_component_output_port_added_listener func,
888 bt_private_graph_listener_removed listener_removed, void *data,
889 int *out_listener_id)
890 {
891 struct bt_graph *graph = (void *) priv_graph;
892 struct bt_graph_listener_port_added listener = {
893 .base = {
894 .removed = listener_removed,
895 .data = data,
896 },
897 .func = (port_added_func_t) func,
898 };
899 int listener_id;
900
901 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
902 BT_ASSERT_PRE_NON_NULL(func, "Listener");
903 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
904 BT_ASSERT_PRE(!graph->in_remove_listener,
905 "Graph currently executing a \"listener removed\" listener: "
906 "%!+g", graph);
907 g_array_append_val(graph->listeners.filter_output_port_added, listener);
908 listener_id = graph->listeners.filter_output_port_added->len - 1;
909 BT_LIB_LOGV("Added \"filter component output port added\" listener to graph: "
910 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
911 listener_id);
912
913 if (listener_id) {
914 *out_listener_id = listener_id;
915 }
916
917 return BT_GRAPH_STATUS_OK;
918 }
919
920 enum bt_graph_status
921 bt_private_graph_add_filter_component_input_port_added_listener(
922 struct bt_private_graph *priv_graph,
923 bt_private_graph_filter_component_input_port_added_listener func,
924 bt_private_graph_listener_removed listener_removed, void *data,
925 int *out_listener_id)
926 {
927 struct bt_graph *graph = (void *) priv_graph;
928 struct bt_graph_listener_port_added listener = {
929 .base = {
930 .removed = listener_removed,
931 .data = data,
932 },
933 .func = (port_added_func_t) func,
934 };
935 int listener_id;
936
937 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
938 BT_ASSERT_PRE_NON_NULL(func, "Listener");
939 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
940 BT_ASSERT_PRE(!graph->in_remove_listener,
941 "Graph currently executing a \"listener removed\" listener: "
942 "%!+g", graph);
943 g_array_append_val(graph->listeners.filter_input_port_added, listener);
944 listener_id = graph->listeners.filter_input_port_added->len - 1;
945 BT_LIB_LOGV("Added \"filter component input port added\" listener to graph: "
946 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
947 listener_id);
948
949 if (listener_id) {
950 *out_listener_id = listener_id;
951 }
952
953 return BT_GRAPH_STATUS_OK;
954 }
955
956 enum bt_graph_status
957 bt_private_graph_add_sink_component_input_port_added_listener(
958 struct bt_private_graph *priv_graph,
959 bt_private_graph_sink_component_input_port_added_listener func,
960 bt_private_graph_listener_removed listener_removed, void *data,
961 int *out_listener_id)
962 {
963 struct bt_graph *graph = (void *) priv_graph;
964 struct bt_graph_listener_port_added listener = {
965 .base = {
966 .removed = listener_removed,
967 .data = data,
968 },
969 .func = (port_added_func_t) func,
970 };
971 int listener_id;
972
973 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
974 BT_ASSERT_PRE_NON_NULL(func, "Listener");
975 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
976 BT_ASSERT_PRE(!graph->in_remove_listener,
977 "Graph currently executing a \"listener removed\" listener: "
978 "%!+g", graph);
979 g_array_append_val(graph->listeners.sink_input_port_added, listener);
980 listener_id = graph->listeners.sink_input_port_added->len - 1;
981 BT_LIB_LOGV("Added \"sink component input port added\" listener to graph: "
982 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
983 listener_id);
984
985 if (listener_id) {
986 *out_listener_id = listener_id;
987 }
988
989 return BT_GRAPH_STATUS_OK;
990 }
991
992 enum bt_graph_status
993 bt_private_graph_add_source_component_output_port_removed_listener(
994 struct bt_private_graph *priv_graph,
995 bt_private_graph_source_component_output_port_removed_listener func,
996 bt_private_graph_listener_removed listener_removed, void *data,
997 int *out_listener_id)
998 {
999 struct bt_graph *graph = (void *) priv_graph;
1000 struct bt_graph_listener_port_removed listener = {
1001 .base = {
1002 .removed = listener_removed,
1003 .data = data,
1004 },
1005 .func = (port_removed_func_t) func,
1006 };
1007 int listener_id;
1008
1009 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1010 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1011 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1012 BT_ASSERT_PRE(!graph->in_remove_listener,
1013 "Graph currently executing a \"listener removed\" listener: "
1014 "%!+g", graph);
1015 g_array_append_val(graph->listeners.source_output_port_removed, listener);
1016 listener_id = graph->listeners.source_output_port_removed->len - 1;
1017 BT_LIB_LOGV("Added \"source component output port removed\" listener to graph: "
1018 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1019 listener_id);
1020
1021 if (listener_id) {
1022 *out_listener_id = listener_id;
1023 }
1024
1025 return BT_GRAPH_STATUS_OK;
1026 }
1027
1028 enum bt_graph_status
1029 bt_private_graph_add_filter_component_output_port_removed_listener(
1030 struct bt_private_graph *priv_graph,
1031 bt_private_graph_filter_component_output_port_removed_listener func,
1032 bt_private_graph_listener_removed listener_removed, void *data,
1033 int *out_listener_id)
1034 {
1035 struct bt_graph *graph = (void *) priv_graph;
1036 struct bt_graph_listener_port_removed listener = {
1037 .base = {
1038 .removed = listener_removed,
1039 .data = data,
1040 },
1041 .func = (port_removed_func_t) func,
1042 };
1043 int listener_id;
1044
1045 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1046 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1047 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1048 BT_ASSERT_PRE(!graph->in_remove_listener,
1049 "Graph currently executing a \"listener removed\" listener: "
1050 "%!+g", graph);
1051 g_array_append_val(graph->listeners.filter_output_port_removed, listener);
1052 listener_id = graph->listeners.filter_output_port_removed->len - 1;
1053 BT_LIB_LOGV("Added \"filter component output port removed\" listener to graph: "
1054 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1055 listener_id);
1056
1057 if (listener_id) {
1058 *out_listener_id = listener_id;
1059 }
1060
1061 return BT_GRAPH_STATUS_OK;
1062 }
1063
1064 enum bt_graph_status
1065 bt_private_graph_add_filter_component_input_port_removed_listener(
1066 struct bt_private_graph *priv_graph,
1067 bt_private_graph_filter_component_input_port_removed_listener func,
1068 bt_private_graph_listener_removed listener_removed, void *data,
1069 int *out_listener_id)
1070 {
1071 struct bt_graph *graph = (void *) priv_graph;
1072 struct bt_graph_listener_port_removed listener = {
1073 .base = {
1074 .removed = listener_removed,
1075 .data = data,
1076 },
1077 .func = (port_removed_func_t) func,
1078 };
1079 int listener_id;
1080
1081 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1082 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1083 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1084 BT_ASSERT_PRE(!graph->in_remove_listener,
1085 "Graph currently executing a \"listener removed\" listener: "
1086 "%!+g", graph);
1087 g_array_append_val(graph->listeners.filter_input_port_removed, listener);
1088 listener_id = graph->listeners.filter_input_port_removed->len - 1;
1089 BT_LIB_LOGV("Added \"filter component input port removed\" listener to graph: "
1090 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1091 listener_id);
1092
1093 if (listener_id) {
1094 *out_listener_id = listener_id;
1095 }
1096
1097 return BT_GRAPH_STATUS_OK;
1098 }
1099
1100 enum bt_graph_status
1101 bt_private_graph_add_sink_component_input_port_removed_listener(
1102 struct bt_private_graph *priv_graph,
1103 bt_private_graph_sink_component_input_port_removed_listener func,
1104 bt_private_graph_listener_removed listener_removed, void *data,
1105 int *out_listener_id)
1106 {
1107 struct bt_graph *graph = (void *) priv_graph;
1108 struct bt_graph_listener_port_removed listener = {
1109 .base = {
1110 .removed = listener_removed,
1111 .data = data,
1112 },
1113 .func = (port_removed_func_t) func,
1114 };
1115 int listener_id;
1116
1117 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1118 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1119 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1120 BT_ASSERT_PRE(!graph->in_remove_listener,
1121 "Graph currently executing a \"listener removed\" listener: "
1122 "%!+g", graph);
1123 g_array_append_val(graph->listeners.sink_input_port_removed, listener);
1124 listener_id = graph->listeners.sink_input_port_removed->len - 1;
1125 BT_LIB_LOGV("Added \"sink component input port removed\" listener to graph: "
1126 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1127 listener_id);
1128
1129 if (listener_id) {
1130 *out_listener_id = listener_id;
1131 }
1132
1133 return BT_GRAPH_STATUS_OK;
1134 }
1135
1136 enum bt_graph_status
1137 bt_private_graph_add_source_filter_component_ports_connected_listener(
1138 struct bt_private_graph *priv_graph,
1139 bt_private_graph_source_filter_component_ports_connected_listener func,
1140 bt_private_graph_listener_removed listener_removed, void *data,
1141 int *out_listener_id)
1142 {
1143 struct bt_graph *graph = (void *) priv_graph;
1144 struct bt_graph_listener_ports_connected listener = {
1145 .base = {
1146 .removed = listener_removed,
1147 .data = data,
1148 },
1149 .func = (ports_connected_func_t) func,
1150 };
1151 int listener_id;
1152
1153 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1154 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1155 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1156 BT_ASSERT_PRE(!graph->in_remove_listener,
1157 "Graph currently executing a \"listener removed\" listener: "
1158 "%!+g", graph);
1159 g_array_append_val(graph->listeners.source_filter_ports_connected,
1160 listener);
1161 listener_id = graph->listeners.source_filter_ports_connected->len - 1;
1162 BT_LIB_LOGV("Added \"source to filter component ports connected\" listener to graph: "
1163 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1164 listener_id);
1165
1166 if (listener_id) {
1167 *out_listener_id = listener_id;
1168 }
1169
1170 return BT_GRAPH_STATUS_OK;
1171 }
1172
1173 enum bt_graph_status
1174 bt_private_graph_add_source_sink_component_ports_connected_listener(
1175 struct bt_private_graph *priv_graph,
1176 bt_private_graph_source_sink_component_ports_connected_listener func,
1177 bt_private_graph_listener_removed listener_removed, void *data,
1178 int *out_listener_id)
1179 {
1180 struct bt_graph *graph = (void *) priv_graph;
1181 struct bt_graph_listener_ports_connected listener = {
1182 .base = {
1183 .removed = listener_removed,
1184 .data = data,
1185 },
1186 .func = (ports_connected_func_t) func,
1187 };
1188 int listener_id;
1189
1190 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1191 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1192 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1193 BT_ASSERT_PRE(!graph->in_remove_listener,
1194 "Graph currently executing a \"listener removed\" listener: "
1195 "%!+g", graph);
1196 g_array_append_val(graph->listeners.source_sink_ports_connected,
1197 listener);
1198 listener_id = graph->listeners.source_sink_ports_connected->len - 1;
1199 BT_LIB_LOGV("Added \"source to sink component ports connected\" listener to graph: "
1200 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1201 listener_id);
1202
1203 if (listener_id) {
1204 *out_listener_id = listener_id;
1205 }
1206
1207 return BT_GRAPH_STATUS_OK;
1208 }
1209
1210 enum bt_graph_status
1211 bt_private_graph_add_filter_sink_component_ports_connected_listener(
1212 struct bt_private_graph *priv_graph,
1213 bt_private_graph_filter_sink_component_ports_connected_listener func,
1214 bt_private_graph_listener_removed listener_removed, void *data,
1215 int *out_listener_id)
1216 {
1217 struct bt_graph *graph = (void *) priv_graph;
1218 struct bt_graph_listener_ports_connected listener = {
1219 .base = {
1220 .removed = listener_removed,
1221 .data = data,
1222 },
1223 .func = (ports_connected_func_t) func,
1224 };
1225 int listener_id;
1226
1227 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1228 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1229 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1230 BT_ASSERT_PRE(!graph->in_remove_listener,
1231 "Graph currently executing a \"listener removed\" listener: "
1232 "%!+g", graph);
1233 g_array_append_val(graph->listeners.filter_sink_ports_connected,
1234 listener);
1235 listener_id = graph->listeners.filter_sink_ports_connected->len - 1;
1236 BT_LIB_LOGV("Added \"filter to sink component ports connected\" listener to graph: "
1237 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1238 listener_id);
1239
1240 if (listener_id) {
1241 *out_listener_id = listener_id;
1242 }
1243
1244 return BT_GRAPH_STATUS_OK;
1245 }
1246
1247 enum bt_graph_status
1248 bt_private_graph_add_source_filter_component_ports_disconnected_listener(
1249 struct bt_private_graph *priv_graph,
1250 bt_private_graph_source_filter_component_ports_disconnected_listener func,
1251 bt_private_graph_listener_removed listener_removed, void *data,
1252 int *out_listener_id)
1253 {
1254 struct bt_graph *graph = (void *) priv_graph;
1255 struct bt_graph_listener_ports_disconnected listener = {
1256 .base = {
1257 .removed = listener_removed,
1258 .data = data,
1259 },
1260 .func = (ports_disconnected_func_t) func,
1261 };
1262 int listener_id;
1263
1264 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1265 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1266 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1267 BT_ASSERT_PRE(!graph->in_remove_listener,
1268 "Graph currently executing a \"listener removed\" listener: "
1269 "%!+g", graph);
1270 g_array_append_val(graph->listeners.source_filter_ports_disconnected,
1271 listener);
1272 listener_id = graph->listeners.source_filter_ports_disconnected->len - 1;
1273 BT_LIB_LOGV("Added \"source to filter component ports disconnected\" listener to graph: "
1274 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1275 listener_id);
1276
1277 if (listener_id) {
1278 *out_listener_id = listener_id;
1279 }
1280
1281 return BT_GRAPH_STATUS_OK;
1282 }
1283
1284 enum bt_graph_status
1285 bt_private_graph_add_source_sink_component_ports_disconnected_listener(
1286 struct bt_private_graph *priv_graph,
1287 bt_private_graph_source_sink_component_ports_disconnected_listener func,
1288 bt_private_graph_listener_removed listener_removed, void *data,
1289 int *out_listener_id)
1290 {
1291 struct bt_graph *graph = (void *) priv_graph;
1292 struct bt_graph_listener_ports_disconnected listener = {
1293 .base = {
1294 .removed = listener_removed,
1295 .data = data,
1296 },
1297 .func = (ports_disconnected_func_t) func,
1298 };
1299 int listener_id;
1300
1301 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1302 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1303 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1304 BT_ASSERT_PRE(!graph->in_remove_listener,
1305 "Graph currently executing a \"listener removed\" listener: "
1306 "%!+g", graph);
1307 g_array_append_val(graph->listeners.source_sink_ports_disconnected,
1308 listener);
1309 listener_id = graph->listeners.source_sink_ports_disconnected->len - 1;
1310 BT_LIB_LOGV("Added \"source to sink component ports disconnected\" listener to graph: "
1311 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1312 listener_id);
1313
1314 if (listener_id) {
1315 *out_listener_id = listener_id;
1316 }
1317
1318 return BT_GRAPH_STATUS_OK;
1319 }
1320
1321 enum bt_graph_status
1322 bt_private_graph_add_filter_sink_component_ports_disconnected_listener(
1323 struct bt_private_graph *priv_graph,
1324 bt_private_graph_filter_sink_component_ports_disconnected_listener func,
1325 bt_private_graph_listener_removed listener_removed, void *data,
1326 int *out_listener_id)
1327 {
1328 struct bt_graph *graph = (void *) priv_graph;
1329 struct bt_graph_listener_ports_disconnected listener = {
1330 .base = {
1331 .removed = listener_removed,
1332 .data = data,
1333 },
1334 .func = (ports_disconnected_func_t) func,
1335 };
1336 int listener_id;
1337
1338 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1339 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1340 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1341 BT_ASSERT_PRE(!graph->in_remove_listener,
1342 "Graph currently executing a \"listener removed\" listener: "
1343 "%!+g", graph);
1344 g_array_append_val(graph->listeners.filter_sink_ports_disconnected,
1345 listener);
1346 listener_id = graph->listeners.filter_sink_ports_disconnected->len - 1;
1347 BT_LIB_LOGV("Added \"filter to sink component ports disconnected\" listener to graph: "
1348 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1349 listener_id);
1350
1351 if (listener_id) {
1352 *out_listener_id = listener_id;
1353 }
1354
1355 return BT_GRAPH_STATUS_OK;
1356 }
1357
1358 BT_HIDDEN
1359 void bt_graph_notify_port_added(struct bt_graph *graph, struct bt_port *port)
1360 {
1361 uint64_t i;
1362 GArray *listeners;
1363 struct bt_component *comp;
1364
1365 BT_ASSERT(graph);
1366 BT_ASSERT(port);
1367 BT_LIB_LOGV("Notifying graph listeners that a port was added: "
1368 "%![graph-]+g, %![port-]+p", graph, port);
1369 comp = bt_port_borrow_component(port);
1370 BT_ASSERT(comp);
1371
1372 switch (comp->class->type) {
1373 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1374 {
1375 switch (port->type) {
1376 case BT_PORT_TYPE_OUTPUT:
1377 listeners = graph->listeners.source_output_port_added;
1378 break;
1379 default:
1380 abort();
1381 }
1382
1383 break;
1384 }
1385 case BT_COMPONENT_CLASS_TYPE_FILTER:
1386 {
1387 switch (port->type) {
1388 case BT_PORT_TYPE_INPUT:
1389 listeners = graph->listeners.filter_input_port_added;
1390 break;
1391 case BT_PORT_TYPE_OUTPUT:
1392 listeners = graph->listeners.filter_output_port_added;
1393 break;
1394 default:
1395 abort();
1396 }
1397
1398 break;
1399 }
1400 case BT_COMPONENT_CLASS_TYPE_SINK:
1401 {
1402 switch (port->type) {
1403 case BT_PORT_TYPE_INPUT:
1404 listeners = graph->listeners.sink_input_port_added;
1405 break;
1406 default:
1407 abort();
1408 }
1409
1410 break;
1411 }
1412 default:
1413 abort();
1414 }
1415
1416 for (i = 0; i < listeners->len; i++) {
1417 struct bt_graph_listener_port_added *listener =
1418 &g_array_index(listeners,
1419 struct bt_graph_listener_port_added, i);
1420
1421 BT_ASSERT(listener->func);
1422 listener->func(comp, port, listener->base.data);
1423 }
1424 }
1425
1426 BT_HIDDEN
1427 void bt_graph_notify_port_removed(struct bt_graph *graph,
1428 struct bt_component *comp, struct bt_port *port)
1429 {
1430 uint64_t i;
1431 GArray *listeners;
1432
1433 BT_ASSERT(graph);
1434 BT_ASSERT(port);
1435 BT_LIB_LOGV("Notifying graph listeners that a port was removed: "
1436 "%![graph-]+g, %![comp-]+c, %![port-]+p", graph, comp, port);
1437
1438 switch (comp->class->type) {
1439 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1440 {
1441 switch (port->type) {
1442 case BT_PORT_TYPE_OUTPUT:
1443 listeners = graph->listeners.source_output_port_removed;
1444 break;
1445 default:
1446 abort();
1447 }
1448
1449 break;
1450 }
1451 case BT_COMPONENT_CLASS_TYPE_FILTER:
1452 {
1453 switch (port->type) {
1454 case BT_PORT_TYPE_INPUT:
1455 listeners = graph->listeners.filter_input_port_removed;
1456 break;
1457 case BT_PORT_TYPE_OUTPUT:
1458 listeners = graph->listeners.filter_output_port_removed;
1459 break;
1460 default:
1461 abort();
1462 }
1463
1464 break;
1465 }
1466 case BT_COMPONENT_CLASS_TYPE_SINK:
1467 {
1468 switch (port->type) {
1469 case BT_PORT_TYPE_INPUT:
1470 listeners = graph->listeners.sink_input_port_removed;
1471 break;
1472 default:
1473 abort();
1474 }
1475
1476 break;
1477 }
1478 default:
1479 abort();
1480 }
1481
1482 for (i = 0; i < listeners->len; i++) {
1483 struct bt_graph_listener_port_removed *listener =
1484 &g_array_index(listeners,
1485 struct bt_graph_listener_port_removed, i);
1486
1487 BT_ASSERT(listener->func);
1488 listener->func(comp, port, listener->base.data);
1489 }
1490 }
1491
1492 BT_HIDDEN
1493 void bt_graph_notify_ports_connected(struct bt_graph *graph,
1494 struct bt_port *upstream_port, struct bt_port *downstream_port)
1495 {
1496 uint64_t i;
1497 GArray *listeners;
1498 struct bt_component *upstream_comp;
1499 struct bt_component *downstream_comp;
1500
1501 BT_ASSERT(graph);
1502 BT_ASSERT(upstream_port);
1503 BT_ASSERT(downstream_port);
1504 BT_LIB_LOGV("Notifying graph listeners that ports were connected: "
1505 "%![graph-]+g, %![up-port-]+p, %![down-port-]+p",
1506 graph, upstream_port, downstream_port);
1507 upstream_comp = bt_port_borrow_component(upstream_port);
1508 BT_ASSERT(upstream_comp);
1509 downstream_comp = bt_port_borrow_component(downstream_port);
1510 BT_ASSERT(downstream_comp);
1511
1512 switch (upstream_comp->class->type) {
1513 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1514 {
1515 switch (downstream_comp->class->type) {
1516 case BT_COMPONENT_CLASS_TYPE_FILTER:
1517 listeners =
1518 graph->listeners.source_filter_ports_connected;
1519 break;
1520 case BT_COMPONENT_CLASS_TYPE_SINK:
1521 listeners =
1522 graph->listeners.source_sink_ports_connected;
1523 break;
1524 default:
1525 abort();
1526 }
1527
1528 break;
1529 }
1530 case BT_COMPONENT_CLASS_TYPE_FILTER:
1531 {
1532 switch (downstream_comp->class->type) {
1533 case BT_COMPONENT_CLASS_TYPE_SINK:
1534 listeners =
1535 graph->listeners.filter_sink_ports_connected;
1536 break;
1537 default:
1538 abort();
1539 }
1540
1541 break;
1542 }
1543 default:
1544 abort();
1545 }
1546
1547 for (i = 0; i < listeners->len; i++) {
1548 struct bt_graph_listener_ports_connected *listener =
1549 &g_array_index(listeners,
1550 struct bt_graph_listener_ports_connected, i);
1551
1552 BT_ASSERT(listener->func);
1553 listener->func(upstream_comp, downstream_comp,
1554 upstream_port, downstream_port, listener->base.data);
1555 }
1556 }
1557
1558 BT_HIDDEN
1559 void bt_graph_notify_ports_disconnected(struct bt_graph *graph,
1560 struct bt_component *upstream_comp,
1561 struct bt_component *downstream_comp,
1562 struct bt_port *upstream_port,
1563 struct bt_port *downstream_port)
1564 {
1565 uint64_t i;
1566 GArray *listeners;
1567
1568 BT_ASSERT(graph);
1569 BT_ASSERT(upstream_comp);
1570 BT_ASSERT(downstream_comp);
1571 BT_ASSERT(upstream_port);
1572 BT_ASSERT(downstream_port);
1573 BT_LIB_LOGV("Notifying graph listeners that ports were disconnected: "
1574 "%![graph-]+g, %![up-port-]+p, %![down-port-]+p, "
1575 "%![up-comp-]+c, %![down-comp-]+c",
1576 graph, upstream_port, downstream_port, upstream_comp,
1577 downstream_comp);
1578
1579 switch (upstream_comp->class->type) {
1580 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1581 {
1582 switch (downstream_comp->class->type) {
1583 case BT_COMPONENT_CLASS_TYPE_FILTER:
1584 listeners =
1585 graph->listeners.source_filter_ports_disconnected;
1586 break;
1587 case BT_COMPONENT_CLASS_TYPE_SINK:
1588 listeners =
1589 graph->listeners.source_sink_ports_disconnected;
1590 break;
1591 default:
1592 abort();
1593 }
1594
1595 break;
1596 }
1597 case BT_COMPONENT_CLASS_TYPE_FILTER:
1598 {
1599 switch (downstream_comp->class->type) {
1600 case BT_COMPONENT_CLASS_TYPE_SINK:
1601 listeners =
1602 graph->listeners.filter_sink_ports_disconnected;
1603 break;
1604 default:
1605 abort();
1606 }
1607
1608 break;
1609 }
1610 default:
1611 abort();
1612 }
1613
1614 for (i = 0; i < listeners->len; i++) {
1615 struct bt_graph_listener_ports_disconnected *listener =
1616 &g_array_index(listeners,
1617 struct bt_graph_listener_ports_disconnected, i);
1618
1619 BT_ASSERT(listener->func);
1620 listener->func(upstream_comp, downstream_comp,
1621 upstream_port, downstream_port, listener->base.data);
1622 }
1623 }
1624
1625 enum bt_graph_status bt_private_graph_cancel(
1626 struct bt_private_graph *priv_graph)
1627 {
1628 struct bt_graph *graph = (void *) priv_graph;
1629
1630 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1631 graph->canceled = true;
1632 BT_LIB_LOGV("Canceled graph: %!+i", graph);
1633 return BT_GRAPH_STATUS_OK;
1634 }
1635
1636 bt_bool bt_graph_is_canceled(struct bt_graph *graph)
1637 {
1638 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1639 return graph->canceled ? BT_TRUE : BT_FALSE;
1640 }
1641
1642 BT_HIDDEN
1643 void bt_graph_remove_connection(struct bt_graph *graph,
1644 struct bt_connection *connection)
1645 {
1646 BT_ASSERT(graph);
1647 BT_ASSERT(connection);
1648 BT_LIB_LOGV("Removing graph's connection: %![graph-]+g, %![conn-]+x",
1649 graph, connection);
1650 g_ptr_array_remove(graph->connections, connection);
1651 }
1652
1653 BT_ASSERT_PRE_FUNC
1654 static inline
1655 bool component_name_exists(struct bt_graph *graph, const char *name)
1656 {
1657 bool exists = false;
1658 uint64_t i;
1659
1660 for (i = 0; i < graph->components->len; i++) {
1661 struct bt_component *other_comp = graph->components->pdata[i];
1662
1663 if (strcmp(name, bt_component_get_name(other_comp)) == 0) {
1664 BT_ASSERT_PRE_MSG("Another component with the same name already exists in the graph: "
1665 "%![other-comp-]+c, name=\"%s\"",
1666 other_comp, name);
1667 exists = true;
1668 goto end;
1669 }
1670 }
1671
1672 end:
1673 return exists;
1674 }
1675
1676 static
1677 enum bt_graph_status add_component_with_init_method_data(
1678 struct bt_private_graph *priv_graph,
1679 struct bt_component_class *comp_cls,
1680 comp_init_method_t init_method,
1681 const char *name, struct bt_value *params,
1682 void *init_method_data, struct bt_component **user_component)
1683 {
1684 struct bt_graph *graph = (void *) priv_graph;
1685 enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK;
1686 enum bt_self_component_status comp_status;
1687 struct bt_component *component = NULL;
1688 int ret;
1689 bool init_can_consume;
1690
1691 BT_ASSERT(comp_cls);
1692 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1693 BT_ASSERT_PRE_NON_NULL(name, "Name");
1694 BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
1695 BT_ASSERT_PRE(!component_name_exists(graph, name),
1696 "Duplicate component name: %!+g, name=\"%s\"", graph, name);
1697 BT_ASSERT_PRE(!params || bt_value_is_map(params),
1698 "Parameter value is not a map value: %!+v", params);
1699 bt_object_get_ref(params);
1700 init_can_consume = graph->can_consume;
1701 bt_graph_set_can_consume(graph, false);
1702 BT_LIB_LOGD("Adding component to graph: "
1703 "%![graph-]+g, %![cc-]+C, name=\"%s\", %![params-]+v, "
1704 "init-method-data-addr=%p",
1705 graph, comp_cls, name, params, init_method_data);
1706
1707 if (!params) {
1708 params = bt_private_value_as_value(
1709 bt_private_value_map_create());
1710 if (!params) {
1711 BT_LOGE_STR("Cannot create map value object.");
1712 graph_status = BT_GRAPH_STATUS_NOMEM;
1713 goto end;
1714 }
1715 }
1716
1717 ret = bt_component_create(comp_cls, name, &component);
1718 if (ret) {
1719 BT_LOGE("Cannot create empty component object: ret=%d",
1720 ret);
1721 graph_status = BT_GRAPH_STATUS_NOMEM;
1722 goto end;
1723 }
1724
1725 /*
1726 * The user's initialization method needs to see that this
1727 * component is part of the graph. If the user method fails, we
1728 * immediately remove the component from the graph's components.
1729 */
1730 g_ptr_array_add(graph->components, component);
1731 bt_component_set_graph(component, graph);
1732
1733 if (init_method) {
1734 BT_LOGD_STR("Calling user's initialization method.");
1735 comp_status = init_method(component, params, init_method_data);
1736 BT_LOGD("User method returned: status=%s",
1737 bt_self_component_status_string(comp_status));
1738 if (comp_status != BT_SELF_COMPONENT_STATUS_OK) {
1739 BT_LOGW_STR("Initialization method failed.");
1740 graph_status = (int) comp_status;
1741 bt_component_set_graph(component, NULL);
1742 g_ptr_array_remove_fast(graph->components, component);
1743 goto end;
1744 }
1745 }
1746
1747 /*
1748 * Mark the component as initialized so that its finalization
1749 * method is called when it is destroyed.
1750 */
1751 component->initialized = true;
1752
1753 /*
1754 * If it's a sink component, it needs to be part of the graph's
1755 * sink queue to be consumed by bt_graph_consume().
1756 */
1757 if (bt_component_is_sink(component)) {
1758 graph->has_sink = true;
1759 g_queue_push_tail(graph->sinks_to_consume, component);
1760 }
1761
1762 /*
1763 * Freeze the component class now that it's instantiated at
1764 * least once.
1765 */
1766 BT_LOGD_STR("Freezing component class.");
1767 bt_component_class_freeze(comp_cls);
1768 BT_LIB_LOGD("Added component to graph: "
1769 "%![graph-]+g, %![cc-]+C, name=\"%s\", %![params-]+v, "
1770 "init-method-data-addr=%p, %![comp-]+c",
1771 graph, comp_cls, name, params, init_method_data, component);
1772
1773 if (user_component) {
1774 /* Move reference to user */
1775 *user_component = component;
1776 component = NULL;
1777 }
1778
1779 end:
1780 bt_object_put_ref(component);
1781 bt_object_put_ref(params);
1782 (void) init_can_consume;
1783 bt_graph_set_can_consume(graph, init_can_consume);
1784 return graph_status;
1785 }
1786
1787 enum bt_graph_status
1788 bt_private_graph_add_source_component_with_init_method_data(
1789 struct bt_private_graph *graph,
1790 struct bt_component_class_source *comp_cls,
1791 const char *name, struct bt_value *params,
1792 void *init_method_data, struct bt_component_source **component)
1793 {
1794 BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class");
1795 return add_component_with_init_method_data(graph,
1796 (void *) comp_cls, (comp_init_method_t) comp_cls->methods.init,
1797 name, params, init_method_data, (void *) component);
1798 }
1799
1800 enum bt_graph_status bt_private_graph_add_source_component(
1801 struct bt_private_graph *graph,
1802 struct bt_component_class_source *comp_cls,
1803 const char *name, struct bt_value *params,
1804 struct bt_component_source **component)
1805 {
1806 return bt_private_graph_add_source_component_with_init_method_data(
1807 graph, comp_cls, name, params, NULL, component);
1808 }
1809
1810 enum bt_graph_status
1811 bt_private_graph_add_filter_component_with_init_method_data(
1812 struct bt_private_graph *graph,
1813 struct bt_component_class_filter *comp_cls,
1814 const char *name, struct bt_value *params,
1815 void *init_method_data, struct bt_component_filter **component)
1816 {
1817 BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class");
1818 return add_component_with_init_method_data(graph,
1819 (void *) comp_cls, (comp_init_method_t) comp_cls->methods.init,
1820 name, params, init_method_data, (void *) component);
1821 }
1822
1823 enum bt_graph_status bt_private_graph_add_filter_component(
1824 struct bt_private_graph *graph,
1825 struct bt_component_class_filter *comp_cls,
1826 const char *name, struct bt_value *params,
1827 struct bt_component_filter **component)
1828 {
1829 return bt_private_graph_add_filter_component_with_init_method_data(
1830 graph, comp_cls, name, params, NULL, component);
1831 }
1832
1833 enum bt_graph_status
1834 bt_private_graph_add_sink_component_with_init_method_data(
1835 struct bt_private_graph *graph,
1836 struct bt_component_class_sink *comp_cls,
1837 const char *name, struct bt_value *params,
1838 void *init_method_data, struct bt_component_sink **component)
1839 {
1840 BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class");
1841 return add_component_with_init_method_data(graph,
1842 (void *) comp_cls, (comp_init_method_t) comp_cls->methods.init,
1843 name, params, init_method_data, (void *) component);
1844 }
1845
1846 enum bt_graph_status bt_private_graph_add_sink_component(
1847 struct bt_private_graph *graph,
1848 struct bt_component_class_sink *comp_cls,
1849 const char *name, struct bt_value *params,
1850 struct bt_component_sink **component)
1851 {
1852 return bt_private_graph_add_sink_component_with_init_method_data(
1853 graph, comp_cls, name, params, NULL, component);
1854 }
1855
1856 BT_HIDDEN
1857 int bt_graph_remove_unconnected_component(struct bt_graph *graph,
1858 struct bt_component *component)
1859 {
1860 bool init_can_consume;
1861 uint64_t count;
1862 uint64_t i;
1863 int ret = 0;
1864
1865 BT_ASSERT(graph);
1866 BT_ASSERT(component);
1867 BT_ASSERT(component->base.ref_count == 0);
1868 BT_ASSERT(bt_component_borrow_graph(component) == graph);
1869
1870 init_can_consume = graph->can_consume;
1871 count = bt_component_get_input_port_count(component);
1872
1873 for (i = 0; i < count; i++) {
1874 struct bt_port *port = (void *)
1875 bt_component_borrow_input_port_by_index(component, i);
1876
1877 BT_ASSERT(port);
1878
1879 if (bt_port_is_connected(port)) {
1880 BT_LIB_LOGW("Cannot remove component from graph: "
1881 "an input port is connected: "
1882 "%![graph-]+g, %![comp-]+c, %![port-]+p",
1883 graph, component, port);
1884 goto error;
1885 }
1886 }
1887
1888 count = bt_component_get_output_port_count(component);
1889
1890 for (i = 0; i < count; i++) {
1891 struct bt_port *port = (void *)
1892 bt_component_borrow_output_port_by_index(component, i);
1893
1894 BT_ASSERT(port);
1895
1896 if (bt_port_is_connected(port)) {
1897 BT_LIB_LOGW("Cannot remove component from graph: "
1898 "an output port is connected: "
1899 "%![graph-]+g, %![comp-]+c, %![port-]+p",
1900 graph, component, port);
1901 goto error;
1902 }
1903 }
1904
1905 bt_graph_set_can_consume(graph, false);
1906
1907 /* Possibly remove from sinks to consume */
1908 (void) g_queue_remove(graph->sinks_to_consume, component);
1909
1910 if (graph->sinks_to_consume->length == 0) {
1911 graph->has_sink = false;
1912 }
1913
1914 /*
1915 * This calls bt_object_try_spec_release() on the component, and
1916 * since its reference count is 0, its destructor is called. Its
1917 * destructor calls the user's finalization method (if set).
1918 */
1919 g_ptr_array_remove(graph->components, component);
1920 goto end;
1921
1922 error:
1923 ret = -1;
1924
1925 end:
1926 (void) init_can_consume;
1927 bt_graph_set_can_consume(graph, init_can_consume);
1928 return ret;
1929 }
1930
1931 BT_HIDDEN
1932 void bt_graph_add_notification(struct bt_graph *graph,
1933 struct bt_notification *notif)
1934 {
1935 BT_ASSERT(graph);
1936 BT_ASSERT(notif);
1937
1938 /*
1939 * It's okay not to take a reference because, when a
1940 * notification's reference count drops to 0, either:
1941 *
1942 * * It is recycled back to one of this graph's pool.
1943 * * It is destroyed because it doesn't have any link to any
1944 * graph, which means the original graph is already destroyed.
1945 */
1946 g_ptr_array_add(graph->notifications, notif);
1947 }
This page took 0.067122 seconds and 3 git commands to generate.