9982f5d076210d6d825f6d703d56be95a24e0ef1
[babeltrace.git] / lib / component / graph.c
1 /*
2 * graph.c
3 *
4 * Babeltrace Plugin Component Graph
5 *
6 * Copyright 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7 *
8 * Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * SOFTWARE.
27 */
28
29 #include <babeltrace/component/component-internal.h>
30 #include <babeltrace/component/graph-internal.h>
31 #include <babeltrace/component/connection-internal.h>
32 #include <babeltrace/component/component-sink-internal.h>
33 #include <babeltrace/component/component-source.h>
34 #include <babeltrace/component/component-filter.h>
35 #include <babeltrace/component/port.h>
36 #include <babeltrace/compiler.h>
37 #include <unistd.h>
38
39 static
40 void bt_graph_destroy(struct bt_object *obj)
41 {
42 struct bt_graph *graph = container_of(obj,
43 struct bt_graph, base);
44
45 if (graph->components) {
46 g_ptr_array_free(graph->components, TRUE);
47 }
48 if (graph->connections) {
49 g_ptr_array_free(graph->connections, TRUE);
50 }
51 if (graph->sinks_to_consume) {
52 g_queue_free(graph->sinks_to_consume);
53 }
54 g_free(graph);
55 }
56
57 struct bt_graph *bt_graph_create(void)
58 {
59 struct bt_graph *graph;
60
61 graph = g_new0(struct bt_graph, 1);
62 if (!graph) {
63 goto end;
64 }
65
66 bt_object_init(graph, bt_graph_destroy);
67
68 graph->connections = g_ptr_array_new_with_free_func(bt_object_release);
69 if (!graph->connections) {
70 goto error;
71 }
72 graph->components = g_ptr_array_new_with_free_func(bt_object_release);
73 if (!graph->components) {
74 goto error;
75 }
76 graph->sinks_to_consume = g_queue_new();
77 if (!graph->sinks_to_consume) {
78 goto error;
79 }
80 end:
81 return graph;
82 error:
83 BT_PUT(graph);
84 goto end;
85 }
86
87 struct bt_connection *bt_graph_connect(struct bt_graph *graph,
88 struct bt_port *upstream_port,
89 struct bt_port *downstream_port)
90 {
91 struct bt_connection *connection = NULL;
92 struct bt_graph *upstream_graph = NULL;
93 struct bt_graph *downstream_graph = NULL;
94 struct bt_component *upstream_component = NULL;
95 struct bt_component *downstream_component = NULL;
96 enum bt_component_status component_status;
97 bool upstream_was_already_in_graph;
98 bool downstream_was_already_in_graph;
99
100 if (!graph || !upstream_port || !downstream_port) {
101 goto end;
102 }
103
104 if (bt_port_get_type(upstream_port) != BT_PORT_TYPE_OUTPUT) {
105 goto end;
106 }
107 if (bt_port_get_type(downstream_port) != BT_PORT_TYPE_INPUT) {
108 goto end;
109 }
110
111 /* Ensure the components are not already part of another graph. */
112 upstream_component = bt_port_get_component(upstream_port);
113 assert(upstream_component);
114 upstream_graph = bt_component_get_graph(upstream_component);
115 if (upstream_graph && (graph != upstream_graph)) {
116 fprintf(stderr, "Upstream component is already part of another graph\n");
117 goto error;
118 }
119 upstream_was_already_in_graph = (graph == upstream_graph);
120
121 downstream_component = bt_port_get_component(downstream_port);
122 assert(downstream_component);
123 downstream_graph = bt_component_get_graph(downstream_component);
124 if (downstream_graph && (graph != downstream_graph)) {
125 fprintf(stderr, "Downstream component is already part of another graph\n");
126 goto error;
127 }
128 downstream_was_already_in_graph = (graph == downstream_graph);
129
130 connection = bt_connection_create(graph, upstream_port,
131 downstream_port);
132 if (!connection) {
133 goto error;
134 }
135
136 /*
137 * Ownership of up/downstream_component and of the connection object is
138 * transferred to the graph.
139 */
140 g_ptr_array_add(graph->connections, connection);
141
142 if (!upstream_was_already_in_graph) {
143 g_ptr_array_add(graph->components, upstream_component);
144 bt_component_set_graph(upstream_component, graph);
145 }
146 if (!downstream_was_already_in_graph) {
147 g_ptr_array_add(graph->components, downstream_component);
148 bt_component_set_graph(downstream_component, graph);
149 if (bt_component_get_class_type(downstream_component) ==
150 BT_COMPONENT_CLASS_TYPE_SINK) {
151 g_queue_push_tail(graph->sinks_to_consume,
152 downstream_component);
153 }
154 }
155
156 /*
157 * The graph is now the parent of these components which garantees their
158 * existence for the duration of the graph's lifetime.
159 */
160
161 /*
162 * The components and connection are added to the graph before invoking
163 * the new_connection method in order to make them visible to the
164 * components during the method's invocation.
165 */
166 component_status = bt_component_new_connection(upstream_component,
167 upstream_port, connection);
168 if (component_status != BT_COMPONENT_STATUS_OK) {
169 goto error;
170 }
171 component_status = bt_component_new_connection(downstream_component,
172 downstream_port, connection);
173 if (component_status != BT_COMPONENT_STATUS_OK) {
174 goto error;
175 }
176 end:
177 bt_put(upstream_graph);
178 bt_put(downstream_graph);
179 return connection;
180 error:
181 if (components_added) {
182 if (bt_component_get_class_type(downstream_component) ==
183 BT_COMPONENT_CLASS_TYPE_SINK) {
184 g_queue_pop_tail(graph->sinks_to_consume);
185 }
186 g_ptr_array_set_size(graph->connections,
187 graph->connections->len - 1);
188 g_ptr_array_set_size(graph->components,
189 graph->components->len - 2);
190 }
191 goto end;
192 }
193
194 static
195 enum bt_component_status get_component_port_counts(
196 struct bt_component *component, uint64_t *input_count,
197 uint64_t *output_count)
198 {
199 enum bt_component_status ret;
200
201 switch (bt_component_get_class_type(component)) {
202 case BT_COMPONENT_CLASS_TYPE_SOURCE:
203 ret = bt_component_source_get_output_port_count(component,
204 output_count);
205 if (ret != BT_COMPONENT_STATUS_OK) {
206 goto end;
207 }
208 break;
209 case BT_COMPONENT_CLASS_TYPE_FILTER:
210 ret = bt_component_filter_get_output_port_count(component,
211 output_count);
212 if (ret != BT_COMPONENT_STATUS_OK) {
213 goto end;
214 }
215 ret = bt_component_filter_get_input_port_count(component,
216 input_count);
217 if (ret != BT_COMPONENT_STATUS_OK) {
218 goto end;
219 }
220 break;
221 case BT_COMPONENT_CLASS_TYPE_SINK:
222 ret = bt_component_sink_get_input_port_count(component,
223 input_count);
224 if (ret != BT_COMPONENT_STATUS_OK) {
225 goto end;
226 }
227 break;
228 default:
229 assert(false);
230 break;
231 }
232 ret = BT_COMPONENT_STATUS_OK;
233 end:
234 return ret;
235 }
236
237 static
238 struct bt_port *get_input_port(struct bt_component *component, int index)
239 {
240 struct bt_port *port = NULL;
241
242 switch (bt_component_get_class_type(component)) {
243 case BT_COMPONENT_CLASS_TYPE_FILTER:
244 port = bt_component_filter_get_input_port_at_index(component,
245 index);
246 break;
247 case BT_COMPONENT_CLASS_TYPE_SINK:
248 port = bt_component_sink_get_input_port_at_index(component,
249 index);
250 break;
251 default:
252 assert(false);
253 }
254 return port;
255 }
256
257 static
258 struct bt_port *get_output_port(struct bt_component *component, int index)
259 {
260 struct bt_port *port = NULL;
261
262 switch (bt_component_get_class_type(component)) {
263 case BT_COMPONENT_CLASS_TYPE_SOURCE:
264 port = bt_component_source_get_output_port_at_index(component,
265 index);
266 break;
267 case BT_COMPONENT_CLASS_TYPE_FILTER:
268 port = bt_component_filter_get_output_port_at_index(component,
269 index);
270 break;
271 default:
272 assert(false);
273 }
274 return port;
275 }
276
277 enum bt_graph_status bt_graph_add_component_as_sibling(struct bt_graph *graph,
278 struct bt_component *origin,
279 struct bt_component *new_component)
280 {
281 uint64_t origin_input_port_count = 0;
282 uint64_t origin_output_port_count = 0;
283 uint64_t new_input_port_count = 0;
284 uint64_t new_output_port_count = 0;
285 enum bt_graph_status status = BT_GRAPH_STATUS_OK;
286 struct bt_graph *origin_graph = NULL;
287 struct bt_graph *new_graph = NULL;
288 struct bt_port *origin_port = NULL;
289 struct bt_port *new_port = NULL;
290 struct bt_port *upstream_port = NULL;
291 struct bt_port *downstream_port = NULL;
292 struct bt_connection *origin_connection = NULL;
293 struct bt_connection *new_connection = NULL;
294 int port_index;
295
296 if (!graph || !origin || !new_component) {
297 status = BT_GRAPH_STATUS_INVALID;
298 goto end;
299 }
300
301 if (bt_component_get_class_type(origin) !=
302 bt_component_get_class_type(new_component)) {
303 status = BT_GRAPH_STATUS_INVALID;
304 goto end;
305 }
306
307 origin_graph = bt_component_get_graph(origin);
308 if (!origin_graph || (origin_graph != graph)) {
309 status = BT_GRAPH_STATUS_INVALID;
310 goto end;
311 }
312
313 new_graph = bt_component_get_graph(new_component);
314 if (new_graph) {
315 status = BT_GRAPH_STATUS_ALREADY_IN_A_GRAPH;
316 goto end;
317 }
318
319 if (get_component_port_counts(origin, &origin_input_port_count,
320 &origin_output_port_count) != BT_COMPONENT_STATUS_OK) {
321 status = BT_GRAPH_STATUS_INVALID;
322 goto end;
323 }
324 if (get_component_port_counts(new_component, &new_input_port_count,
325 &new_output_port_count) != BT_COMPONENT_STATUS_OK) {
326 status = BT_GRAPH_STATUS_INVALID;
327 goto end;
328 }
329
330 if (origin_input_port_count != new_input_port_count ||
331 origin_output_port_count != new_output_port_count) {
332 status = BT_GRAPH_STATUS_INVALID;
333 goto end;
334 }
335
336 /* Replicate input connections. */
337 for (port_index = 0; port_index< origin_input_port_count; port_index++) {
338 uint64_t connection_count, connection_index;
339
340 origin_port = get_input_port(origin, port_index);
341 if (!origin_port) {
342 status = BT_GRAPH_STATUS_ERROR;
343 goto error_disconnect;
344 }
345 new_port = get_input_port(new_component, port_index);
346 if (!new_port) {
347 status = BT_GRAPH_STATUS_ERROR;
348 goto error_disconnect;
349 }
350
351 if (bt_port_get_connection_count(origin_port, &connection_count) !=
352 BT_PORT_STATUS_OK) {
353 status = BT_GRAPH_STATUS_ERROR;
354 goto error_disconnect;
355 }
356
357 for (connection_index = 0; connection_index < connection_count;
358 connection_index++) {
359 origin_connection = bt_port_get_connection(origin_port,
360 connection_index);
361 if (!origin_connection) {
362 goto error_disconnect;
363 }
364
365 upstream_port = bt_connection_get_output_port(
366 origin_connection);
367 if (!upstream_port) {
368 goto error_disconnect;
369 }
370
371 new_connection = bt_graph_connect(graph, upstream_port,
372 new_port);
373 if (!new_connection) {
374 goto error_disconnect;
375 }
376
377 BT_PUT(upstream_port);
378 BT_PUT(origin_connection);
379 BT_PUT(new_connection);
380 }
381 BT_PUT(origin_port);
382 BT_PUT(new_port);
383 }
384
385 /* Replicate output connections. */
386 for (port_index = 0; port_index< origin_output_port_count; port_index++) {
387 uint64_t connection_count, connection_index;
388
389 origin_port = get_output_port(origin, port_index);
390 if (!origin_port) {
391 status = BT_GRAPH_STATUS_ERROR;
392 goto error_disconnect;
393 }
394 new_port = get_output_port(new_component, port_index);
395 if (!new_port) {
396 status = BT_GRAPH_STATUS_ERROR;
397 goto error_disconnect;
398 }
399
400 if (bt_port_get_connection_count(origin_port, &connection_count) !=
401 BT_PORT_STATUS_OK) {
402 status = BT_GRAPH_STATUS_ERROR;
403 goto error_disconnect;
404 }
405
406 for (connection_index = 0; connection_index < connection_count;
407 connection_index++) {
408 origin_connection = bt_port_get_connection(origin_port,
409 connection_index);
410 if (!origin_connection) {
411 goto error_disconnect;
412 }
413
414 downstream_port = bt_connection_get_input_port(
415 origin_connection);
416 if (!downstream_port) {
417 goto error_disconnect;
418 }
419
420 new_connection = bt_graph_connect(graph, new_port,
421 downstream_port);
422 if (!new_connection) {
423 goto error_disconnect;
424 }
425
426 BT_PUT(downstream_port);
427 BT_PUT(origin_connection);
428 BT_PUT(new_connection);
429 }
430 BT_PUT(origin_port);
431 BT_PUT(new_port);
432 }
433 end:
434 bt_put(origin_graph);
435 bt_put(new_graph);
436 bt_put(origin_port);
437 bt_put(new_port);
438 bt_put(upstream_port);
439 bt_put(downstream_port);
440 bt_put(origin_connection);
441 bt_put(new_connection);
442 return status;
443 error_disconnect:
444 /* Destroy all connections of the new component. */
445 /* FIXME. */
446 goto end;
447 }
448
449 enum bt_component_status bt_graph_consume(struct bt_graph *graph)
450 {
451 struct bt_component *sink;
452 enum bt_component_status status;
453 GList *current_node;
454
455 if (!graph) {
456 status = BT_COMPONENT_STATUS_INVALID;
457 goto end;
458 }
459
460 if (g_queue_is_empty(graph->sinks_to_consume)) {
461 status = BT_COMPONENT_STATUS_END;
462 goto end;
463 }
464
465 current_node = g_queue_pop_head_link(graph->sinks_to_consume);
466 sink = current_node->data;
467 status = bt_component_sink_consume(sink);
468 if (status != BT_COMPONENT_STATUS_END) {
469 g_queue_push_tail_link(graph->sinks_to_consume, current_node);
470 goto end;
471 }
472
473 /* End reached, the node is not added back to the queue and free'd. */
474 g_queue_delete_link(graph->sinks_to_consume, current_node);
475
476 /* Don't forward an END status if there are sinks left to consume. */
477 if (!g_queue_is_empty(graph->sinks_to_consume)) {
478 status = BT_GRAPH_STATUS_OK;
479 goto end;
480 }
481 end:
482 return status;
483 }
484
485 enum bt_graph_status bt_graph_run(struct bt_graph *graph,
486 enum bt_component_status *_component_status)
487 {
488 enum bt_component_status component_status;
489 enum bt_graph_status graph_status = BT_GRAPH_STATUS_OK;
490
491 if (!graph) {
492 graph_status = BT_GRAPH_STATUS_INVALID;
493 goto error;
494 }
495
496 do {
497 component_status = bt_graph_consume(graph);
498 if (component_status == BT_COMPONENT_STATUS_AGAIN) {
499 /*
500 * If AGAIN is received and there are multiple sinks,
501 * go ahead and consume from the next sink.
502 *
503 * However, in the case where a single sink is left,
504 * the caller can decide to busy-wait and call
505 * bt_graph_run continuously until the source is ready
506 * or it can decide to sleep for an arbitrary amount of
507 * time.
508 */
509 if (graph->sinks_to_consume->length > 1) {
510 component_status = BT_COMPONENT_STATUS_OK;
511 }
512 }
513 } while (component_status == BT_COMPONENT_STATUS_OK);
514
515 if (_component_status) {
516 *_component_status = component_status;
517 }
518
519 if (g_queue_is_empty(graph->sinks_to_consume)) {
520 graph_status = BT_GRAPH_STATUS_END;
521 } else if (component_status == BT_COMPONENT_STATUS_AGAIN) {
522 graph_status = BT_GRAPH_STATUS_AGAIN;
523 } else {
524 graph_status = BT_GRAPH_STATUS_ERROR;
525 }
526 error:
527 return graph_status;
528 }
This page took 0.03934 seconds and 3 git commands to generate.