Allow a component to remove a port and any user to disconnect one
[babeltrace.git] / lib / component / graph.c
CommitLineData
c0418dd9 1/*
7d55361f 2 * graph.c
c0418dd9
JG
3 *
4 * Babeltrace Plugin Component Graph
5 *
f60c8b34 6 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
c0418dd9
JG
7 *
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 */
28
f60c8b34 29#include <babeltrace/component/component-internal.h>
7d55361f
JG
30#include <babeltrace/component/graph-internal.h>
31#include <babeltrace/component/connection-internal.h>
f60c8b34
JG
32#include <babeltrace/component/component-sink-internal.h>
33#include <babeltrace/component/component-source.h>
34#include <babeltrace/component/component-filter.h>
7d55361f 35#include <babeltrace/component/port.h>
c0418dd9 36#include <babeltrace/compiler.h>
f60c8b34 37#include <unistd.h>
c0418dd9 38
f60c8b34
JG
39static
40void bt_graph_destroy(struct bt_object *obj)
c0418dd9 41{
f60c8b34
JG
42 struct bt_graph *graph = container_of(obj,
43 struct bt_graph, base);
c0418dd9 44
f60c8b34
JG
45 if (graph->components) {
46 g_ptr_array_free(graph->components, TRUE);
47 }
c0418dd9
JG
48 if (graph->connections) {
49 g_ptr_array_free(graph->connections, TRUE);
50 }
f60c8b34
JG
51 if (graph->sinks_to_consume) {
52 g_queue_free(graph->sinks_to_consume);
c0418dd9
JG
53 }
54 g_free(graph);
55}
56
f60c8b34 57struct bt_graph *bt_graph_create(void)
c0418dd9 58{
f60c8b34 59 struct bt_graph *graph;
c0418dd9 60
f60c8b34 61 graph = g_new0(struct bt_graph, 1);
c0418dd9
JG
62 if (!graph) {
63 goto end;
64 }
65
f60c8b34 66 bt_object_init(graph, bt_graph_destroy);
c0418dd9 67
f60c8b34 68 graph->connections = g_ptr_array_new_with_free_func(bt_object_release);
c0418dd9
JG
69 if (!graph->connections) {
70 goto error;
71 }
f60c8b34
JG
72 graph->components = g_ptr_array_new_with_free_func(bt_object_release);
73 if (!graph->components) {
74 goto error;
75 }
76 graph->sinks_to_consume = g_queue_new();
77 if (!graph->sinks_to_consume) {
c0418dd9
JG
78 goto error;
79 }
80end:
81 return graph;
82error:
83 BT_PUT(graph);
84 goto end;
85}
86
f60c8b34
JG
87struct bt_connection *bt_graph_connect(struct bt_graph *graph,
88 struct bt_port *upstream_port,
89 struct bt_port *downstream_port)
90{
91 struct bt_connection *connection = NULL;
92 struct bt_graph *upstream_graph = NULL;
93 struct bt_graph *downstream_graph = NULL;
94 struct bt_component *upstream_component = NULL;
95 struct bt_component *downstream_component = NULL;
72b913fb 96 struct bt_connection *existing_conn = NULL;
f60c8b34 97 enum bt_component_status component_status;
ffeb0eed
JG
98 bool upstream_was_already_in_graph;
99 bool downstream_was_already_in_graph;
3eeacbb9
JG
100 int components_to_remove = 0;
101 int i;
f60c8b34
JG
102
103 if (!graph || !upstream_port || !downstream_port) {
104 goto end;
105 }
106
72b913fb 107 /* Ensure appropriate types for upstream and downstream ports. */
f60c8b34
JG
108 if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) {
109 goto end;
110 }
111 if (bt_port_get_type(downstream_port) != BT_PORT_TYPE_INPUT) {
112 goto end;
113 }
114
72b913fb
PP
115 /* Ensure that both ports are currently unconnected. */
116 existing_conn = bt_port_get_connection(upstream_port);
117 bt_put(existing_conn);
118 if (existing_conn) {
119 fprintf(stderr, "Upstream port is already connected\n");
120 goto end;
121 }
122
123 existing_conn = bt_port_get_connection(downstream_port);
124 bt_put(existing_conn);
125 if (existing_conn) {
126 fprintf(stderr, "Downstream port is already connected\n");
127 goto end;
128 }
129
130 /*
131 * Ensure that both ports are still attached to their creating
132 * component.
133 */
f60c8b34 134 upstream_component = bt_port_get_component(upstream_port);
72b913fb
PP
135 if (!upstream_component) {
136 fprintf(stderr, "Upstream port does not belong to a component\n");
137 goto end;
138 }
139
140 downstream_component = bt_port_get_component(downstream_port);
141 if (!downstream_component) {
142 fprintf(stderr, "Downstream port does not belong to a component\n");
143 goto end;
144 }
145
146 /* Ensure the components are not already part of another graph. */
f60c8b34
JG
147 upstream_graph = bt_component_get_graph(upstream_component);
148 if (upstream_graph && (graph != upstream_graph)) {
149 fprintf(stderr, "Upstream component is already part of another graph\n");
150 goto error;
151 }
ffeb0eed 152 upstream_was_already_in_graph = (graph == upstream_graph);
f60c8b34
JG
153 downstream_graph = bt_component_get_graph(downstream_component);
154 if (downstream_graph && (graph != downstream_graph)) {
155 fprintf(stderr, "Downstream component is already part of another graph\n");
156 goto error;
157 }
ffeb0eed 158 downstream_was_already_in_graph = (graph == downstream_graph);
f60c8b34
JG
159
160 connection = bt_connection_create(graph, upstream_port,
161 downstream_port);
162 if (!connection) {
163 goto error;
164 }
165
166 /*
72b913fb
PP
167 * Ownership of upstream_component/downstream_component and of
168 * the connection object is transferred to the graph.
f60c8b34
JG
169 */
170 g_ptr_array_add(graph->connections, connection);
ffeb0eed
JG
171
172 if (!upstream_was_already_in_graph) {
173 g_ptr_array_add(graph->components, upstream_component);
174 bt_component_set_graph(upstream_component, graph);
175 }
176 if (!downstream_was_already_in_graph) {
177 g_ptr_array_add(graph->components, downstream_component);
178 bt_component_set_graph(downstream_component, graph);
179 if (bt_component_get_class_type(downstream_component) ==
180 BT_COMPONENT_CLASS_TYPE_SINK) {
181 g_queue_push_tail(graph->sinks_to_consume,
182 downstream_component);
183 }
f60c8b34
JG
184 }
185
186 /*
187 * The graph is now the parent of these components which garantees their
188 * existence for the duration of the graph's lifetime.
189 */
f60c8b34
JG
190
191 /*
72b913fb
PP
192 * The components and connection are added to the graph before
193 * invoking the `accept_port_connection` method in order to make
194 * them visible to the components during the method's
195 * invocation.
f60c8b34 196 */
72b913fb
PP
197 component_status = bt_component_accept_port_connection(
198 upstream_component, upstream_port);
f60c8b34 199 if (component_status != BT_COMPONENT_STATUS_OK) {
3eeacbb9 200 goto error_rollback;
f60c8b34 201 }
72b913fb
PP
202 component_status = bt_component_accept_port_connection(
203 downstream_component, downstream_port);
f60c8b34 204 if (component_status != BT_COMPONENT_STATUS_OK) {
3eeacbb9 205 goto error_rollback;
f60c8b34
JG
206 }
207end:
208 bt_put(upstream_graph);
209 bt_put(downstream_graph);
3eeacbb9
JG
210 bt_put(upstream_component);
211 bt_put(downstream_component);
f60c8b34 212 return connection;
3eeacbb9
JG
213error_rollback:
214 /*
215 * Remove newly-added components from the graph, being careful
216 * not to remove a component that was already present in the graph
217 * and is connected to other components.
218 */
219 components_to_remove += upstream_was_already_in_graph ? 0 : 1;
220 components_to_remove += downstream_was_already_in_graph ? 0 : 1;
221
222 if (!downstream_was_already_in_graph) {
f60c8b34
JG
223 if (bt_component_get_class_type(downstream_component) ==
224 BT_COMPONENT_CLASS_TYPE_SINK) {
225 g_queue_pop_tail(graph->sinks_to_consume);
226 }
3eeacbb9
JG
227 }
228 /* Remove newly created connection. */
229 g_ptr_array_set_size(graph->connections,
230 graph->connections->len - 1);
231
232 /*
233 * Remove newly added components.
234 *
235 * Note that this is a tricky situation. The graph, being the parent
236 * of the components, does not hold a reference to them. Normally,
237 * components are destroyed right away when the graph is released since
238 * the graph, being their parent, bounds their lifetime
239 * (see doc/ref-counting.md).
240 *
241 * In this particular case, we must take a number of steps:
242 * 1) unset the components' parent to rollback the initial state of
243 * the components being connected.
244 * Note that the reference taken by the component on its graph is
245 * released by the set_parent call.
246 * 2) set the pointer in the components array to NULL so that the
247 * destruction function called on the array's resize in invoked on
248 * NULL (no effect),
249 *
250 * NOTE: Point #1 assumes that *something* holds a reference to both
251 * components being connected. The fact that a reference is being
252 * held to a component means that it must hold a reference to its
253 * parent to prevent the parent from being destroyed (again, refer
254 * to doc/red-counting.md). This reference to a component is
255 * most likely being held *transitively* by the caller which holds
256 * a reference to both ports (a port has its component as a
257 * parent).
258 *
259 * This assumes that a graph is not connecting components by
260 * itself while not holding a reference to the ports/components
261 * being connected (i.e. "cheating" by using internal APIs).
262 */
263 for (i = 0; i < components_to_remove; i++) {
264 struct bt_component *component = g_ptr_array_index(
265 graph->components, graph->components->len - 1);
266
267 bt_component_set_graph(component, NULL);
268 g_ptr_array_index(graph->components,
269 graph->components->len - 1) = NULL;
f60c8b34 270 g_ptr_array_set_size(graph->components,
3eeacbb9 271 graph->components->len - 1);
f60c8b34 272 }
3eeacbb9
JG
273 /* NOTE: Resizing the ptr_arrays invokes the destruction of the elements. */
274 goto end;
275error:
276 BT_PUT(upstream_component);
277 BT_PUT(downstream_component);
f60c8b34
JG
278 goto end;
279}
280
281static
346df6cf
JG
282enum bt_component_status get_component_port_counts(
283 struct bt_component *component, uint64_t *input_count,
284 uint64_t *output_count)
f60c8b34 285{
346df6cf 286 enum bt_component_status ret;
f60c8b34
JG
287
288 switch (bt_component_get_class_type(component)) {
289 case BT_COMPONENT_CLASS_TYPE_SOURCE:
346df6cf
JG
290 ret = bt_component_source_get_output_port_count(component,
291 output_count);
292 if (ret != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
293 goto end;
294 }
f60c8b34
JG
295 break;
296 case BT_COMPONENT_CLASS_TYPE_FILTER:
346df6cf
JG
297 ret = bt_component_filter_get_output_port_count(component,
298 output_count);
299 if (ret != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
300 goto end;
301 }
346df6cf
JG
302 ret = bt_component_filter_get_input_port_count(component,
303 input_count);
304 if (ret != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
305 goto end;
306 }
f60c8b34
JG
307 break;
308 case BT_COMPONENT_CLASS_TYPE_SINK:
346df6cf
JG
309 ret = bt_component_sink_get_input_port_count(component,
310 input_count);
311 if (ret != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
312 goto end;
313 }
f60c8b34
JG
314 break;
315 default:
316 assert(false);
317 break;
318 }
346df6cf 319 ret = BT_COMPONENT_STATUS_OK;
f60c8b34
JG
320end:
321 return ret;
322}
323
f60c8b34
JG
324enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
325 struct bt_component *origin,
c0418dd9
JG
326 struct bt_component *new_component)
327{
346df6cf
JG
328 uint64_t origin_input_port_count = 0;
329 uint64_t origin_output_port_count = 0;
330 uint64_t new_input_port_count = 0;
331 uint64_t new_output_port_count = 0;
f60c8b34
JG
332 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
333 struct bt_graph *origin_graph = NULL;
334 struct bt_graph *new_graph = NULL;
335 struct bt_port *origin_port = NULL;
336 struct bt_port *new_port = NULL;
337 struct bt_port *upstream_port = NULL;
338 struct bt_port *downstream_port = NULL;
339 struct bt_connection *origin_connection = NULL;
340 struct bt_connection *new_connection = NULL;
341 int port_index;
342
343 if (!graph || !origin || !new_component) {
344 status = BT_GRAPH_STATUS_INVALID;
345 goto end;
346 }
347
348 if (bt_component_get_class_type(origin) !=
349 bt_component_get_class_type(new_component)) {
350 status = BT_GRAPH_STATUS_INVALID;
351 goto end;
352 }
353
354 origin_graph = bt_component_get_graph(origin);
355 if (!origin_graph || (origin_graph != graph)) {
356 status = BT_GRAPH_STATUS_INVALID;
357 goto end;
358 }
359
360 new_graph = bt_component_get_graph(new_component);
361 if (new_graph) {
362 status = BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH;
363 goto end;
364 }
365
366 if (get_component_port_counts(origin, &origin_input_port_count,
346df6cf 367 &origin_output_port_count) != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
368 status = BT_GRAPH_STATUS_INVALID;
369 goto end;
370 }
371 if (get_component_port_counts(new_component, &new_input_port_count,
346df6cf 372 &new_output_port_count) != BT_COMPONENT_STATUS_OK) {
f60c8b34
JG
373 status = BT_GRAPH_STATUS_INVALID;
374 goto end;
375 }
376
377 if (origin_input_port_count != new_input_port_count ||
378 origin_output_port_count != new_output_port_count) {
379 status = BT_GRAPH_STATUS_INVALID;
380 goto end;
381 }
382
383 /* Replicate input connections. */
384 for (port_index = 0; port_index< origin_input_port_count; port_index++) {
72b913fb
PP
385 origin_port = bt_component_get_input_port_at_index(origin,
386 port_index);
f60c8b34
JG
387 if (!origin_port) {
388 status = BT_GRAPH_STATUS_ERROR;
389 goto error_disconnect;
390 }
f60c8b34 391
72b913fb
PP
392 new_port = bt_component_get_input_port_at_index(new_component,
393 port_index);
394 if (!new_port) {
f60c8b34
JG
395 status = BT_GRAPH_STATUS_ERROR;
396 goto error_disconnect;
397 }
398
72b913fb
PP
399 origin_connection = bt_port_get_connection(origin_port);
400 if (origin_connection) {
401 upstream_port = bt_connection_get_upstream_port(
402 origin_connection);
f60c8b34
JG
403 if (!upstream_port) {
404 goto error_disconnect;
405 }
406
407 new_connection = bt_graph_connect(graph, upstream_port,
408 new_port);
409 if (!new_connection) {
410 goto error_disconnect;
411 }
f60c8b34 412 }
72b913fb
PP
413
414 BT_PUT(upstream_port);
415 BT_PUT(origin_connection);
416 BT_PUT(new_connection);
f60c8b34
JG
417 BT_PUT(origin_port);
418 BT_PUT(new_port);
419 }
420
421 /* Replicate output connections. */
72b913fb
PP
422 for (port_index = 0; port_index < origin_output_port_count; port_index++) {
423 origin_port = bt_component_get_output_port_at_index(origin,
424 port_index);
f60c8b34
JG
425 if (!origin_port) {
426 status = BT_GRAPH_STATUS_ERROR;
427 goto error_disconnect;
428 }
72b913fb
PP
429 new_port = bt_component_get_output_port_at_index(new_component,
430 port_index);
f60c8b34
JG
431 if (!new_port) {
432 status = BT_GRAPH_STATUS_ERROR;
433 goto error_disconnect;
434 }
435
72b913fb
PP
436 origin_connection = bt_port_get_connection(origin_port);
437 if (origin_connection) {
438 downstream_port = bt_connection_get_downstream_port(
f60c8b34
JG
439 origin_connection);
440 if (!downstream_port) {
441 goto error_disconnect;
442 }
443
444 new_connection = bt_graph_connect(graph, new_port,
445 downstream_port);
446 if (!new_connection) {
447 goto error_disconnect;
448 }
f60c8b34 449 }
72b913fb
PP
450
451 BT_PUT(downstream_port);
452 BT_PUT(origin_connection);
453 BT_PUT(new_connection);
f60c8b34
JG
454 BT_PUT(origin_port);
455 BT_PUT(new_port);
456 }
457end:
458 bt_put(origin_graph);
459 bt_put(new_graph);
460 bt_put(origin_port);
461 bt_put(new_port);
462 bt_put(upstream_port);
463 bt_put(downstream_port);
464 bt_put(origin_connection);
465 bt_put(new_connection);
466 return status;
467error_disconnect:
468 /* Destroy all connections of the new component. */
469 /* FIXME. */
470 goto end;
c0418dd9
JG
471}
472
72b913fb 473enum bt_graph_status bt_graph_consume(struct bt_graph *graph)
c0418dd9 474{
f60c8b34 475 struct bt_component *sink;
72b913fb
PP
476 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
477 enum bt_component_status comp_status;
f60c8b34
JG
478 GList *current_node;
479
480 if (!graph) {
72b913fb 481 status = BT_GRAPH_STATUS_INVALID;
f60c8b34
JG
482 goto end;
483 }
484
485 if (g_queue_is_empty(graph->sinks_to_consume)) {
72b913fb 486 status = BT_GRAPH_STATUS_END;
f60c8b34
JG
487 goto end;
488 }
489
490 current_node = g_queue_pop_head_link(graph->sinks_to_consume);
491 sink = current_node->data;
72b913fb
PP
492 comp_status = bt_component_sink_consume(sink);
493 switch (comp_status) {
494 case BT_COMPONENT_STATUS_OK:
495 break;
496 case BT_COMPONENT_STATUS_END:
497 status = BT_GRAPH_STATUS_END;
498 break;
499 case BT_COMPONENT_STATUS_AGAIN:
500 status = BT_GRAPH_STATUS_AGAIN;
501 break;
502 case BT_COMPONENT_STATUS_INVALID:
503 status = BT_GRAPH_STATUS_INVALID;
504 break;
505 default:
506 status = BT_GRAPH_STATUS_ERROR;
507 break;
508 }
509
510 if (status != BT_GRAPH_STATUS_END) {
f60c8b34
JG
511 g_queue_push_tail_link(graph->sinks_to_consume, current_node);
512 goto end;
513 }
514
515 /* End reached, the node is not added back to the queue and free'd. */
516 g_queue_delete_link(graph->sinks_to_consume, current_node);
517
518 /* Don't forward an END status if there are sinks left to consume. */
519 if (!g_queue_is_empty(graph->sinks_to_consume)) {
520 status = BT_GRAPH_STATUS_OK;
521 goto end;
522 }
523end:
524 return status;
c0418dd9
JG
525}
526
72b913fb 527enum bt_graph_status bt_graph_run(struct bt_graph *graph)
f60c8b34 528{
72b913fb 529 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
f60c8b34
JG
530
531 if (!graph) {
72b913fb 532 status = BT_GRAPH_STATUS_INVALID;
f60c8b34
JG
533 goto error;
534 }
535
536 do {
72b913fb
PP
537 status = bt_graph_consume(graph);
538 if (status == BT_GRAPH_STATUS_AGAIN) {
f60c8b34
JG
539 /*
540 * If AGAIN is received and there are multiple sinks,
541 * go ahead and consume from the next sink.
542 *
543 * However, in the case where a single sink is left,
544 * the caller can decide to busy-wait and call
545 * bt_graph_run continuously until the source is ready
546 * or it can decide to sleep for an arbitrary amount of
547 * time.
548 */
549 if (graph->sinks_to_consume->length > 1) {
72b913fb 550 status = BT_GRAPH_STATUS_OK;
f60c8b34
JG
551 }
552 }
72b913fb 553 } while (status == BT_GRAPH_STATUS_OK);
f60c8b34
JG
554
555 if (g_queue_is_empty(graph->sinks_to_consume)) {
72b913fb 556 status = BT_GRAPH_STATUS_END;
f60c8b34
JG
557 }
558error:
72b913fb 559 return status;
f60c8b34 560}
This page took 0.050019 seconds and 4 git commands to generate.