Fix: release the reference held by a child to its former parent
[babeltrace.git] / lib / component / graph.c
CommitLineData
c0418dd9 1/*
7d55361f 2 * graph.c
c0418dd9
JG
3 *
4 * Babeltrace Plugin Component Graph
5 *
f60c8b34 6 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
c0418dd9
JG
7 *
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 */
28
f60c8b34 29#include <babeltrace/component/component-internal.h>
7d55361f
JG
30#include <babeltrace/component/graph-internal.h>
31#include <babeltrace/component/connection-internal.h>
f60c8b34
JG
32#include <babeltrace/component/component-sink-internal.h>
33#include <babeltrace/component/component-source.h>
34#include <babeltrace/component/component-filter.h>
7d55361f 35#include <babeltrace/component/port.h>
c0418dd9 36#include <babeltrace/compiler.h>
f60c8b34 37#include <unistd.h>
c0418dd9 38
f60c8b34
JG
39static
40void bt_graph_destroy(struct bt_object *obj)
c0418dd9 41{
f60c8b34
JG
42 struct bt_graph *graph = container_of(obj,
43 struct bt_graph, base);
c0418dd9 44
f60c8b34
JG
45 if (graph->components) {
46 g_ptr_array_free(graph->components, TRUE);
47 }
c0418dd9
JG
48 if (graph->connections) {
49 g_ptr_array_free(graph->connections, TRUE);
50 }
f60c8b34
JG
51 if (graph->sinks_to_consume) {
52 g_queue_free(graph->sinks_to_consume);
c0418dd9
JG
53 }
54 g_free(graph);
55}
56
f60c8b34 57struct bt_graph *bt_graph_create(void)
c0418dd9 58{
f60c8b34 59 struct bt_graph *graph;
c0418dd9 60
f60c8b34 61 graph = g_new0(struct bt_graph, 1);
c0418dd9
JG
62 if (!graph) {
63 goto end;
64 }
65
f60c8b34 66 bt_object_init(graph, bt_graph_destroy);
c0418dd9 67
f60c8b34 68 graph->connections = g_ptr_array_new_with_free_func(bt_object_release);
c0418dd9
JG
69 if (!graph->connections) {
70 goto error;
71 }
f60c8b34
JG
72 graph->components = g_ptr_array_new_with_free_func(bt_object_release);
73 if (!graph->components) {
74 goto error;
75 }
76 graph->sinks_to_consume = g_queue_new();
77 if (!graph->sinks_to_consume) {
c0418dd9
JG
78 goto error;
79 }
80end:
81 return graph;
82error:
83 BT_PUT(graph);
84 goto end;
85}
86
f60c8b34
JG
87struct bt_connection *bt_graph_connect(struct bt_graph *graph,
88 struct bt_port *upstream_port,
89 struct bt_port *downstream_port)
90{
91 struct bt_connection *connection = NULL;
92 struct bt_graph *upstream_graph = NULL;
93 struct bt_graph *downstream_graph = NULL;
94 struct bt_component *upstream_component = NULL;
95 struct bt_component *downstream_component = NULL;
96 enum bt_component_status component_status;
97 bool components_added = false;
98
99 if (!graph || !upstream_port || !downstream_port) {
100 goto end;
101 }
102
103 if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) {
104 goto end;
105 }
106 if (bt_port_get_type(downstream_port) != BT_PORT_TYPE_INPUT) {
107 goto end;
108 }
109
110 /* Ensure the components are not already part of another graph. */
111 upstream_component = bt_port_get_component(upstream_port);
112 assert(upstream_component);
113 upstream_graph = bt_component_get_graph(upstream_component);
114 if (upstream_graph && (graph != upstream_graph)) {
115 fprintf(stderr, "Upstream component is already part of another graph\n");
116 goto error;
117 }
118
119 downstream_component = bt_port_get_component(downstream_port);
120 assert(downstream_component);
121 downstream_graph = bt_component_get_graph(downstream_component);
122 if (downstream_graph && (graph != downstream_graph)) {
123 fprintf(stderr, "Downstream component is already part of another graph\n");
124 goto error;
125 }
126
127 connection = bt_connection_create(graph, upstream_port,
128 downstream_port);
129 if (!connection) {
130 goto error;
131 }
132
133 /*
134 * Ownership of up/downstream_component and of the connection object is
135 * transferred to the graph.
136 */
137 g_ptr_array_add(graph->connections, connection);
138 g_ptr_array_add(graph->components, upstream_component);
139 g_ptr_array_add(graph->components, downstream_component);
140 if (bt_component_get_class_type(downstream_component) ==
141 BT_COMPONENT_CLASS_TYPE_SINK) {
142 g_queue_push_tail(graph->sinks_to_consume,
143 downstream_component);
144 }
145
146 /*
147 * The graph is now the parent of these components which garantees their
148 * existence for the duration of the graph's lifetime.
149 */
150 bt_component_set_graph(upstream_component, graph);
151 bt_put(upstream_component);
152 bt_component_set_graph(downstream_component, graph);
153 bt_put(downstream_component);
154
155 /* Rollback the connection from this point on. */
156 components_added = true;
157
158 /*
159 * The components and connection are added to the graph before invoking
160 * the new_connection method in order to make them visible to the
161 * components during the method's invocation.
162 */
163 component_status = bt_component_new_connection(upstream_component,
164 upstream_port, connection);
165 if (component_status != BT_COMPONENT_STATUS_OK) {
166 goto error;
167 }
168 component_status = bt_component_new_connection(downstream_component,
169 downstream_port, connection);
170 if (component_status != BT_COMPONENT_STATUS_OK) {
171 goto error;
172 }
173end:
174 bt_put(upstream_graph);
175 bt_put(downstream_graph);
176 return connection;
177error:
178 if (components_added) {
179 if (bt_component_get_class_type(downstream_component) ==
180 BT_COMPONENT_CLASS_TYPE_SINK) {
181 g_queue_pop_tail(graph->sinks_to_consume);
182 }
183 g_ptr_array_set_size(graph->connections,
184 graph->connections->len - 1);
185 g_ptr_array_set_size(graph->components,
186 graph->components->len - 2);
187 }
188 goto end;
189}
190
191static
346df6cf
JG
192enum bt_component_status get_component_port_counts(
193 struct bt_component *component, uint64_t *input_count,
194 uint64_t *output_count)
f60c8b34 195{
346df6cf 196 enum bt_component_status ret;
f60c8b34
JG
197
198 switch (bt_component_get_class_type(component)) {
199 case BT_COMPONENT_CLASS_TYPE_SOURCE:
346df6cf
JG
200 ret = bt_component_source_get_output_port_count(component,
201 output_count);
202 if (ret != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
203 goto end;
204 }
f60c8b34
JG
205 break;
206 case BT_COMPONENT_CLASS_TYPE_FILTER:
346df6cf
JG
207 ret = bt_component_filter_get_output_port_count(component,
208 output_count);
209 if (ret != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
210 goto end;
211 }
346df6cf
JG
212 ret = bt_component_filter_get_input_port_count(component,
213 input_count);
214 if (ret != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
215 goto end;
216 }
f60c8b34
JG
217 break;
218 case BT_COMPONENT_CLASS_TYPE_SINK:
346df6cf
JG
219 ret = bt_component_sink_get_input_port_count(component,
220 input_count);
221 if (ret != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
222 goto end;
223 }
f60c8b34
JG
224 break;
225 default:
226 assert(false);
227 break;
228 }
346df6cf 229 ret = BT_COMPONENT_STATUS_OK;
f60c8b34
JG
230end:
231 return ret;
232}
233
234static
235struct bt_port *get_input_port(struct bt_component *component, int index)
c0418dd9 236{
f60c8b34
JG
237 struct bt_port *port = NULL;
238
239 switch (bt_component_get_class_type(component)) {
240 case BT_COMPONENT_CLASS_TYPE_FILTER:
241 port = bt_component_filter_get_input_port_at_index(component,
242 index);
243 break;
244 case BT_COMPONENT_CLASS_TYPE_SINK:
245 port = bt_component_sink_get_input_port_at_index(component,
246 index);
247 break;
248 default:
249 assert(false);
250 }
251 return port;
c0418dd9
JG
252}
253
f60c8b34
JG
254static
255struct bt_port *get_output_port(struct bt_component *component, int index)
c0418dd9 256{
f60c8b34
JG
257 struct bt_port *port = NULL;
258
259 switch (bt_component_get_class_type(component)) {
260 case BT_COMPONENT_CLASS_TYPE_SOURCE:
261 port = bt_component_source_get_output_port_at_index(component,
262 index);
263 break;
264 case BT_COMPONENT_CLASS_TYPE_FILTER:
265 port = bt_component_filter_get_output_port_at_index(component,
266 index);
267 break;
268 default:
269 assert(false);
270 }
271 return port;
c0418dd9
JG
272}
273
f60c8b34
JG
274enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
275 struct bt_component *origin,
c0418dd9
JG
276 struct bt_component *new_component)
277{
346df6cf
JG
278 uint64_t origin_input_port_count = 0;
279 uint64_t origin_output_port_count = 0;
280 uint64_t new_input_port_count = 0;
281 uint64_t new_output_port_count = 0;
f60c8b34
JG
282 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
283 struct bt_graph *origin_graph = NULL;
284 struct bt_graph *new_graph = NULL;
285 struct bt_port *origin_port = NULL;
286 struct bt_port *new_port = NULL;
287 struct bt_port *upstream_port = NULL;
288 struct bt_port *downstream_port = NULL;
289 struct bt_connection *origin_connection = NULL;
290 struct bt_connection *new_connection = NULL;
291 int port_index;
292
293 if (!graph || !origin || !new_component) {
294 status = BT_GRAPH_STATUS_INVALID;
295 goto end;
296 }
297
298 if (bt_component_get_class_type(origin) !=
299 bt_component_get_class_type(new_component)) {
300 status = BT_GRAPH_STATUS_INVALID;
301 goto end;
302 }
303
304 origin_graph = bt_component_get_graph(origin);
305 if (!origin_graph || (origin_graph != graph)) {
306 status = BT_GRAPH_STATUS_INVALID;
307 goto end;
308 }
309
310 new_graph = bt_component_get_graph(new_component);
311 if (new_graph) {
312 status = BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH;
313 goto end;
314 }
315
316 if (get_component_port_counts(origin, &origin_input_port_count,
346df6cf 317 &origin_output_port_count) != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
318 status = BT_GRAPH_STATUS_INVALID;
319 goto end;
320 }
321 if (get_component_port_counts(new_component, &new_input_port_count,
346df6cf 322 &new_output_port_count) != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
323 status = BT_GRAPH_STATUS_INVALID;
324 goto end;
325 }
326
327 if (origin_input_port_count != new_input_port_count ||
328 origin_output_port_count != new_output_port_count) {
329 status = BT_GRAPH_STATUS_INVALID;
330 goto end;
331 }
332
333 /* Replicate input connections. */
334 for (port_index = 0; port_index< origin_input_port_count; port_index++) {
335 uint64_t connection_count, connection_index;
336
337 origin_port = get_input_port(origin, port_index);
338 if (!origin_port) {
339 status = BT_GRAPH_STATUS_ERROR;
340 goto error_disconnect;
341 }
342 new_port = get_input_port(new_component, port_index);
343 if (!new_port) {
344 status = BT_GRAPH_STATUS_ERROR;
345 goto error_disconnect;
346 }
347
348 if (bt_port_get_connection_count(origin_port, &connection_count) !=
349 BT_PORT_STATUS_OK) {
350 status = BT_GRAPH_STATUS_ERROR;
351 goto error_disconnect;
352 }
353
354 for (connection_index = 0; connection_index < connection_count;
355 connection_index++) {
356 origin_connection = bt_port_get_connection(origin_port,
357 connection_index);
358 if (!origin_connection) {
359 goto error_disconnect;
360 }
361
362 upstream_port = bt_connection_get_output_port(
363 origin_connection);
364 if (!upstream_port) {
365 goto error_disconnect;
366 }
367
368 new_connection = bt_graph_connect(graph, upstream_port,
369 new_port);
370 if (!new_connection) {
371 goto error_disconnect;
372 }
373
374 BT_PUT(upstream_port);
375 BT_PUT(origin_connection);
376 BT_PUT(new_connection);
377 }
378 BT_PUT(origin_port);
379 BT_PUT(new_port);
380 }
381
382 /* Replicate output connections. */
383 for (port_index = 0; port_index< origin_output_port_count; port_index++) {
384 uint64_t connection_count, connection_index;
385
386 origin_port = get_output_port(origin, port_index);
387 if (!origin_port) {
388 status = BT_GRAPH_STATUS_ERROR;
389 goto error_disconnect;
390 }
391 new_port = get_output_port(new_component, port_index);
392 if (!new_port) {
393 status = BT_GRAPH_STATUS_ERROR;
394 goto error_disconnect;
395 }
396
397 if (bt_port_get_connection_count(origin_port, &connection_count) !=
398 BT_PORT_STATUS_OK) {
399 status = BT_GRAPH_STATUS_ERROR;
400 goto error_disconnect;
401 }
402
403 for (connection_index = 0; connection_index < connection_count;
404 connection_index++) {
405 origin_connection = bt_port_get_connection(origin_port,
406 connection_index);
407 if (!origin_connection) {
408 goto error_disconnect;
409 }
410
411 downstream_port = bt_connection_get_input_port(
412 origin_connection);
413 if (!downstream_port) {
414 goto error_disconnect;
415 }
416
417 new_connection = bt_graph_connect(graph, new_port,
418 downstream_port);
419 if (!new_connection) {
420 goto error_disconnect;
421 }
422
423 BT_PUT(downstream_port);
424 BT_PUT(origin_connection);
425 BT_PUT(new_connection);
426 }
427 BT_PUT(origin_port);
428 BT_PUT(new_port);
429 }
430end:
431 bt_put(origin_graph);
432 bt_put(new_graph);
433 bt_put(origin_port);
434 bt_put(new_port);
435 bt_put(upstream_port);
436 bt_put(downstream_port);
437 bt_put(origin_connection);
438 bt_put(new_connection);
439 return status;
440error_disconnect:
441 /* Destroy all connections of the new component. */
442 /* FIXME. */
443 goto end;
c0418dd9
JG
444}
445
f60c8b34 446enum bt_component_status bt_graph_consume(struct bt_graph *graph)
c0418dd9 447{
f60c8b34
JG
448 struct bt_component *sink;
449 enum bt_component_status status;
450 GList *current_node;
451
452 if (!graph) {
453 status = BT_COMPONENT_STATUS_INVALID;
454 goto end;
455 }
456
457 if (g_queue_is_empty(graph->sinks_to_consume)) {
458 status = BT_COMPONENT_STATUS_END;
459 goto end;
460 }
461
462 current_node = g_queue_pop_head_link(graph->sinks_to_consume);
463 sink = current_node->data;
464 status = bt_component_sink_consume(sink);
465 if (status != BT_COMPONENT_STATUS_END) {
466 g_queue_push_tail_link(graph->sinks_to_consume, current_node);
467 goto end;
468 }
469
470 /* End reached, the node is not added back to the queue and free'd. */
471 g_queue_delete_link(graph->sinks_to_consume, current_node);
472
473 /* Don't forward an END status if there are sinks left to consume. */
474 if (!g_queue_is_empty(graph->sinks_to_consume)) {
475 status = BT_GRAPH_STATUS_OK;
476 goto end;
477 }
478end:
479 return status;
c0418dd9
JG
480}
481
f60c8b34
JG
482enum bt_graph_status bt_graph_run(struct bt_graph *graph,
483 enum bt_component_status *_component_status)
484{
485 enum bt_component_status component_status;
486 enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK;
487
488 if (!graph) {
489 graph_status = BT_GRAPH_STATUS_INVALID;
490 goto error;
491 }
492
493 do {
494 component_status = bt_graph_consume(graph);
495 if (component_status == BT_COMPONENT_STATUS_AGAIN) {
496 /*
497 * If AGAIN is received and there are multiple sinks,
498 * go ahead and consume from the next sink.
499 *
500 * However, in the case where a single sink is left,
501 * the caller can decide to busy-wait and call
502 * bt_graph_run continuously until the source is ready
503 * or it can decide to sleep for an arbitrary amount of
504 * time.
505 */
506 if (graph->sinks_to_consume->length > 1) {
507 component_status = BT_COMPONENT_STATUS_OK;
508 }
509 }
510 } while (component_status == BT_COMPONENT_STATUS_OK);
511
512 if (_component_status) {
513 *_component_status = component_status;
514 }
515
516 if (g_queue_is_empty(graph->sinks_to_consume)) {
517 graph_status = BT_GRAPH_STATUS_END;
518 } else if (component_status == BT_COMPONENT_STATUS_AGAIN) {
519 graph_status = BT_GRAPH_STATUS_AGAIN;
520 } else {
521 graph_status = BT_GRAPH_STATUS_ERROR;
522 }
523error:
524 return graph_status;
525}
This page took 0.046822 seconds and 4 git commands to generate.