Cleanup: remove private babeltrace.h
[babeltrace.git] / src / lib / graph / graph.c
CommitLineData
c0418dd9 1/*
e2f7325d 2 * Copyright 2017-2018 Philippe Proulx <pproulx@efficios.com>
f60c8b34 3 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
c0418dd9
JG
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
262e5473 24#define BT_LOG_TAG "GRAPH"
578e048b 25#include "lib/lib-logging.h"
262e5473 26
578e048b
MJ
27#include "common/assert.h"
28#include "lib/assert-pre.h"
3fadfbc0
MJ
29#include <babeltrace2/graph/graph.h>
30#include <babeltrace2/graph/graph-const.h>
3fadfbc0
MJ
31#include <babeltrace2/graph/component-source-const.h>
32#include <babeltrace2/graph/component-filter-const.h>
33#include <babeltrace2/graph/port-const.h>
578e048b
MJ
34#include "lib/graph/message/message.h"
35#include "compat/compiler.h"
36#include "common/common.h"
3fadfbc0
MJ
37#include <babeltrace2/types.h>
38#include <babeltrace2/value.h>
39#include <babeltrace2/value-const.h>
578e048b 40#include "lib/value.h"
f60c8b34 41#include <unistd.h>
1bf957a0
PP
42#include <glib.h>
43
578e048b
MJ
44#include "component.h"
45#include "component-sink.h"
46#include "connection.h"
47#include "graph.h"
48#include "message/event.h"
49#include "message/packet.h"
50
8cc56726
SM
51typedef enum bt_graph_listener_status (*port_added_func_t)(
52 const void *, const void *, void *);
0d72b8c3 53
8cc56726
SM
54typedef enum bt_graph_listener_status (*ports_connected_func_t)(
55 const void *, const void *, const void *, const void *, void *);
0d72b8c3 56
0d72b8c3
PP
57typedef enum bt_self_component_status (*comp_init_method_t)(const void *,
58 const void *, void *);
d94d92ac 59
1bf957a0 60struct bt_graph_listener {
0d72b8c3 61 bt_graph_listener_removed_func removed;
1bf957a0
PP
62 void *data;
63};
c0418dd9 64
d94d92ac
PP
65struct bt_graph_listener_port_added {
66 struct bt_graph_listener base;
67 port_added_func_t func;
68};
8cc092c9 69
d94d92ac
PP
70struct bt_graph_listener_ports_connected {
71 struct bt_graph_listener base;
72 ports_connected_func_t func;
73};
8cc092c9 74
d94d92ac
PP
75#define INIT_LISTENERS_ARRAY(_type, _listeners) \
76 do { \
77 _listeners = g_array_new(FALSE, TRUE, sizeof(_type)); \
78 if (!(_listeners)) { \
79 BT_LOGE_STR("Failed to allocate one GArray."); \
80 } \
81 } while (0)
82
83#define CALL_REMOVE_LISTENERS(_type, _listeners) \
84 do { \
85 size_t i; \
86 \
9fde7203
FD
87 if (!_listeners) { \
88 break; \
89 } \
d94d92ac
PP
90 for (i = 0; i < (_listeners)->len; i++) { \
91 _type *listener = \
92 &g_array_index((_listeners), _type, i); \
93 \
94 if (listener->base.removed) { \
95 listener->base.removed(listener->base.data); \
96 } \
97 } \
98 } while (0)
8cc092c9 99
f60c8b34 100static
d94d92ac 101void destroy_graph(struct bt_object *obj)
c0418dd9 102{
0d72b8c3 103 struct bt_graph *graph = container_of(obj, struct bt_graph, base);
c0418dd9 104
bd14d768
PP
105 /*
106 * The graph's reference count is 0 if we're here. Increment
107 * it to avoid a double-destroy (possibly infinitely recursive)
108 * in this situation:
109 *
110 * 1. We put and destroy a connection.
d0fea130
PP
111 * 2. This connection's destructor finalizes its active message
112 * iterators.
113 * 3. A message iterator's finalization function gets a new
114 * reference on its component (reference count goes from 0 to
115 * 1).
bd14d768
PP
116 * 4. Since this component's reference count goes to 1, it takes
117 * a reference on its parent (this graph). This graph's
118 * reference count goes from 0 to 1.
d6e69534 119 * 5. The message iterator's finalization function puts its
bd14d768
PP
120 * component reference (reference count goes from 1 to 0).
121 * 6. Since this component's reference count goes from 1 to 0,
122 * it puts its parent (this graph). This graph's reference
123 * count goes from 1 to 0.
d0fea130
PP
124 * 7. Since this graph's reference count goes from 1 to 0, its
125 * destructor is called (this function).
bd14d768
PP
126 *
127 * With the incrementation below, the graph's reference count at
128 * step 4 goes from 1 to 2, and from 2 to 1 at step 6. This
129 * ensures that this function is not called two times.
130 */
d94d92ac 131 BT_LIB_LOGD("Destroying graph: %!+g", graph);
3fea54f6 132 obj->ref_count++;
bd14d768 133
49682acd
PP
134 /*
135 * Cancel the graph to disallow some operations, like creating
d6e69534 136 * message iterators and adding ports to components.
49682acd 137 */
0d72b8c3 138 (void) bt_graph_cancel((void *) graph);
49682acd 139
8cc092c9 140 /* Call all remove listeners */
d94d92ac
PP
141 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
142 graph->listeners.source_output_port_added);
143 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
144 graph->listeners.filter_output_port_added);
145 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
146 graph->listeners.filter_input_port_added);
147 CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
148 graph->listeners.sink_input_port_added);
d94d92ac
PP
149 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_connected,
150 graph->listeners.source_filter_ports_connected);
9c0a126a
PP
151 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_connected,
152 graph->listeners.filter_filter_ports_connected);
d94d92ac
PP
153 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_connected,
154 graph->listeners.source_sink_ports_connected);
155 CALL_REMOVE_LISTENERS(struct bt_graph_listener_ports_connected,
156 graph->listeners.filter_sink_ports_connected);
8cc092c9 157
d6e69534
PP
158 if (graph->messages) {
159 g_ptr_array_free(graph->messages, TRUE);
160 graph->messages = NULL;
5c563278
PP
161 }
162
c0418dd9 163 if (graph->connections) {
262e5473 164 BT_LOGD_STR("Destroying connections.");
c0418dd9 165 g_ptr_array_free(graph->connections, TRUE);
d94d92ac 166 graph->connections = NULL;
c0418dd9 167 }
5c563278 168
bd14d768 169 if (graph->components) {
262e5473 170 BT_LOGD_STR("Destroying components.");
bd14d768 171 g_ptr_array_free(graph->components, TRUE);
d94d92ac 172 graph->components = NULL;
bd14d768 173 }
5c563278 174
f60c8b34
JG
175 if (graph->sinks_to_consume) {
176 g_queue_free(graph->sinks_to_consume);
d94d92ac
PP
177 graph->sinks_to_consume = NULL;
178 }
179
180 if (graph->listeners.source_output_port_added) {
181 g_array_free(graph->listeners.source_output_port_added, TRUE);
182 graph->listeners.source_output_port_added = NULL;
183 }
184
185 if (graph->listeners.filter_output_port_added) {
186 g_array_free(graph->listeners.filter_output_port_added, TRUE);
187 graph->listeners.filter_output_port_added = NULL;
188 }
189
190 if (graph->listeners.filter_input_port_added) {
191 g_array_free(graph->listeners.filter_input_port_added, TRUE);
192 graph->listeners.filter_input_port_added = NULL;
c0418dd9 193 }
1bf957a0 194
d94d92ac
PP
195 if (graph->listeners.sink_input_port_added) {
196 g_array_free(graph->listeners.sink_input_port_added, TRUE);
197 graph->listeners.sink_input_port_added = NULL;
1bf957a0
PP
198 }
199
d94d92ac
PP
200 if (graph->listeners.source_filter_ports_connected) {
201 g_array_free(graph->listeners.source_filter_ports_connected,
202 TRUE);
203 graph->listeners.source_filter_ports_connected = NULL;
204 }
205
9c0a126a
PP
206 if (graph->listeners.filter_filter_ports_connected) {
207 g_array_free(graph->listeners.filter_filter_ports_connected,
208 TRUE);
209 graph->listeners.filter_filter_ports_connected = NULL;
210 }
211
d94d92ac
PP
212 if (graph->listeners.source_sink_ports_connected) {
213 g_array_free(graph->listeners.source_sink_ports_connected,
214 TRUE);
215 graph->listeners.source_sink_ports_connected = NULL;
216 }
217
218 if (graph->listeners.filter_sink_ports_connected) {
219 g_array_free(graph->listeners.filter_sink_ports_connected,
220 TRUE);
221 graph->listeners.filter_sink_ports_connected = NULL;
222 }
223
d6e69534
PP
224 bt_object_pool_finalize(&graph->event_msg_pool);
225 bt_object_pool_finalize(&graph->packet_begin_msg_pool);
226 bt_object_pool_finalize(&graph->packet_end_msg_pool);
c0418dd9
JG
227 g_free(graph);
228}
229
5c563278 230static
d6e69534 231void destroy_message_event(struct bt_message *msg,
5c563278
PP
232 struct bt_graph *graph)
233{
d6e69534 234 bt_message_event_destroy(msg);
5c563278
PP
235}
236
237static
d6e69534 238void destroy_message_packet_begin(struct bt_message *msg,
5c563278
PP
239 struct bt_graph *graph)
240{
5df26c89 241 bt_message_packet_destroy(msg);
5c563278
PP
242}
243
244static
d6e69534 245void destroy_message_packet_end(struct bt_message *msg,
5c563278
PP
246 struct bt_graph *graph)
247{
5df26c89 248 bt_message_packet_destroy(msg);
5c563278
PP
249}
250
251static
d6e69534 252void notify_message_graph_is_destroyed(struct bt_message *msg)
5c563278 253{
d6e69534 254 bt_message_unlink_graph(msg);
5c563278
PP
255}
256
0d72b8c3 257struct bt_graph *bt_graph_create(void)
c0418dd9 258{
f60c8b34 259 struct bt_graph *graph;
1bf957a0 260 int ret;
c0418dd9 261
262e5473 262 BT_LOGD_STR("Creating graph object.");
f60c8b34 263 graph = g_new0(struct bt_graph, 1);
c0418dd9 264 if (!graph) {
262e5473 265 BT_LOGE_STR("Failed to allocate one graph.");
c0418dd9
JG
266 goto end;
267 }
268
d94d92ac 269 bt_object_init_shared(&graph->base, destroy_graph);
3fea54f6
PP
270 graph->connections = g_ptr_array_new_with_free_func(
271 (GDestroyNotify) bt_object_try_spec_release);
c0418dd9 272 if (!graph->connections) {
262e5473 273 BT_LOGE_STR("Failed to allocate one GPtrArray.");
c0418dd9
JG
274 goto error;
275 }
3fea54f6
PP
276 graph->components = g_ptr_array_new_with_free_func(
277 (GDestroyNotify) bt_object_try_spec_release);
f60c8b34 278 if (!graph->components) {
262e5473 279 BT_LOGE_STR("Failed to allocate one GPtrArray.");
f60c8b34
JG
280 goto error;
281 }
282 graph->sinks_to_consume = g_queue_new();
283 if (!graph->sinks_to_consume) {
262e5473 284 BT_LOGE_STR("Failed to allocate one GQueue.");
c0418dd9
JG
285 goto error;
286 }
1bf957a0 287
d94d92ac
PP
288 bt_graph_set_can_consume(graph, true);
289 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_added,
290 graph->listeners.source_output_port_added);
291
292 if (!graph->listeners.source_output_port_added) {
293 ret = -1;
1bf957a0
PP
294 goto error;
295 }
296
d94d92ac
PP
297 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_added,
298 graph->listeners.filter_output_port_added);
299
300 if (!graph->listeners.filter_output_port_added) {
301 ret = -1;
1bf957a0
PP
302 goto error;
303 }
304
d94d92ac
PP
305 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_added,
306 graph->listeners.filter_input_port_added);
307
308 if (!graph->listeners.filter_input_port_added) {
309 ret = -1;
1bf957a0
PP
310 goto error;
311 }
312
d94d92ac
PP
313 INIT_LISTENERS_ARRAY(struct bt_graph_listener_port_added,
314 graph->listeners.sink_input_port_added);
315
316 if (!graph->listeners.sink_input_port_added) {
317 ret = -1;
318 goto error;
319 }
320
d94d92ac
PP
321 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_connected,
322 graph->listeners.source_filter_ports_connected);
323
324 if (!graph->listeners.source_filter_ports_connected) {
325 ret = -1;
326 goto error;
327 }
328
329 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_connected,
330 graph->listeners.source_sink_ports_connected);
331
332 if (!graph->listeners.source_sink_ports_connected) {
333 ret = -1;
334 goto error;
335 }
336
9c0a126a
PP
337 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_connected,
338 graph->listeners.filter_filter_ports_connected);
339
340 if (!graph->listeners.filter_filter_ports_connected) {
341 ret = -1;
342 goto error;
343 }
344
d94d92ac
PP
345 INIT_LISTENERS_ARRAY(struct bt_graph_listener_ports_connected,
346 graph->listeners.filter_sink_ports_connected);
347
348 if (!graph->listeners.filter_sink_ports_connected) {
349 ret = -1;
350 goto error;
351 }
352
d6e69534
PP
353 ret = bt_object_pool_initialize(&graph->event_msg_pool,
354 (bt_object_pool_new_object_func) bt_message_event_new,
355 (bt_object_pool_destroy_object_func) destroy_message_event,
5c563278
PP
356 graph);
357 if (ret) {
d6e69534 358 BT_LOGE("Failed to initialize event message pool: ret=%d",
5c563278
PP
359 ret);
360 goto error;
361 }
362
d6e69534
PP
363 ret = bt_object_pool_initialize(&graph->packet_begin_msg_pool,
364 (bt_object_pool_new_object_func) bt_message_packet_beginning_new,
365 (bt_object_pool_destroy_object_func) destroy_message_packet_begin,
5c563278
PP
366 graph);
367 if (ret) {
d6e69534 368 BT_LOGE("Failed to initialize packet beginning message pool: ret=%d",
5c563278
PP
369 ret);
370 goto error;
371 }
372
d6e69534
PP
373 ret = bt_object_pool_initialize(&graph->packet_end_msg_pool,
374 (bt_object_pool_new_object_func) bt_message_packet_end_new,
375 (bt_object_pool_destroy_object_func) destroy_message_packet_end,
5c563278
PP
376 graph);
377 if (ret) {
d6e69534 378 BT_LOGE("Failed to initialize packet end message pool: ret=%d",
5c563278
PP
379 ret);
380 goto error;
381 }
382
d6e69534
PP
383 graph->messages = g_ptr_array_new_with_free_func(
384 (GDestroyNotify) notify_message_graph_is_destroyed);
d94d92ac 385 BT_LIB_LOGD("Created graph object: %!+g", graph);
262e5473 386
c0418dd9 387end:
a2d06fd5 388 return (void *) graph;
d94d92ac 389
c0418dd9 390error:
65300d60 391 BT_OBJECT_PUT_REF_AND_RESET(graph);
c0418dd9
JG
392 goto end;
393}
394
0d72b8c3
PP
395enum bt_graph_status bt_graph_connect_ports(
396 struct bt_graph *graph,
397 const struct bt_port_output *upstream_port_out,
398 const struct bt_port_input *downstream_port_in,
399 const struct bt_connection **user_connection)
f60c8b34 400{
a256a42d 401 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
8cc56726 402 enum bt_graph_listener_status listener_status;
f60c8b34 403 struct bt_connection *connection = NULL;
d94d92ac
PP
404 struct bt_port *upstream_port = (void *) upstream_port_out;
405 struct bt_port *downstream_port = (void *) downstream_port_in;
f60c8b34
JG
406 struct bt_component *upstream_component = NULL;
407 struct bt_component *downstream_component = NULL;
d94d92ac
PP
408 enum bt_self_component_status component_status;
409 bool init_can_consume;
f60c8b34 410
d94d92ac
PP
411 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
412 BT_ASSERT_PRE_NON_NULL(upstream_port, "Upstream port");
413 BT_ASSERT_PRE_NON_NULL(downstream_port, "Downstream port port");
414 BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
5badd463
PP
415 BT_ASSERT_PRE(
416 graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
38cda5da 417 "Graph is not in the \"configuring\" state: %!+g", graph);
d94d92ac
PP
418 BT_ASSERT_PRE(!bt_port_is_connected(upstream_port),
419 "Upstream port is already connected: %!+p", upstream_port);
420 BT_ASSERT_PRE(!bt_port_is_connected(downstream_port),
421 "Downstream port is already connected: %!+p", downstream_port);
0d72b8c3 422 BT_ASSERT_PRE(bt_port_borrow_component_inline((void *) upstream_port),
d94d92ac
PP
423 "Upstream port does not belong to a component: %!+p",
424 upstream_port);
0d72b8c3 425 BT_ASSERT_PRE(bt_port_borrow_component_inline((void *) downstream_port),
d94d92ac
PP
426 "Downstream port does not belong to a component: %!+p",
427 downstream_port);
4aa7981f 428 init_can_consume = graph->can_consume;
d94d92ac
PP
429 BT_LIB_LOGD("Connecting component ports within graph: "
430 "%![graph-]+g, %![up-port-]+p, %![down-port-]+p",
431 graph, upstream_port, downstream_port);
432 bt_graph_set_can_consume(graph, false);
0d72b8c3 433 upstream_component = bt_port_borrow_component_inline(
d94d92ac 434 (void *) upstream_port);
0d72b8c3 435 downstream_component = bt_port_borrow_component_inline(
d94d92ac 436 (void *) downstream_port);
262e5473 437
0d8b4d8e
PP
438 /*
439 * At this point the ports are not connected yet. Both
440 * components need to accept an eventual connection to their
441 * port by the other port before we continue.
442 */
d94d92ac
PP
443 BT_LIB_LOGD("Asking upstream component to accept the connection: "
444 "%![comp-]+c", upstream_component);
0d8b4d8e 445 component_status = bt_component_accept_port_connection(
d94d92ac
PP
446 upstream_component, (void *) upstream_port,
447 (void *) downstream_port);
448 if (component_status != BT_SELF_COMPONENT_STATUS_OK) {
449 if (component_status == BT_SELF_COMPONENT_STATUS_REFUSE_PORT_CONNECTION) {
262e5473
PP
450 BT_LOGD_STR("Upstream component refused the connection.");
451 } else {
452 BT_LOGW("Cannot ask upstream component to accept the connection: "
d94d92ac 453 "status=%s", bt_self_component_status_string(component_status));
262e5473
PP
454 }
455
d94d92ac 456 status = (int) component_status;
a256a42d 457 goto end;
0d8b4d8e 458 }
262e5473 459
d94d92ac
PP
460 BT_LIB_LOGD("Asking downstream component to accept the connection: "
461 "%![comp-]+c", downstream_component);
0d8b4d8e 462 component_status = bt_component_accept_port_connection(
d94d92ac
PP
463 downstream_component, (void *) downstream_port,
464 (void *) upstream_port);
465 if (component_status != BT_SELF_COMPONENT_STATUS_OK) {
466 if (component_status == BT_SELF_COMPONENT_STATUS_REFUSE_PORT_CONNECTION) {
262e5473
PP
467 BT_LOGD_STR("Downstream component refused the connection.");
468 } else {
469 BT_LOGW("Cannot ask downstream component to accept the connection: "
d94d92ac 470 "status=%s", bt_self_component_status_string(component_status));
262e5473
PP
471 }
472
d94d92ac 473 status = (int) component_status;
a256a42d 474 goto end;
0d8b4d8e
PP
475 }
476
262e5473 477 BT_LOGD_STR("Creating connection.");
d94d92ac
PP
478 connection = bt_connection_create(graph, (void *) upstream_port,
479 (void *) downstream_port);
f60c8b34 480 if (!connection) {
262e5473 481 BT_LOGW("Cannot create connection object.");
a256a42d
PP
482 status = BT_GRAPH_STATUS_NOMEM;
483 goto end;
f60c8b34
JG
484 }
485
d94d92ac 486 BT_LIB_LOGD("Connection object created: %!+x", connection);
262e5473 487
f60c8b34 488 /*
72b913fb
PP
489 * Ownership of upstream_component/downstream_component and of
490 * the connection object is transferred to the graph.
f60c8b34
JG
491 */
492 g_ptr_array_add(graph->connections, connection);
ffeb0eed 493
f60c8b34 494 /*
0d8b4d8e 495 * Notify both components that their port is connected.
f60c8b34 496 */
d94d92ac
PP
497 BT_LIB_LOGD("Notifying upstream component that its port is connected: "
498 "%![comp-]+c, %![port-]+p", upstream_component, upstream_port);
bf55043c 499 component_status = bt_component_port_connected(upstream_component,
d94d92ac
PP
500 (void *) upstream_port, (void *) downstream_port);
501 if (component_status != BT_SELF_COMPONENT_STATUS_OK) {
502 BT_LIB_LOGW("Error while notifying upstream component that its port is connected: "
503 "status=%s, %![graph-]+g, %![up-comp-]+c, "
504 "%![down-comp-]+c, %![up-port-]+p, %![down-port-]+p",
505 bt_self_component_status_string(component_status),
506 graph, upstream_component, downstream_component,
507 upstream_port, downstream_port);
bf55043c 508 bt_connection_end(connection, true);
d94d92ac 509 status = (int) component_status;
bf55043c
PP
510 goto end;
511 }
512
85031ceb 513 connection->notified_upstream_port_connected = true;
d94d92ac
PP
514 BT_LIB_LOGD("Notifying downstream component that its port is connected: "
515 "%![comp-]+c, %![port-]+p", downstream_component,
516 downstream_port);
bf55043c 517 component_status = bt_component_port_connected(downstream_component,
d94d92ac
PP
518 (void *) downstream_port, (void *) upstream_port);
519 if (component_status != BT_SELF_COMPONENT_STATUS_OK) {
520 BT_LIB_LOGW("Error while notifying downstream component that its port is connected: "
521 "status=%s, %![graph-]+g, %![up-comp-]+c, "
522 "%![down-comp-]+c, %![up-port-]+p, %![down-port-]+p",
523 bt_self_component_status_string(component_status),
524 graph, upstream_component, downstream_component,
525 upstream_port, downstream_port);
bf55043c 526 bt_connection_end(connection, true);
d94d92ac 527 status = (int) component_status;
bf55043c
PP
528 goto end;
529 }
530
85031ceb 531 connection->notified_downstream_port_connected = true;
1bf957a0
PP
532
533 /*
0d8b4d8e 534 * Notify the graph's creator that both ports are connected.
1bf957a0 535 */
262e5473 536 BT_LOGD_STR("Notifying graph's user that new component ports are connected.");
8cc56726
SM
537 listener_status = bt_graph_notify_ports_connected(graph, upstream_port, downstream_port);
538 if (listener_status != BT_GRAPH_LISTENER_STATUS_OK) {
539 status = (int) listener_status;
540 goto end;
541 }
542
85031ceb 543 connection->notified_graph_ports_connected = true;
d94d92ac
PP
544 BT_LIB_LOGD("Connected component ports within graph: "
545 "%![graph-]+g, %![up-comp-]+c, %![down-comp-]+c, "
546 "%![up-port-]+p, %![down-port-]+p",
547 graph, upstream_component, downstream_component,
548 upstream_port, downstream_port);
1bf957a0 549
a256a42d 550 if (user_connection) {
1a6a376a
PP
551 /* Move reference to user */
552 *user_connection = connection;
553 connection = NULL;
a256a42d
PP
554 }
555
f60c8b34 556end:
38cda5da 557 if (status != BT_GRAPH_STATUS_OK) {
8cc56726 558 bt_graph_make_faulty(graph);
38cda5da
PP
559 }
560
65300d60 561 bt_object_put_ref(connection);
d94d92ac
PP
562 (void) init_can_consume;
563 bt_graph_set_can_consume(graph, init_can_consume);
a256a42d 564 return status;
f60c8b34
JG
565}
566
ad847455 567static inline
d94d92ac 568enum bt_graph_status consume_graph_sink(struct bt_component_sink *comp)
c0418dd9 569{
d94d92ac
PP
570 enum bt_self_component_status comp_status;
571 struct bt_component_class_sink *sink_class = NULL;
572
573 BT_ASSERT(comp);
574 sink_class = (void *) comp->parent.class;
575 BT_ASSERT(sink_class->methods.consume);
576 BT_LIB_LOGD("Calling user's consume method: %!+c", comp);
577 comp_status = sink_class->methods.consume((void *) comp);
578 BT_LOGD("User method returned: status=%s",
579 bt_self_component_status_string(comp_status));
580 BT_ASSERT_PRE(comp_status == BT_SELF_COMPONENT_STATUS_OK ||
581 comp_status == BT_SELF_COMPONENT_STATUS_END ||
582 comp_status == BT_SELF_COMPONENT_STATUS_AGAIN ||
583 comp_status == BT_SELF_COMPONENT_STATUS_ERROR ||
584 comp_status == BT_SELF_COMPONENT_STATUS_NOMEM,
585 "Invalid component status returned by consuming method: "
586 "status=%s", bt_self_component_status_string(comp_status));
587 if (comp_status < 0) {
588 BT_LOGW_STR("Consume method failed.");
589 goto end;
590 }
591
592 BT_LIB_LOGV("Consumed from sink: %![comp-]+c, status=%s",
593 comp, bt_self_component_status_string(comp_status));
594
595end:
596 return (int) comp_status;
8ed535b5
PP
597}
598
599/*
600 * `node` is removed from the queue of sinks to consume when passed to
601 * this function. This function adds it back to the queue if there's
602 * still something to consume afterwards.
603 */
ad847455 604static inline
d94d92ac 605enum bt_graph_status consume_sink_node(struct bt_graph *graph, GList *node)
8ed535b5
PP
606{
607 enum bt_graph_status status;
d94d92ac 608 struct bt_component_sink *sink;
8ed535b5
PP
609
610 sink = node->data;
611 status = consume_graph_sink(sink);
91d81473 612 if (G_UNLIKELY(status != BT_GRAPH_STATUS_END)) {
8ed535b5 613 g_queue_push_tail_link(graph->sinks_to_consume, node);
f60c8b34
JG
614 goto end;
615 }
616
617 /* End reached, the node is not added back to the queue and free'd. */
8ed535b5 618 g_queue_delete_link(graph->sinks_to_consume, node);
f60c8b34
JG
619
620 /* Don't forward an END status if there are sinks left to consume. */
621 if (!g_queue_is_empty(graph->sinks_to_consume)) {
622 status = BT_GRAPH_STATUS_OK;
623 goto end;
624 }
8ed535b5
PP
625
626end:
d94d92ac
PP
627 BT_LIB_LOGV("Consumed sink node: %![comp-]+c, status=%s",
628 sink, bt_graph_status_string(status));
8ed535b5
PP
629 return status;
630}
631
632BT_HIDDEN
633enum bt_graph_status bt_graph_consume_sink_no_check(struct bt_graph *graph,
d94d92ac 634 struct bt_component_sink *sink)
8ed535b5
PP
635{
636 enum bt_graph_status status;
637 GList *sink_node;
638 int index;
639
d94d92ac
PP
640 BT_LIB_LOGV("Making specific sink consume: %![comp-]+c", sink);
641 BT_ASSERT(bt_component_borrow_graph((void *) sink) == graph);
8ed535b5
PP
642
643 if (g_queue_is_empty(graph->sinks_to_consume)) {
644 BT_LOGV_STR("Graph's sink queue is empty: end of graph.");
645 status = BT_GRAPH_STATUS_END;
646 goto end;
647 }
648
649 index = g_queue_index(graph->sinks_to_consume, sink);
650 if (index < 0) {
651 BT_LOGV_STR("Sink is not marked as consumable: sink is ended.");
652 status = BT_GRAPH_STATUS_END;
653 goto end;
654 }
655
656 sink_node = g_queue_pop_nth_link(graph->sinks_to_consume, index);
f6ccaed9 657 BT_ASSERT(sink_node);
8ed535b5
PP
658 status = consume_sink_node(graph, sink_node);
659
660end:
661 return status;
662}
663
ad847455 664static inline
d94d92ac 665enum bt_graph_status consume_no_check(struct bt_graph *graph)
8ed535b5
PP
666{
667 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
668 struct bt_component *sink;
669 GList *current_node;
670
f6ccaed9
PP
671 BT_ASSERT_PRE(graph->has_sink,
672 "Graph has no sink component: %!+g", graph);
d94d92ac 673 BT_LIB_LOGV("Making next sink consume: %![graph-]+g", graph);
8ed535b5 674
91d81473 675 if (G_UNLIKELY(g_queue_is_empty(graph->sinks_to_consume))) {
8ed535b5
PP
676 BT_LOGV_STR("Graph's sink queue is empty: end of graph.");
677 status = BT_GRAPH_STATUS_END;
678 goto end;
679 }
680
681 current_node = g_queue_pop_head_link(graph->sinks_to_consume);
682 sink = current_node->data;
d94d92ac 683 BT_LIB_LOGV("Chose next sink to consume: %!+c", sink);
8ed535b5
PP
684 status = consume_sink_node(graph, current_node);
685
f60c8b34
JG
686end:
687 return status;
c0418dd9
JG
688}
689
4725a201 690enum bt_graph_status bt_graph_consume(struct bt_graph *graph)
851b70bd 691{
f6ccaed9 692 enum bt_graph_status status;
1d915789 693
f6ccaed9
PP
694 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
695 BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
696 BT_ASSERT_PRE(graph->can_consume,
697 "Cannot consume graph in its current state: %!+g", graph);
38cda5da
PP
698 BT_ASSERT_PRE(graph->config_state != BT_GRAPH_CONFIGURATION_STATE_FAULTY,
699 "Graph is in a faulty state: %!+g", graph);
4725a201 700 bt_graph_set_can_consume(graph, false);
5badd463 701 status = bt_graph_configure(graph);
91d81473 702 if (G_UNLIKELY(status)) {
5badd463
PP
703 /* bt_graph_configure() logs errors */
704 goto end;
705 }
706
d94d92ac 707 status = consume_no_check(graph);
4725a201 708 bt_graph_set_can_consume(graph, true);
5badd463
PP
709
710end:
26a15756 711 return status;
851b70bd
PP
712}
713
0d72b8c3 714enum bt_graph_status bt_graph_run(struct bt_graph *graph)
f60c8b34 715{
5badd463 716 enum bt_graph_status status;
f60c8b34 717
d94d92ac
PP
718 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
719 BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
ad847455
PP
720 BT_ASSERT_PRE(graph->can_consume,
721 "Cannot consume graph in its current state: %!+g", graph);
38cda5da
PP
722 BT_ASSERT_PRE(graph->config_state != BT_GRAPH_CONFIGURATION_STATE_FAULTY,
723 "Graph is in a faulty state: %!+g", graph);
4725a201 724 bt_graph_set_can_consume(graph, false);
5badd463 725 status = bt_graph_configure(graph);
91d81473 726 if (G_UNLIKELY(status)) {
5badd463
PP
727 /* bt_graph_configure() logs errors */
728 goto end;
729 }
730
d94d92ac 731 BT_LIB_LOGV("Running graph: %!+g", graph);
262e5473 732
f60c8b34 733 do {
851b70bd
PP
734 /*
735 * Check if the graph is canceled at each iteration. If
736 * the graph was canceled by another thread or by a
d94d92ac
PP
737 * signal handler, this is not a warning nor an error,
738 * it was intentional: log with a DEBUG level only.
851b70bd 739 */
91d81473 740 if (G_UNLIKELY(graph->canceled)) {
d94d92ac
PP
741 BT_LIB_LOGD("Stopping the graph: graph is canceled: "
742 "%!+g", graph);
851b70bd
PP
743 status = BT_GRAPH_STATUS_CANCELED;
744 goto end;
745 }
746
d94d92ac 747 status = consume_no_check(graph);
91d81473 748 if (G_UNLIKELY(status == BT_GRAPH_STATUS_AGAIN)) {
f60c8b34 749 /*
202a3a13
PP
750 * If AGAIN is received and there are multiple
751 * sinks, go ahead and consume from the next
752 * sink.
f60c8b34 753 *
202a3a13
PP
754 * However, in the case where a single sink is
755 * left, the caller can decide to busy-wait and
0d72b8c3 756 * call bt_graph_run() continuously
d94d92ac
PP
757 * until the source is ready or it can decide to
758 * sleep for an arbitrary amount of time.
f60c8b34
JG
759 */
760 if (graph->sinks_to_consume->length > 1) {
72b913fb 761 status = BT_GRAPH_STATUS_OK;
f60c8b34
JG
762 }
763 }
72b913fb 764 } while (status == BT_GRAPH_STATUS_OK);
f60c8b34
JG
765
766 if (g_queue_is_empty(graph->sinks_to_consume)) {
72b913fb 767 status = BT_GRAPH_STATUS_END;
f60c8b34 768 }
262e5473 769
202a3a13 770end:
d94d92ac
PP
771 BT_LIB_LOGV("Graph ran: %![graph-]+g, status=%s", graph,
772 bt_graph_status_string(status));
4725a201 773 bt_graph_set_can_consume(graph, true);
72b913fb 774 return status;
f60c8b34 775}
1bf957a0 776
d94d92ac 777enum bt_graph_status
0d72b8c3
PP
778bt_graph_add_source_component_output_port_added_listener(
779 struct bt_graph *graph,
780 bt_graph_source_component_output_port_added_listener_func func,
781 bt_graph_listener_removed_func listener_removed, void *data,
d94d92ac 782 int *out_listener_id)
1bf957a0 783{
d94d92ac
PP
784 struct bt_graph_listener_port_added listener = {
785 .base = {
786 .removed = listener_removed,
787 .data = data,
788 },
789 .func = (port_added_func_t) func,
1bf957a0 790 };
d94d92ac 791 int listener_id;
1bf957a0 792
d94d92ac
PP
793 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
794 BT_ASSERT_PRE_NON_NULL(func, "Listener");
795 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
796 BT_ASSERT_PRE(!graph->in_remove_listener,
797 "Graph currently executing a \"listener removed\" listener: "
798 "%!+g", graph);
799 g_array_append_val(graph->listeners.source_output_port_added, listener);
800 listener_id = graph->listeners.source_output_port_added->len - 1;
801 BT_LIB_LOGV("Added \"source component output port added\" listener to graph: "
802 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
803 listener_id);
804
805 if (listener_id) {
806 *out_listener_id = listener_id;
807 }
808
809 return BT_GRAPH_STATUS_OK;
1bf957a0
PP
810}
811
d94d92ac 812enum bt_graph_status
0d72b8c3
PP
813bt_graph_add_filter_component_output_port_added_listener(
814 struct bt_graph *graph,
815 bt_graph_filter_component_output_port_added_listener_func func,
816 bt_graph_listener_removed_func listener_removed, void *data,
d94d92ac 817 int *out_listener_id)
1bf957a0 818{
d94d92ac
PP
819 struct bt_graph_listener_port_added listener = {
820 .base = {
821 .removed = listener_removed,
822 .data = data,
823 },
824 .func = (port_added_func_t) func,
825 };
826 int listener_id;
1bf957a0 827
d94d92ac
PP
828 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
829 BT_ASSERT_PRE_NON_NULL(func, "Listener");
830 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
831 BT_ASSERT_PRE(!graph->in_remove_listener,
832 "Graph currently executing a \"listener removed\" listener: "
833 "%!+g", graph);
834 g_array_append_val(graph->listeners.filter_output_port_added, listener);
835 listener_id = graph->listeners.filter_output_port_added->len - 1;
836 BT_LIB_LOGV("Added \"filter component output port added\" listener to graph: "
837 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
838 listener_id);
839
840 if (listener_id) {
841 *out_listener_id = listener_id;
842 }
843
844 return BT_GRAPH_STATUS_OK;
845}
262e5473 846
d94d92ac 847enum bt_graph_status
0d72b8c3
PP
848bt_graph_add_filter_component_input_port_added_listener(
849 struct bt_graph *graph,
850 bt_graph_filter_component_input_port_added_listener_func func,
851 bt_graph_listener_removed_func listener_removed, void *data,
d94d92ac
PP
852 int *out_listener_id)
853{
d94d92ac
PP
854 struct bt_graph_listener_port_added listener = {
855 .base = {
856 .removed = listener_removed,
857 .data = data,
858 },
859 .func = (port_added_func_t) func,
860 };
861 int listener_id;
8cc092c9 862
d94d92ac
PP
863 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
864 BT_ASSERT_PRE_NON_NULL(func, "Listener");
865 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
866 BT_ASSERT_PRE(!graph->in_remove_listener,
867 "Graph currently executing a \"listener removed\" listener: "
868 "%!+g", graph);
869 g_array_append_val(graph->listeners.filter_input_port_added, listener);
870 listener_id = graph->listeners.filter_input_port_added->len - 1;
871 BT_LIB_LOGV("Added \"filter component input port added\" listener to graph: "
872 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
873 listener_id);
874
875 if (listener_id) {
876 *out_listener_id = listener_id;
877 }
878
879 return BT_GRAPH_STATUS_OK;
880}
1bf957a0 881
d94d92ac 882enum bt_graph_status
0d72b8c3
PP
883bt_graph_add_sink_component_input_port_added_listener(
884 struct bt_graph *graph,
885 bt_graph_sink_component_input_port_added_listener_func func,
886 bt_graph_listener_removed_func listener_removed, void *data,
d94d92ac
PP
887 int *out_listener_id)
888{
d94d92ac
PP
889 struct bt_graph_listener_port_added listener = {
890 .base = {
891 .removed = listener_removed,
892 .data = data,
893 },
894 .func = (port_added_func_t) func,
895 };
896 int listener_id;
1bf957a0 897
d94d92ac
PP
898 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
899 BT_ASSERT_PRE_NON_NULL(func, "Listener");
900 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
901 BT_ASSERT_PRE(!graph->in_remove_listener,
902 "Graph currently executing a \"listener removed\" listener: "
903 "%!+g", graph);
904 g_array_append_val(graph->listeners.sink_input_port_added, listener);
905 listener_id = graph->listeners.sink_input_port_added->len - 1;
906 BT_LIB_LOGV("Added \"sink component input port added\" listener to graph: "
907 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
908 listener_id);
909
910 if (listener_id) {
911 *out_listener_id = listener_id;
912 }
913
914 return BT_GRAPH_STATUS_OK;
1bf957a0
PP
915}
916
d94d92ac 917enum bt_graph_status
0d72b8c3
PP
918bt_graph_add_source_filter_component_ports_connected_listener(
919 struct bt_graph *graph,
920 bt_graph_source_filter_component_ports_connected_listener_func func,
921 bt_graph_listener_removed_func listener_removed, void *data,
d94d92ac
PP
922 int *out_listener_id)
923{
d94d92ac
PP
924 struct bt_graph_listener_ports_connected listener = {
925 .base = {
926 .removed = listener_removed,
927 .data = data,
928 },
929 .func = (ports_connected_func_t) func,
930 };
931 int listener_id;
8cc092c9 932
d94d92ac
PP
933 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
934 BT_ASSERT_PRE_NON_NULL(func, "Listener");
935 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
936 BT_ASSERT_PRE(!graph->in_remove_listener,
937 "Graph currently executing a \"listener removed\" listener: "
938 "%!+g", graph);
939 g_array_append_val(graph->listeners.source_filter_ports_connected,
940 listener);
941 listener_id = graph->listeners.source_filter_ports_connected->len - 1;
942 BT_LIB_LOGV("Added \"source to filter component ports connected\" listener to graph: "
943 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
944 listener_id);
945
946 if (listener_id) {
947 *out_listener_id = listener_id;
948 }
949
950 return BT_GRAPH_STATUS_OK;
951}
1bf957a0 952
d94d92ac 953enum bt_graph_status
0d72b8c3
PP
954bt_graph_add_source_sink_component_ports_connected_listener(
955 struct bt_graph *graph,
956 bt_graph_source_sink_component_ports_connected_listener_func func,
957 bt_graph_listener_removed_func listener_removed, void *data,
d94d92ac
PP
958 int *out_listener_id)
959{
d94d92ac
PP
960 struct bt_graph_listener_ports_connected listener = {
961 .base = {
962 .removed = listener_removed,
963 .data = data,
964 },
965 .func = (ports_connected_func_t) func,
966 };
967 int listener_id;
1bf957a0 968
d94d92ac
PP
969 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
970 BT_ASSERT_PRE_NON_NULL(func, "Listener");
971 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
972 BT_ASSERT_PRE(!graph->in_remove_listener,
973 "Graph currently executing a \"listener removed\" listener: "
974 "%!+g", graph);
975 g_array_append_val(graph->listeners.source_sink_ports_connected,
976 listener);
977 listener_id = graph->listeners.source_sink_ports_connected->len - 1;
978 BT_LIB_LOGV("Added \"source to sink component ports connected\" listener to graph: "
979 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
980 listener_id);
981
982 if (listener_id) {
983 *out_listener_id = listener_id;
984 }
985
986 return BT_GRAPH_STATUS_OK;
1bf957a0
PP
987}
988
9c0a126a
PP
989enum bt_graph_status
990bt_graph_add_filter_filter_component_ports_connected_listener(
991 struct bt_graph *graph,
992 bt_graph_filter_filter_component_ports_connected_listener_func func,
993 bt_graph_listener_removed_func listener_removed, void *data,
994 int *out_listener_id)
995{
996 struct bt_graph_listener_ports_connected listener = {
997 .base = {
998 .removed = listener_removed,
999 .data = data,
1000 },
1001 .func = (ports_connected_func_t) func,
1002 };
1003 int listener_id;
1004
1005 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1006 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1007 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1008 BT_ASSERT_PRE(!graph->in_remove_listener,
1009 "Graph currently executing a \"listener removed\" listener: "
1010 "%!+g", graph);
1011 g_array_append_val(graph->listeners.filter_filter_ports_connected,
1012 listener);
1013 listener_id = graph->listeners.filter_filter_ports_connected->len - 1;
1014 BT_LIB_LOGV("Added \"filter to filter component ports connected\" listener to graph: "
1015 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1016 listener_id);
1017
1018 if (listener_id) {
1019 *out_listener_id = listener_id;
1020 }
1021
1022 return BT_GRAPH_STATUS_OK;
1023}
1024
d94d92ac 1025enum bt_graph_status
0d72b8c3
PP
1026bt_graph_add_filter_sink_component_ports_connected_listener(
1027 struct bt_graph *graph,
1028 bt_graph_filter_sink_component_ports_connected_listener_func func,
1029 bt_graph_listener_removed_func listener_removed, void *data,
d94d92ac 1030 int *out_listener_id)
1bf957a0 1031{
d94d92ac
PP
1032 struct bt_graph_listener_ports_connected listener = {
1033 .base = {
1034 .removed = listener_removed,
1035 .data = data,
1036 },
1037 .func = (ports_connected_func_t) func,
1038 };
1039 int listener_id;
1bf957a0 1040
d94d92ac
PP
1041 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1042 BT_ASSERT_PRE_NON_NULL(func, "Listener");
1043 BT_ASSERT_PRE_NON_NULL(func, "\"Listener removed\" listener");
1044 BT_ASSERT_PRE(!graph->in_remove_listener,
1045 "Graph currently executing a \"listener removed\" listener: "
1046 "%!+g", graph);
1047 g_array_append_val(graph->listeners.filter_sink_ports_connected,
1048 listener);
1049 listener_id = graph->listeners.filter_sink_ports_connected->len - 1;
1050 BT_LIB_LOGV("Added \"filter to sink component ports connected\" listener to graph: "
1051 "%![graph-]+g, listener-addr=%p, id=%d", graph, listener,
1052 listener_id);
1053
1054 if (listener_id) {
1055 *out_listener_id = listener_id;
1056 }
1057
1058 return BT_GRAPH_STATUS_OK;
1059}
262e5473 1060
1bf957a0 1061BT_HIDDEN
8cc56726
SM
1062enum bt_graph_listener_status bt_graph_notify_port_added(
1063 struct bt_graph *graph, struct bt_port *port)
1bf957a0 1064{
d94d92ac
PP
1065 uint64_t i;
1066 GArray *listeners;
1067 struct bt_component *comp;
8cc56726 1068 enum bt_graph_listener_status status = BT_GRAPH_LISTENER_STATUS_OK;
1bf957a0 1069
d94d92ac
PP
1070 BT_ASSERT(graph);
1071 BT_ASSERT(port);
1072 BT_LIB_LOGV("Notifying graph listeners that a port was added: "
1073 "%![graph-]+g, %![port-]+p", graph, port);
0d72b8c3 1074 comp = bt_port_borrow_component_inline(port);
d94d92ac
PP
1075 BT_ASSERT(comp);
1076
1077 switch (comp->class->type) {
1078 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1079 {
1080 switch (port->type) {
1081 case BT_PORT_TYPE_OUTPUT:
1082 listeners = graph->listeners.source_output_port_added;
1083 break;
1084 default:
1085 abort();
1086 }
1087
1088 break;
1089 }
1090 case BT_COMPONENT_CLASS_TYPE_FILTER:
1091 {
1092 switch (port->type) {
1093 case BT_PORT_TYPE_INPUT:
1094 listeners = graph->listeners.filter_input_port_added;
1095 break;
1096 case BT_PORT_TYPE_OUTPUT:
1097 listeners = graph->listeners.filter_output_port_added;
1098 break;
1099 default:
1100 abort();
1101 }
262e5473 1102
d94d92ac
PP
1103 break;
1104 }
1105 case BT_COMPONENT_CLASS_TYPE_SINK:
1106 {
1107 switch (port->type) {
1108 case BT_PORT_TYPE_INPUT:
1109 listeners = graph->listeners.sink_input_port_added;
1110 break;
1111 default:
1112 abort();
1113 }
1bf957a0 1114
d94d92ac
PP
1115 break;
1116 }
1117 default:
1118 abort();
1119 }
1120
1121 for (i = 0; i < listeners->len; i++) {
1122 struct bt_graph_listener_port_added *listener =
1123 &g_array_index(listeners,
1124 struct bt_graph_listener_port_added, i);
1125
8cc56726 1126
d94d92ac 1127 BT_ASSERT(listener->func);
8cc56726
SM
1128 status = listener->func(comp, port, listener->base.data);
1129 if (status != BT_GRAPH_LISTENER_STATUS_OK) {
1130 goto end;
1131 }
1bf957a0 1132 }
8cc56726
SM
1133
1134end:
1135 return status;
1bf957a0
PP
1136}
1137
1bf957a0 1138BT_HIDDEN
8cc56726
SM
1139enum bt_graph_listener_status bt_graph_notify_ports_connected(
1140 struct bt_graph *graph, struct bt_port *upstream_port,
1141 struct bt_port *downstream_port)
1bf957a0 1142{
d94d92ac
PP
1143 uint64_t i;
1144 GArray *listeners;
1145 struct bt_component *upstream_comp;
1146 struct bt_component *downstream_comp;
8cc56726 1147 enum bt_graph_listener_status status = BT_GRAPH_LISTENER_STATUS_OK;
1bf957a0 1148
d94d92ac
PP
1149 BT_ASSERT(graph);
1150 BT_ASSERT(upstream_port);
1151 BT_ASSERT(downstream_port);
1152 BT_LIB_LOGV("Notifying graph listeners that ports were connected: "
1153 "%![graph-]+g, %![up-port-]+p, %![down-port-]+p",
1154 graph, upstream_port, downstream_port);
0d72b8c3 1155 upstream_comp = bt_port_borrow_component_inline(upstream_port);
d94d92ac 1156 BT_ASSERT(upstream_comp);
0d72b8c3 1157 downstream_comp = bt_port_borrow_component_inline(downstream_port);
d94d92ac
PP
1158 BT_ASSERT(downstream_comp);
1159
1160 switch (upstream_comp->class->type) {
1161 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1162 {
1163 switch (downstream_comp->class->type) {
1164 case BT_COMPONENT_CLASS_TYPE_FILTER:
1165 listeners =
1166 graph->listeners.source_filter_ports_connected;
1167 break;
1168 case BT_COMPONENT_CLASS_TYPE_SINK:
1169 listeners =
1170 graph->listeners.source_sink_ports_connected;
1171 break;
1172 default:
1173 abort();
1174 }
262e5473 1175
d94d92ac
PP
1176 break;
1177 }
1178 case BT_COMPONENT_CLASS_TYPE_FILTER:
1179 {
1180 switch (downstream_comp->class->type) {
9c0a126a
PP
1181 case BT_COMPONENT_CLASS_TYPE_FILTER:
1182 listeners =
1183 graph->listeners.filter_filter_ports_connected;
1184 break;
d94d92ac
PP
1185 case BT_COMPONENT_CLASS_TYPE_SINK:
1186 listeners =
1187 graph->listeners.filter_sink_ports_connected;
1188 break;
1189 default:
1190 abort();
1191 }
1bf957a0 1192
d94d92ac
PP
1193 break;
1194 }
1195 default:
1196 abort();
1197 }
1198
1199 for (i = 0; i < listeners->len; i++) {
1200 struct bt_graph_listener_ports_connected *listener =
1201 &g_array_index(listeners,
1202 struct bt_graph_listener_ports_connected, i);
1203
1204 BT_ASSERT(listener->func);
8cc56726 1205 status = listener->func(upstream_comp, downstream_comp,
d94d92ac 1206 upstream_port, downstream_port, listener->base.data);
8cc56726
SM
1207 if (status != BT_GRAPH_LISTENER_STATUS_OK) {
1208 goto end;
1209 }
1bf957a0 1210 }
8cc56726
SM
1211
1212end:
1213 return status;
1bf957a0
PP
1214}
1215
4725a201 1216enum bt_graph_status bt_graph_cancel(struct bt_graph *graph)
202a3a13 1217{
202a3a13 1218
d94d92ac
PP
1219 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1220 graph->canceled = true;
1221 BT_LIB_LOGV("Canceled graph: %!+i", graph);
1222 return BT_GRAPH_STATUS_OK;
202a3a13
PP
1223}
1224
0d72b8c3 1225bt_bool bt_graph_is_canceled(const struct bt_graph *graph)
202a3a13 1226{
d94d92ac
PP
1227 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1228 return graph->canceled ? BT_TRUE : BT_FALSE;
202a3a13 1229}
f167d3c0
PP
1230
1231BT_HIDDEN
1232void bt_graph_remove_connection(struct bt_graph *graph,
1233 struct bt_connection *connection)
1234{
f6ccaed9
PP
1235 BT_ASSERT(graph);
1236 BT_ASSERT(connection);
d94d92ac 1237 BT_LIB_LOGV("Removing graph's connection: %![graph-]+g, %![conn-]+x",
262e5473 1238 graph, connection);
f167d3c0
PP
1239 g_ptr_array_remove(graph->connections, connection);
1240}
36712f1d 1241
d94d92ac
PP
1242BT_ASSERT_PRE_FUNC
1243static inline
1244bool component_name_exists(struct bt_graph *graph, const char *name)
1245{
1246 bool exists = false;
1247 uint64_t i;
1248
1249 for (i = 0; i < graph->components->len; i++) {
1250 struct bt_component *other_comp = graph->components->pdata[i];
1251
1252 if (strcmp(name, bt_component_get_name(other_comp)) == 0) {
1253 BT_ASSERT_PRE_MSG("Another component with the same name already exists in the graph: "
1254 "%![other-comp-]+c, name=\"%s\"",
1255 other_comp, name);
1256 exists = true;
1257 goto end;
1258 }
1259 }
1260
1261end:
1262 return exists;
1263}
1264
1265static
1266enum bt_graph_status add_component_with_init_method_data(
0d72b8c3 1267 struct bt_graph *graph,
d94d92ac
PP
1268 struct bt_component_class *comp_cls,
1269 comp_init_method_t init_method,
05e21286 1270 const char *name, const struct bt_value *params,
d94d92ac 1271 void *init_method_data, struct bt_component **user_component)
36712f1d
PP
1272{
1273 enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK;
d94d92ac 1274 enum bt_self_component_status comp_status;
36712f1d 1275 struct bt_component *component = NULL;
d94d92ac
PP
1276 int ret;
1277 bool init_can_consume;
05e21286 1278 struct bt_value *new_params = NULL;
36712f1d 1279
d94d92ac
PP
1280 BT_ASSERT(comp_cls);
1281 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
1282 BT_ASSERT_PRE_NON_NULL(name, "Name");
1283 BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
5badd463
PP
1284 BT_ASSERT_PRE(
1285 graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
38cda5da 1286 "Graph is not in the \"configuring\" state: %!+g", graph);
d94d92ac
PP
1287 BT_ASSERT_PRE(!component_name_exists(graph, name),
1288 "Duplicate component name: %!+g, name=\"%s\"", graph, name);
1289 BT_ASSERT_PRE(!params || bt_value_is_map(params),
1290 "Parameter value is not a map value: %!+v", params);
4aa7981f 1291 init_can_consume = graph->can_consume;
d94d92ac
PP
1292 bt_graph_set_can_consume(graph, false);
1293 BT_LIB_LOGD("Adding component to graph: "
1294 "%![graph-]+g, %![cc-]+C, name=\"%s\", %![params-]+v, "
36712f1d 1295 "init-method-data-addr=%p",
d94d92ac 1296 graph, comp_cls, name, params, init_method_data);
36712f1d 1297
d94d92ac 1298 if (!params) {
05e21286
PP
1299 new_params = bt_value_map_create();
1300 if (!new_params) {
36712f1d
PP
1301 BT_LOGE_STR("Cannot create map value object.");
1302 graph_status = BT_GRAPH_STATUS_NOMEM;
1303 goto end;
1304 }
05e21286
PP
1305
1306 params = new_params;
36712f1d
PP
1307 }
1308
d94d92ac
PP
1309 ret = bt_component_create(comp_cls, name, &component);
1310 if (ret) {
1311 BT_LOGE("Cannot create empty component object: ret=%d",
1312 ret);
1313 graph_status = BT_GRAPH_STATUS_NOMEM;
36712f1d
PP
1314 goto end;
1315 }
1316
1317 /*
1318 * The user's initialization method needs to see that this
1319 * component is part of the graph. If the user method fails, we
1320 * immediately remove the component from the graph's components.
1321 */
1322 g_ptr_array_add(graph->components, component);
1323 bt_component_set_graph(component, graph);
0d47d31b 1324 bt_value_freeze(params);
36712f1d 1325
d94d92ac 1326 if (init_method) {
36712f1d 1327 BT_LOGD_STR("Calling user's initialization method.");
d94d92ac 1328 comp_status = init_method(component, params, init_method_data);
36712f1d 1329 BT_LOGD("User method returned: status=%s",
d94d92ac
PP
1330 bt_self_component_status_string(comp_status));
1331 if (comp_status != BT_SELF_COMPONENT_STATUS_OK) {
36712f1d 1332 BT_LOGW_STR("Initialization method failed.");
d94d92ac 1333 graph_status = (int) comp_status;
36712f1d
PP
1334 bt_component_set_graph(component, NULL);
1335 g_ptr_array_remove_fast(graph->components, component);
1336 goto end;
1337 }
1338 }
1339
1340 /*
1341 * Mark the component as initialized so that its finalization
1342 * method is called when it is destroyed.
1343 */
1344 component->initialized = true;
1345
1346 /*
1347 * If it's a sink component, it needs to be part of the graph's
1348 * sink queue to be consumed by bt_graph_consume().
1349 */
1350 if (bt_component_is_sink(component)) {
d94d92ac 1351 graph->has_sink = true;
36712f1d
PP
1352 g_queue_push_tail(graph->sinks_to_consume, component);
1353 }
1354
1355 /*
1356 * Freeze the component class now that it's instantiated at
1357 * least once.
1358 */
1359 BT_LOGD_STR("Freezing component class.");
d94d92ac
PP
1360 bt_component_class_freeze(comp_cls);
1361 BT_LIB_LOGD("Added component to graph: "
1362 "%![graph-]+g, %![cc-]+C, name=\"%s\", %![params-]+v, "
1363 "init-method-data-addr=%p, %![comp-]+c",
1364 graph, comp_cls, name, params, init_method_data, component);
36712f1d
PP
1365
1366 if (user_component) {
1367 /* Move reference to user */
1368 *user_component = component;
1369 component = NULL;
1370 }
1371
1372end:
38cda5da 1373 if (graph_status != BT_GRAPH_STATUS_OK) {
8cc56726 1374 bt_graph_make_faulty(graph);
38cda5da
PP
1375 }
1376
65300d60 1377 bt_object_put_ref(component);
05e21286 1378 bt_object_put_ref(new_params);
d94d92ac
PP
1379 (void) init_can_consume;
1380 bt_graph_set_can_consume(graph, init_can_consume);
36712f1d
PP
1381 return graph_status;
1382}
1383
d94d92ac 1384enum bt_graph_status
0d72b8c3
PP
1385bt_graph_add_source_component_with_init_method_data(
1386 struct bt_graph *graph,
1387 const struct bt_component_class_source *comp_cls,
05e21286 1388 const char *name, const struct bt_value *params,
0d72b8c3
PP
1389 void *init_method_data,
1390 const struct bt_component_source **component)
d94d92ac
PP
1391{
1392 BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class");
1393 return add_component_with_init_method_data(graph,
1394 (void *) comp_cls, (comp_init_method_t) comp_cls->methods.init,
1395 name, params, init_method_data, (void *) component);
1396}
1397
0d72b8c3
PP
1398enum bt_graph_status bt_graph_add_source_component(
1399 struct bt_graph *graph,
1400 const struct bt_component_class_source *comp_cls,
05e21286 1401 const char *name, const struct bt_value *params,
0d72b8c3 1402 const struct bt_component_source **component)
d94d92ac 1403{
0d72b8c3 1404 return bt_graph_add_source_component_with_init_method_data(
d94d92ac
PP
1405 graph, comp_cls, name, params, NULL, component);
1406}
1407
1408enum bt_graph_status
0d72b8c3
PP
1409bt_graph_add_filter_component_with_init_method_data(
1410 struct bt_graph *graph,
1411 const struct bt_component_class_filter *comp_cls,
05e21286 1412 const char *name, const struct bt_value *params,
0d72b8c3
PP
1413 void *init_method_data,
1414 const struct bt_component_filter **component)
d94d92ac
PP
1415{
1416 BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class");
1417 return add_component_with_init_method_data(graph,
1418 (void *) comp_cls, (comp_init_method_t) comp_cls->methods.init,
1419 name, params, init_method_data, (void *) component);
1420}
1421
0d72b8c3
PP
1422enum bt_graph_status bt_graph_add_filter_component(
1423 struct bt_graph *graph,
1424 const struct bt_component_class_filter *comp_cls,
05e21286 1425 const char *name, const struct bt_value *params,
0d72b8c3 1426 const struct bt_component_filter **component)
d94d92ac 1427{
0d72b8c3 1428 return bt_graph_add_filter_component_with_init_method_data(
d94d92ac
PP
1429 graph, comp_cls, name, params, NULL, component);
1430}
1431
1432enum bt_graph_status
0d72b8c3
PP
1433bt_graph_add_sink_component_with_init_method_data(
1434 struct bt_graph *graph,
1435 const struct bt_component_class_sink *comp_cls,
05e21286 1436 const char *name, const struct bt_value *params,
0d72b8c3
PP
1437 void *init_method_data,
1438 const struct bt_component_sink **component)
36712f1d 1439{
d94d92ac
PP
1440 BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class");
1441 return add_component_with_init_method_data(graph,
1442 (void *) comp_cls, (comp_init_method_t) comp_cls->methods.init,
1443 name, params, init_method_data, (void *) component);
1444}
1445
0d72b8c3
PP
1446enum bt_graph_status bt_graph_add_sink_component(
1447 struct bt_graph *graph,
1448 const struct bt_component_class_sink *comp_cls,
05e21286 1449 const char *name, const struct bt_value *params,
0d72b8c3 1450 const struct bt_component_sink **component)
d94d92ac 1451{
0d72b8c3 1452 return bt_graph_add_sink_component_with_init_method_data(
d94d92ac 1453 graph, comp_cls, name, params, NULL, component);
36712f1d 1454}
8ed535b5
PP
1455
1456BT_HIDDEN
1457int bt_graph_remove_unconnected_component(struct bt_graph *graph,
1458 struct bt_component *component)
1459{
d94d92ac
PP
1460 bool init_can_consume;
1461 uint64_t count;
8ed535b5
PP
1462 uint64_t i;
1463 int ret = 0;
1464
f6ccaed9
PP
1465 BT_ASSERT(graph);
1466 BT_ASSERT(component);
3fea54f6 1467 BT_ASSERT(component->base.ref_count == 0);
f6ccaed9 1468 BT_ASSERT(bt_component_borrow_graph(component) == graph);
8ed535b5 1469
4aa7981f 1470 init_can_consume = graph->can_consume;
8ed535b5
PP
1471 count = bt_component_get_input_port_count(component);
1472
1473 for (i = 0; i < count; i++) {
d94d92ac
PP
1474 struct bt_port *port = (void *)
1475 bt_component_borrow_input_port_by_index(component, i);
8ed535b5 1476
f6ccaed9 1477 BT_ASSERT(port);
8ed535b5
PP
1478
1479 if (bt_port_is_connected(port)) {
d94d92ac 1480 BT_LIB_LOGW("Cannot remove component from graph: "
8ed535b5 1481 "an input port is connected: "
d94d92ac
PP
1482 "%![graph-]+g, %![comp-]+c, %![port-]+p",
1483 graph, component, port);
8ed535b5
PP
1484 goto error;
1485 }
1486 }
1487
1488 count = bt_component_get_output_port_count(component);
1489
1490 for (i = 0; i < count; i++) {
d94d92ac
PP
1491 struct bt_port *port = (void *)
1492 bt_component_borrow_output_port_by_index(component, i);
8ed535b5 1493
f6ccaed9 1494 BT_ASSERT(port);
8ed535b5
PP
1495
1496 if (bt_port_is_connected(port)) {
d94d92ac 1497 BT_LIB_LOGW("Cannot remove component from graph: "
8ed535b5 1498 "an output port is connected: "
d94d92ac
PP
1499 "%![graph-]+g, %![comp-]+c, %![port-]+p",
1500 graph, component, port);
8ed535b5
PP
1501 goto error;
1502 }
1503 }
1504
d94d92ac 1505 bt_graph_set_can_consume(graph, false);
8ed535b5
PP
1506
1507 /* Possibly remove from sinks to consume */
1508 (void) g_queue_remove(graph->sinks_to_consume, component);
1509
1510 if (graph->sinks_to_consume->length == 0) {
d94d92ac 1511 graph->has_sink = false;
8ed535b5
PP
1512 }
1513
1514 /*
3fea54f6
PP
1515 * This calls bt_object_try_spec_release() on the component, and
1516 * since its reference count is 0, its destructor is called. Its
8ed535b5
PP
1517 * destructor calls the user's finalization method (if set).
1518 */
1519 g_ptr_array_remove(graph->components, component);
1520 goto end;
1521
1522error:
1523 ret = -1;
1524
1525end:
d94d92ac
PP
1526 (void) init_can_consume;
1527 bt_graph_set_can_consume(graph, init_can_consume);
8ed535b5
PP
1528 return ret;
1529}
5c563278
PP
1530
1531BT_HIDDEN
d6e69534
PP
1532void bt_graph_add_message(struct bt_graph *graph,
1533 struct bt_message *msg)
5c563278
PP
1534{
1535 BT_ASSERT(graph);
d6e69534 1536 BT_ASSERT(msg);
5c563278
PP
1537
1538 /*
1539 * It's okay not to take a reference because, when a
d6e69534 1540 * message's reference count drops to 0, either:
5c563278
PP
1541 *
1542 * * It is recycled back to one of this graph's pool.
1543 * * It is destroyed because it doesn't have any link to any
1544 * graph, which means the original graph is already destroyed.
1545 */
d6e69534 1546 g_ptr_array_add(graph->messages, msg);
5c563278 1547}
c5b9b441
PP
1548
1549void bt_graph_get_ref(const struct bt_graph *graph)
1550{
1551 bt_object_get_ref(graph);
1552}
1553
1554void bt_graph_put_ref(const struct bt_graph *graph)
1555{
1556 bt_object_put_ref(graph);
1557}
This page took 0.120221 seconds and 4 git commands to generate.