Put Python plugin support in a separate shared object
[babeltrace.git] / lib / component / graph.c
CommitLineData
c0418dd9 1/*
7d55361f 2 * graph.c
c0418dd9
JG
3 *
4 * Babeltrace Plugin Component Graph
5 *
f60c8b34 6 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
c0418dd9
JG
7 *
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 */
28
f60c8b34 29#include <babeltrace/component/component-internal.h>
7d55361f
JG
30#include <babeltrace/component/graph-internal.h>
31#include <babeltrace/component/connection-internal.h>
f60c8b34
JG
32#include <babeltrace/component/component-sink-internal.h>
33#include <babeltrace/component/component-source.h>
34#include <babeltrace/component/component-filter.h>
7d55361f 35#include <babeltrace/component/port.h>
c0418dd9 36#include <babeltrace/compiler.h>
f60c8b34 37#include <unistd.h>
c0418dd9 38
f60c8b34
JG
39static
40void bt_graph_destroy(struct bt_object *obj)
c0418dd9 41{
f60c8b34
JG
42 struct bt_graph *graph = container_of(obj,
43 struct bt_graph, base);
c0418dd9 44
f60c8b34
JG
45 if (graph->components) {
46 g_ptr_array_free(graph->components, TRUE);
47 }
c0418dd9
JG
48 if (graph->connections) {
49 g_ptr_array_free(graph->connections, TRUE);
50 }
f60c8b34
JG
51 if (graph->sinks_to_consume) {
52 g_queue_free(graph->sinks_to_consume);
c0418dd9
JG
53 }
54 g_free(graph);
55}
56
f60c8b34 57struct bt_graph *bt_graph_create(void)
c0418dd9 58{
f60c8b34 59 struct bt_graph *graph;
c0418dd9 60
f60c8b34 61 graph = g_new0(struct bt_graph, 1);
c0418dd9
JG
62 if (!graph) {
63 goto end;
64 }
65
f60c8b34 66 bt_object_init(graph, bt_graph_destroy);
c0418dd9 67
f60c8b34 68 graph->connections = g_ptr_array_new_with_free_func(bt_object_release);
c0418dd9
JG
69 if (!graph->connections) {
70 goto error;
71 }
f60c8b34
JG
72 graph->components = g_ptr_array_new_with_free_func(bt_object_release);
73 if (!graph->components) {
74 goto error;
75 }
76 graph->sinks_to_consume = g_queue_new();
77 if (!graph->sinks_to_consume) {
c0418dd9
JG
78 goto error;
79 }
80end:
81 return graph;
82error:
83 BT_PUT(graph);
84 goto end;
85}
86
f60c8b34
JG
87struct bt_connection *bt_graph_connect(struct bt_graph *graph,
88 struct bt_port *upstream_port,
89 struct bt_port *downstream_port)
90{
91 struct bt_connection *connection = NULL;
92 struct bt_graph *upstream_graph = NULL;
93 struct bt_graph *downstream_graph = NULL;
94 struct bt_component *upstream_component = NULL;
95 struct bt_component *downstream_component = NULL;
96 enum bt_component_status component_status;
ffeb0eed
JG
97 bool upstream_was_already_in_graph;
98 bool downstream_was_already_in_graph;
3eeacbb9
JG
99 int components_to_remove = 0;
100 int i;
f60c8b34
JG
101
102 if (!graph || !upstream_port || !downstream_port) {
103 goto end;
104 }
105
106 if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) {
107 goto end;
108 }
109 if (bt_port_get_type(downstream_port) != BT_PORT_TYPE_INPUT) {
110 goto end;
111 }
112
113 /* Ensure the components are not already part of another graph. */
114 upstream_component = bt_port_get_component(upstream_port);
115 assert(upstream_component);
116 upstream_graph = bt_component_get_graph(upstream_component);
117 if (upstream_graph && (graph != upstream_graph)) {
118 fprintf(stderr, "Upstream component is already part of another graph\n");
119 goto error;
120 }
ffeb0eed 121 upstream_was_already_in_graph = (graph == upstream_graph);
f60c8b34
JG
122
123 downstream_component = bt_port_get_component(downstream_port);
124 assert(downstream_component);
125 downstream_graph = bt_component_get_graph(downstream_component);
126 if (downstream_graph && (graph != downstream_graph)) {
127 fprintf(stderr, "Downstream component is already part of another graph\n");
128 goto error;
129 }
ffeb0eed 130 downstream_was_already_in_graph = (graph == downstream_graph);
f60c8b34
JG
131
132 connection = bt_connection_create(graph, upstream_port,
133 downstream_port);
134 if (!connection) {
135 goto error;
136 }
137
138 /*
139 * Ownership of up/downstream_component and of the connection object is
140 * transferred to the graph.
141 */
142 g_ptr_array_add(graph->connections, connection);
ffeb0eed
JG
143
144 if (!upstream_was_already_in_graph) {
145 g_ptr_array_add(graph->components, upstream_component);
146 bt_component_set_graph(upstream_component, graph);
147 }
148 if (!downstream_was_already_in_graph) {
149 g_ptr_array_add(graph->components, downstream_component);
150 bt_component_set_graph(downstream_component, graph);
151 if (bt_component_get_class_type(downstream_component) ==
152 BT_COMPONENT_CLASS_TYPE_SINK) {
153 g_queue_push_tail(graph->sinks_to_consume,
154 downstream_component);
155 }
f60c8b34
JG
156 }
157
158 /*
159 * The graph is now the parent of these components which garantees their
160 * existence for the duration of the graph's lifetime.
161 */
f60c8b34
JG
162
163 /*
164 * The components and connection are added to the graph before invoking
165 * the new_connection method in order to make them visible to the
166 * components during the method's invocation.
167 */
168 component_status = bt_component_new_connection(upstream_component,
169 upstream_port, connection);
170 if (component_status != BT_COMPONENT_STATUS_OK) {
3eeacbb9 171 goto error_rollback;
f60c8b34
JG
172 }
173 component_status = bt_component_new_connection(downstream_component,
174 downstream_port, connection);
175 if (component_status != BT_COMPONENT_STATUS_OK) {
3eeacbb9 176 goto error_rollback;
f60c8b34
JG
177 }
178end:
179 bt_put(upstream_graph);
180 bt_put(downstream_graph);
3eeacbb9
JG
181 bt_put(upstream_component);
182 bt_put(downstream_component);
f60c8b34 183 return connection;
3eeacbb9
JG
184error_rollback:
185 /*
186 * Remove newly-added components from the graph, being careful
187 * not to remove a component that was already present in the graph
188 * and is connected to other components.
189 */
190 components_to_remove += upstream_was_already_in_graph ? 0 : 1;
191 components_to_remove += downstream_was_already_in_graph ? 0 : 1;
192
193 if (!downstream_was_already_in_graph) {
f60c8b34
JG
194 if (bt_component_get_class_type(downstream_component) ==
195 BT_COMPONENT_CLASS_TYPE_SINK) {
196 g_queue_pop_tail(graph->sinks_to_consume);
197 }
3eeacbb9
JG
198 }
199 /* Remove newly created connection. */
200 g_ptr_array_set_size(graph->connections,
201 graph->connections->len - 1);
202
203 /*
204 * Remove newly added components.
205 *
206 * Note that this is a tricky situation. The graph, being the parent
207 * of the components, does not hold a reference to them. Normally,
208 * components are destroyed right away when the graph is released since
209 * the graph, being their parent, bounds their lifetime
210 * (see doc/ref-counting.md).
211 *
212 * In this particular case, we must take a number of steps:
213 * 1) unset the components' parent to rollback the initial state of
214 * the components being connected.
215 * Note that the reference taken by the component on its graph is
216 * released by the set_parent call.
217 * 2) set the pointer in the components array to NULL so that the
218 * destruction function called on the array's resize in invoked on
219 * NULL (no effect),
220 *
221 * NOTE: Point #1 assumes that *something* holds a reference to both
222 * components being connected. The fact that a reference is being
223 * held to a component means that it must hold a reference to its
224 * parent to prevent the parent from being destroyed (again, refer
225 * to doc/red-counting.md). This reference to a component is
226 * most likely being held *transitively* by the caller which holds
227 * a reference to both ports (a port has its component as a
228 * parent).
229 *
230 * This assumes that a graph is not connecting components by
231 * itself while not holding a reference to the ports/components
232 * being connected (i.e. "cheating" by using internal APIs).
233 */
234 for (i = 0; i < components_to_remove; i++) {
235 struct bt_component *component = g_ptr_array_index(
236 graph->components, graph->components->len - 1);
237
238 bt_component_set_graph(component, NULL);
239 g_ptr_array_index(graph->components,
240 graph->components->len - 1) = NULL;
f60c8b34 241 g_ptr_array_set_size(graph->components,
3eeacbb9 242 graph->components->len - 1);
f60c8b34 243 }
3eeacbb9
JG
244 /* NOTE: Resizing the ptr_arrays invokes the destruction of the elements. */
245 goto end;
246error:
247 BT_PUT(upstream_component);
248 BT_PUT(downstream_component);
f60c8b34
JG
249 goto end;
250}
251
252static
346df6cf
JG
253enum bt_component_status get_component_port_counts(
254 struct bt_component *component, uint64_t *input_count,
255 uint64_t *output_count)
f60c8b34 256{
346df6cf 257 enum bt_component_status ret;
f60c8b34
JG
258
259 switch (bt_component_get_class_type(component)) {
260 case BT_COMPONENT_CLASS_TYPE_SOURCE:
346df6cf
JG
261 ret = bt_component_source_get_output_port_count(component,
262 output_count);
263 if (ret != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
264 goto end;
265 }
f60c8b34
JG
266 break;
267 case BT_COMPONENT_CLASS_TYPE_FILTER:
346df6cf
JG
268 ret = bt_component_filter_get_output_port_count(component,
269 output_count);
270 if (ret != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
271 goto end;
272 }
346df6cf
JG
273 ret = bt_component_filter_get_input_port_count(component,
274 input_count);
275 if (ret != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
276 goto end;
277 }
f60c8b34
JG
278 break;
279 case BT_COMPONENT_CLASS_TYPE_SINK:
346df6cf
JG
280 ret = bt_component_sink_get_input_port_count(component,
281 input_count);
282 if (ret != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
283 goto end;
284 }
f60c8b34
JG
285 break;
286 default:
287 assert(false);
288 break;
289 }
346df6cf 290 ret = BT_COMPONENT_STATUS_OK;
f60c8b34
JG
291end:
292 return ret;
293}
294
295static
296struct bt_port *get_input_port(struct bt_component *component, int index)
c0418dd9 297{
f60c8b34
JG
298 struct bt_port *port = NULL;
299
300 switch (bt_component_get_class_type(component)) {
301 case BT_COMPONENT_CLASS_TYPE_FILTER:
302 port = bt_component_filter_get_input_port_at_index(component,
303 index);
304 break;
305 case BT_COMPONENT_CLASS_TYPE_SINK:
306 port = bt_component_sink_get_input_port_at_index(component,
307 index);
308 break;
309 default:
310 assert(false);
311 }
312 return port;
c0418dd9
JG
313}
314
f60c8b34
JG
315static
316struct bt_port *get_output_port(struct bt_component *component, int index)
c0418dd9 317{
f60c8b34
JG
318 struct bt_port *port = NULL;
319
320 switch (bt_component_get_class_type(component)) {
321 case BT_COMPONENT_CLASS_TYPE_SOURCE:
322 port = bt_component_source_get_output_port_at_index(component,
323 index);
324 break;
325 case BT_COMPONENT_CLASS_TYPE_FILTER:
326 port = bt_component_filter_get_output_port_at_index(component,
327 index);
328 break;
329 default:
330 assert(false);
331 }
332 return port;
c0418dd9
JG
333}
334
f60c8b34
JG
335enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
336 struct bt_component *origin,
c0418dd9
JG
337 struct bt_component *new_component)
338{
346df6cf
JG
339 uint64_t origin_input_port_count = 0;
340 uint64_t origin_output_port_count = 0;
341 uint64_t new_input_port_count = 0;
342 uint64_t new_output_port_count = 0;
f60c8b34
JG
343 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
344 struct bt_graph *origin_graph = NULL;
345 struct bt_graph *new_graph = NULL;
346 struct bt_port *origin_port = NULL;
347 struct bt_port *new_port = NULL;
348 struct bt_port *upstream_port = NULL;
349 struct bt_port *downstream_port = NULL;
350 struct bt_connection *origin_connection = NULL;
351 struct bt_connection *new_connection = NULL;
352 int port_index;
353
354 if (!graph || !origin || !new_component) {
355 status = BT_GRAPH_STATUS_INVALID;
356 goto end;
357 }
358
359 if (bt_component_get_class_type(origin) !=
360 bt_component_get_class_type(new_component)) {
361 status = BT_GRAPH_STATUS_INVALID;
362 goto end;
363 }
364
365 origin_graph = bt_component_get_graph(origin);
366 if (!origin_graph || (origin_graph != graph)) {
367 status = BT_GRAPH_STATUS_INVALID;
368 goto end;
369 }
370
371 new_graph = bt_component_get_graph(new_component);
372 if (new_graph) {
373 status = BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH;
374 goto end;
375 }
376
377 if (get_component_port_counts(origin, &origin_input_port_count,
346df6cf 378 &origin_output_port_count) != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
379 status = BT_GRAPH_STATUS_INVALID;
380 goto end;
381 }
382 if (get_component_port_counts(new_component, &new_input_port_count,
346df6cf 383 &new_output_port_count) != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
384 status = BT_GRAPH_STATUS_INVALID;
385 goto end;
386 }
387
388 if (origin_input_port_count != new_input_port_count ||
389 origin_output_port_count != new_output_port_count) {
390 status = BT_GRAPH_STATUS_INVALID;
391 goto end;
392 }
393
394 /* Replicate input connections. */
395 for (port_index = 0; port_index< origin_input_port_count; port_index++) {
396 uint64_t connection_count, connection_index;
397
398 origin_port = get_input_port(origin, port_index);
399 if (!origin_port) {
400 status = BT_GRAPH_STATUS_ERROR;
401 goto error_disconnect;
402 }
403 new_port = get_input_port(new_component, port_index);
404 if (!new_port) {
405 status = BT_GRAPH_STATUS_ERROR;
406 goto error_disconnect;
407 }
408
409 if (bt_port_get_connection_count(origin_port, &connection_count) !=
410 BT_PORT_STATUS_OK) {
411 status = BT_GRAPH_STATUS_ERROR;
412 goto error_disconnect;
413 }
414
415 for (connection_index = 0; connection_index < connection_count;
416 connection_index++) {
417 origin_connection = bt_port_get_connection(origin_port,
418 connection_index);
419 if (!origin_connection) {
420 goto error_disconnect;
421 }
422
423 upstream_port = bt_connection_get_output_port(
424 origin_connection);
425 if (!upstream_port) {
426 goto error_disconnect;
427 }
428
429 new_connection = bt_graph_connect(graph, upstream_port,
430 new_port);
431 if (!new_connection) {
432 goto error_disconnect;
433 }
434
435 BT_PUT(upstream_port);
436 BT_PUT(origin_connection);
437 BT_PUT(new_connection);
438 }
439 BT_PUT(origin_port);
440 BT_PUT(new_port);
441 }
442
443 /* Replicate output connections. */
444 for (port_index = 0; port_index< origin_output_port_count; port_index++) {
445 uint64_t connection_count, connection_index;
446
447 origin_port = get_output_port(origin, port_index);
448 if (!origin_port) {
449 status = BT_GRAPH_STATUS_ERROR;
450 goto error_disconnect;
451 }
452 new_port = get_output_port(new_component, port_index);
453 if (!new_port) {
454 status = BT_GRAPH_STATUS_ERROR;
455 goto error_disconnect;
456 }
457
458 if (bt_port_get_connection_count(origin_port, &connection_count) !=
459 BT_PORT_STATUS_OK) {
460 status = BT_GRAPH_STATUS_ERROR;
461 goto error_disconnect;
462 }
463
464 for (connection_index = 0; connection_index < connection_count;
465 connection_index++) {
466 origin_connection = bt_port_get_connection(origin_port,
467 connection_index);
468 if (!origin_connection) {
469 goto error_disconnect;
470 }
471
472 downstream_port = bt_connection_get_input_port(
473 origin_connection);
474 if (!downstream_port) {
475 goto error_disconnect;
476 }
477
478 new_connection = bt_graph_connect(graph, new_port,
479 downstream_port);
480 if (!new_connection) {
481 goto error_disconnect;
482 }
483
484 BT_PUT(downstream_port);
485 BT_PUT(origin_connection);
486 BT_PUT(new_connection);
487 }
488 BT_PUT(origin_port);
489 BT_PUT(new_port);
490 }
491end:
492 bt_put(origin_graph);
493 bt_put(new_graph);
494 bt_put(origin_port);
495 bt_put(new_port);
496 bt_put(upstream_port);
497 bt_put(downstream_port);
498 bt_put(origin_connection);
499 bt_put(new_connection);
500 return status;
501error_disconnect:
502 /* Destroy all connections of the new component. */
503 /* FIXME. */
504 goto end;
c0418dd9
JG
505}
506
f60c8b34 507enum bt_component_status bt_graph_consume(struct bt_graph *graph)
c0418dd9 508{
f60c8b34
JG
509 struct bt_component *sink;
510 enum bt_component_status status;
511 GList *current_node;
512
513 if (!graph) {
514 status = BT_COMPONENT_STATUS_INVALID;
515 goto end;
516 }
517
518 if (g_queue_is_empty(graph->sinks_to_consume)) {
519 status = BT_COMPONENT_STATUS_END;
520 goto end;
521 }
522
523 current_node = g_queue_pop_head_link(graph->sinks_to_consume);
524 sink = current_node->data;
525 status = bt_component_sink_consume(sink);
526 if (status != BT_COMPONENT_STATUS_END) {
527 g_queue_push_tail_link(graph->sinks_to_consume, current_node);
528 goto end;
529 }
530
531 /* End reached, the node is not added back to the queue and free'd. */
532 g_queue_delete_link(graph->sinks_to_consume, current_node);
533
534 /* Don't forward an END status if there are sinks left to consume. */
535 if (!g_queue_is_empty(graph->sinks_to_consume)) {
536 status = BT_GRAPH_STATUS_OK;
537 goto end;
538 }
539end:
540 return status;
c0418dd9
JG
541}
542
f60c8b34
JG
543enum bt_graph_status bt_graph_run(struct bt_graph *graph,
544 enum bt_component_status *_component_status)
545{
546 enum bt_component_status component_status;
547 enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK;
548
549 if (!graph) {
550 graph_status = BT_GRAPH_STATUS_INVALID;
551 goto error;
552 }
553
554 do {
555 component_status = bt_graph_consume(graph);
556 if (component_status == BT_COMPONENT_STATUS_AGAIN) {
557 /*
558 * If AGAIN is received and there are multiple sinks,
559 * go ahead and consume from the next sink.
560 *
561 * However, in the case where a single sink is left,
562 * the caller can decide to busy-wait and call
563 * bt_graph_run continuously until the source is ready
564 * or it can decide to sleep for an arbitrary amount of
565 * time.
566 */
567 if (graph->sinks_to_consume->length > 1) {
568 component_status = BT_COMPONENT_STATUS_OK;
569 }
570 }
571 } while (component_status == BT_COMPONENT_STATUS_OK);
572
573 if (_component_status) {
574 *_component_status = component_status;
575 }
576
577 if (g_queue_is_empty(graph->sinks_to_consume)) {
578 graph_status = BT_GRAPH_STATUS_END;
579 } else if (component_status == BT_COMPONENT_STATUS_AGAIN) {
580 graph_status = BT_GRAPH_STATUS_AGAIN;
581 } else {
582 graph_status = BT_GRAPH_STATUS_ERROR;
583 }
584error:
585 return graph_status;
586}
This page took 0.050036 seconds and 4 git commands to generate.