Make bt_graph_connect_ports() return a status code
[babeltrace.git] / plugins / utils / muxer / muxer.c
CommitLineData
958f7d11
PP
1/*
2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23#include <babeltrace/babeltrace-internal.h>
24#include <babeltrace/ctf-ir/clock-class.h>
25#include <babeltrace/ctf-ir/event.h>
26#include <babeltrace/graph/clock-class-priority-map.h>
27#include <babeltrace/graph/component-filter.h>
28#include <babeltrace/graph/component.h>
29#include <babeltrace/graph/notification-event.h>
30#include <babeltrace/graph/notification-inactivity.h>
31#include <babeltrace/graph/notification-iterator.h>
32#include <babeltrace/graph/notification.h>
33#include <babeltrace/graph/port.h>
34#include <babeltrace/graph/private-component-filter.h>
35#include <babeltrace/graph/private-component.h>
36#include <babeltrace/graph/private-component.h>
37#include <babeltrace/graph/private-connection.h>
38#include <babeltrace/graph/private-notification-iterator.h>
39#include <babeltrace/graph/private-port.h>
40#include <plugins-common.h>
41#include <glib.h>
c55a9f58 42#include <stdbool.h>
958f7d11 43#include <assert.h>
0fbb9a9f 44#include <stdlib.h>
958f7d11 45
c3acd5f3 46#define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes"
65ee897d 47
958f7d11
PP
48struct muxer_comp {
49 /* Array of struct bt_private_notification_iterator * (weak refs) */
50 GPtrArray *muxer_notif_iters;
51
52 /* Weak ref */
53 struct bt_private_component *priv_comp;
54 unsigned int next_port_num;
55 size_t available_input_ports;
56 bool error;
a09c6b95 57 bool initializing_muxer_notif_iter;
65ee897d 58 bool ignore_absolute;
958f7d11
PP
59};
60
61struct muxer_upstream_notif_iter {
ab11110e 62 /* Owned by this, NULL if ended */
958f7d11
PP
63 struct bt_notification_iterator *notif_iter;
64
089717de
PP
65 /*
66 * This flag is true if the upstream notification iterator's
67 * current notification must be considered for the multiplexing
68 * operations. If the upstream iterator returns
69 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN, then this object
70 * is considered invalid, because its current notification is
71 * still the previous one, but we already took it into account.
72 *
73 * The value of this flag is not important if notif_iter above
74 * is NULL (which means the upstream iterator is finished).
75 */
76 bool is_valid;
958f7d11
PP
77};
78
958f7d11 79struct muxer_notif_iter {
ab11110e
PP
80 /*
81 * Array of struct muxer_upstream_notif_iter * (owned by this).
82 *
83 * NOTE: This array is searched in linearly to find the youngest
84 * current notification. Keep this until benchmarks confirm that
85 * another data structure is faster than this for our typical
86 * use cases.
87 */
958f7d11
PP
88 GPtrArray *muxer_upstream_notif_iters;
89
ab11110e 90 /*
06a2cb0d 91 * List of "recently" connected input ports (weak) to
ab11110e
PP
92 * handle by this muxer notification iterator.
93 * muxer_port_connected() adds entries to this list, and the
94 * entries are removed when a notification iterator is created
95 * on the port's connection and put into
96 * muxer_upstream_notif_iters above by
97 * muxer_notif_iter_handle_newly_connected_ports().
98 */
99 GList *newly_connected_priv_ports;
958f7d11
PP
100
101 /* Next thing to return by the "next" method */
102 struct bt_notification_iterator_next_return next_next_return;
958f7d11
PP
103
104 /* Last time returned in a notification */
105 int64_t last_returned_ts_ns;
106};
107
ab11110e
PP
108static
109void destroy_muxer_upstream_notif_iter(
110 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
111{
112 if (!muxer_upstream_notif_iter) {
113 return;
114 }
115
116 bt_put(muxer_upstream_notif_iter->notif_iter);
ab11110e
PP
117 g_free(muxer_upstream_notif_iter);
118}
119
958f7d11
PP
120static
121struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
122 struct muxer_notif_iter *muxer_notif_iter,
123 struct bt_notification_iterator *notif_iter,
124 struct bt_private_port *priv_port)
125{
126 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
127 g_new0(struct muxer_upstream_notif_iter, 1);
128
129 if (!muxer_upstream_notif_iter) {
130 goto end;
131 }
132
133 muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
089717de 134 muxer_upstream_notif_iter->is_valid = false;
958f7d11
PP
135 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
136 muxer_upstream_notif_iter);
137
138end:
139 return muxer_upstream_notif_iter;
140}
141
958f7d11
PP
142static
143int ensure_available_input_port(struct bt_private_component *priv_comp)
144{
145 struct muxer_comp *muxer_comp =
146 bt_private_component_get_user_data(priv_comp);
147 int ret = 0;
148 GString *port_name = NULL;
149 void *priv_port = NULL;
150
151 assert(muxer_comp);
152
153 if (muxer_comp->available_input_ports >= 1) {
154 goto end;
155 }
156
157 port_name = g_string_new("in");
158 if (!port_name) {
159 ret = -1;
160 goto end;
161 }
162
163 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
164 priv_port = bt_private_component_filter_add_input_private_port(
3e9b0023 165 priv_comp, port_name->str, NULL);
958f7d11
PP
166 if (!priv_port) {
167 ret = -1;
168 goto end;
169 }
170
171 muxer_comp->available_input_ports++;
172 muxer_comp->next_port_num++;
173
174end:
175 if (port_name) {
176 g_string_free(port_name, TRUE);
177 }
178
06a2cb0d 179 bt_put(priv_port);
958f7d11
PP
180 return ret;
181}
182
958f7d11
PP
183static
184int create_output_port(struct bt_private_component *priv_comp)
185{
186 void *priv_port;
187 int ret = 0;
188
189 priv_port = bt_private_component_filter_add_output_private_port(
3e9b0023 190 priv_comp, "out", NULL);
958f7d11
PP
191 if (!priv_port) {
192 ret = -1;
193 }
194
195 bt_put(priv_port);
196 return ret;
197}
198
199static
200void destroy_muxer_comp(struct muxer_comp *muxer_comp)
201{
202 if (!muxer_comp) {
203 return;
204 }
205
206 if (muxer_comp->muxer_notif_iters) {
207 g_ptr_array_free(muxer_comp->muxer_notif_iters, TRUE);
208 }
209
210 g_free(muxer_comp);
211}
212
65ee897d
PP
213static
214struct bt_value *get_default_params(void)
215{
216 struct bt_value *params;
217 int ret;
218
219 params = bt_value_map_create();
220 if (!params) {
221 goto error;
222 }
223
c3acd5f3
JG
224 ret = bt_value_map_insert_bool(params,
225 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, false);
65ee897d
PP
226 if (ret) {
227 goto error;
228 }
229
230 goto end;
231
232error:
233 BT_PUT(params);
234
235end:
236 return params;
237}
238
239static
240int configure_muxer_comp(struct muxer_comp *muxer_comp, struct bt_value *params)
241{
242 struct bt_value *default_params = NULL;
243 struct bt_value *real_params = NULL;
244 struct bt_value *ignore_absolute = NULL;
245 int ret = 0;
c55a9f58 246 bt_bool bool_val;
65ee897d
PP
247
248 default_params = get_default_params();
249 if (!default_params) {
250 goto error;
251 }
252
253 real_params = bt_value_map_extend(default_params, params);
254 if (!real_params) {
255 goto error;
256 }
257
258 ignore_absolute = bt_value_map_get(real_params,
c3acd5f3 259 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME);
65ee897d
PP
260 if (!bt_value_is_bool(ignore_absolute)) {
261 goto error;
262 }
263
c55a9f58 264 if (bt_value_bool_get(ignore_absolute, &bool_val)) {
65ee897d
PP
265 goto error;
266 }
267
c55a9f58
PP
268 muxer_comp->ignore_absolute = (bool) bool_val;
269
65ee897d
PP
270 goto end;
271
272error:
273 ret = -1;
274
275end:
276 bt_put(default_params);
277 bt_put(real_params);
278 bt_put(ignore_absolute);
279 return ret;
280}
281
958f7d11
PP
282BT_HIDDEN
283enum bt_component_status muxer_init(
284 struct bt_private_component *priv_comp,
285 struct bt_value *params, void *init_data)
286{
287 int ret;
288 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
289 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
290
291 if (!muxer_comp) {
292 goto error;
293 }
294
65ee897d
PP
295 ret = configure_muxer_comp(muxer_comp, params);
296 if (ret) {
297 goto error;
298 }
299
958f7d11
PP
300 muxer_comp->muxer_notif_iters = g_ptr_array_new();
301 if (!muxer_comp->muxer_notif_iters) {
302 goto error;
303 }
304
305 muxer_comp->priv_comp = priv_comp;
306 ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
307 assert(ret == 0);
958f7d11
PP
308 ret = ensure_available_input_port(priv_comp);
309 if (ret) {
310 goto error;
311 }
312
313 ret = create_output_port(priv_comp);
314 if (ret) {
315 goto error;
316 }
317
318 goto end;
319
320error:
321 destroy_muxer_comp(muxer_comp);
322 ret = bt_private_component_set_user_data(priv_comp, NULL);
323 assert(ret == 0);
324 status = BT_COMPONENT_STATUS_ERROR;
325
326end:
327 return status;
328}
329
330BT_HIDDEN
331void muxer_finalize(struct bt_private_component *priv_comp)
332{
333 struct muxer_comp *muxer_comp =
334 bt_private_component_get_user_data(priv_comp);
335
336 destroy_muxer_comp(muxer_comp);
337}
338
339static
340struct bt_notification_iterator *create_notif_iter_on_input_port(
341 struct bt_private_port *priv_port, int *ret)
342{
343 struct bt_port *port = bt_port_from_private_port(priv_port);
344 struct bt_notification_iterator *notif_iter = NULL;
345 struct bt_private_connection *priv_conn = NULL;
346
347 assert(ret);
348 *ret = 0;
349 assert(port);
350
351 assert(bt_port_is_connected(port));
352 priv_conn = bt_private_port_get_private_connection(priv_port);
353 if (!priv_conn) {
354 *ret = -1;
355 goto end;
356 }
357
ab11110e
PP
358 // TODO: Advance the iterator to >= the time of the latest
359 // returned notification by the muxer notification
360 // iterator which creates it.
958f7d11 361 notif_iter = bt_private_connection_create_notification_iterator(
fa054faf 362 priv_conn, NULL);
958f7d11
PP
363 if (!notif_iter) {
364 *ret = -1;
365 goto end;
366 }
367
368end:
369 bt_put(port);
370 bt_put(priv_conn);
371 return notif_iter;
372}
373
ab11110e 374static
089717de 375enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
ab11110e
PP
376 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
377{
089717de 378 enum bt_notification_iterator_status status;
ab11110e 379
089717de 380 status = bt_notification_iterator_next(
ab11110e
PP
381 muxer_upstream_notif_iter->notif_iter);
382
089717de 383 switch (status) {
ab11110e 384 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
089717de
PP
385 /*
386 * Notification iterator's current notification is valid:
387 * it must be considered for muxing operations.
388 */
389 muxer_upstream_notif_iter->is_valid = true;
ab11110e
PP
390 break;
391 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
089717de
PP
392 /*
393 * Notification iterator's current notification is not
394 * valid anymore. Return
395 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
396 * immediately.
397 */
398 muxer_upstream_notif_iter->is_valid = false;
ab11110e 399 break;
beed0223
MD
400 case BT_NOTIFICATION_ITERATOR_STATUS_END: /* Fall-through. */
401 case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED:
ab11110e
PP
402 /*
403 * Notification iterator reached the end: release it. It
404 * won't be considered again to find the youngest
405 * notification.
406 */
407 BT_PUT(muxer_upstream_notif_iter->notif_iter);
089717de
PP
408 muxer_upstream_notif_iter->is_valid = false;
409 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
410 break;
ab11110e
PP
411 default:
412 /* Error or unsupported status code */
089717de
PP
413 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
414 break;
ab11110e
PP
415 }
416
089717de 417 return status;
ab11110e
PP
418}
419
420static
089717de 421int muxer_notif_iter_handle_newly_connected_ports(
ab11110e
PP
422 struct muxer_notif_iter *muxer_notif_iter)
423{
ab11110e
PP
424 int ret = 0;
425
ab11110e
PP
426 /*
427 * Here we create one upstream notification iterator for each
089717de
PP
428 * newly connected port. We do not perform an initial "next" on
429 * those new upstream notification iterators: they are
430 * invalidated, to be validated later. The list of newly
431 * connected ports to handle here is updated by
432 * muxer_port_connected().
ab11110e
PP
433 */
434 while (true) {
435 GList *node = muxer_notif_iter->newly_connected_priv_ports;
436 struct bt_private_port *priv_port;
437 struct bt_port *port;
438 struct bt_notification_iterator *upstream_notif_iter = NULL;
439 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
440
441 if (!node) {
442 break;
443 }
444
445 priv_port = node->data;
446 port = bt_port_from_private_port(priv_port);
447 assert(port);
448
449 if (!bt_port_is_connected(port)) {
450 /*
451 * Looks like this port is not connected
452 * anymore: we can't create an upstream
089717de
PP
453 * notification iterator on its (non-existing)
454 * connection in this case.
ab11110e
PP
455 */
456 goto remove_node;
457 }
458
459 BT_PUT(port);
460 upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
461 &ret);
462 if (ret) {
463 assert(!upstream_notif_iter);
ab11110e
PP
464 goto error;
465 }
466
467 muxer_upstream_notif_iter =
468 muxer_notif_iter_add_upstream_notif_iter(
469 muxer_notif_iter, upstream_notif_iter,
470 priv_port);
ab11110e
PP
471 BT_PUT(upstream_notif_iter);
472 if (!muxer_upstream_notif_iter) {
473 goto error;
474 }
475
ab11110e
PP
476remove_node:
477 bt_put(upstream_notif_iter);
478 bt_put(port);
ab11110e
PP
479 muxer_notif_iter->newly_connected_priv_ports =
480 g_list_delete_link(
481 muxer_notif_iter->newly_connected_priv_ports,
482 node);
483 }
484
485 goto end;
486
487error:
089717de 488 if (ret >= 0) {
ab11110e
PP
489 ret = -1;
490 }
491
492end:
ab11110e
PP
493 return ret;
494}
495
958f7d11
PP
496static
497int get_notif_ts_ns(struct muxer_comp *muxer_comp,
498 struct bt_notification *notif, int64_t last_returned_ts_ns,
499 int64_t *ts_ns)
500{
501 struct bt_clock_class_priority_map *cc_prio_map = NULL;
502 struct bt_ctf_clock_class *clock_class = NULL;
503 struct bt_ctf_clock_value *clock_value = NULL;
504 struct bt_ctf_event *event = NULL;
505 int ret = 0;
506
507 assert(notif);
508 assert(ts_ns);
509
510 switch (bt_notification_get_type(notif)) {
511 case BT_NOTIFICATION_TYPE_EVENT:
512 cc_prio_map =
513 bt_notification_event_get_clock_class_priority_map(
514 notif);
515 break;
516
517 case BT_NOTIFICATION_TYPE_INACTIVITY:
518 cc_prio_map =
a09c6b95 519 bt_notification_inactivity_get_clock_class_priority_map(
958f7d11
PP
520 notif);
521 break;
522 default:
089717de 523 /* All the other notifications have a higher priority */
958f7d11
PP
524 *ts_ns = last_returned_ts_ns;
525 goto end;
526 }
527
528 if (!cc_prio_map) {
529 goto error;
530 }
531
532 /*
533 * If the clock class priority map is empty, then we consider
534 * that this notification has no time. In this case it's always
535 * the youngest.
536 */
537 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map) == 0) {
538 *ts_ns = last_returned_ts_ns;
539 goto end;
540 }
541
542 clock_class =
543 bt_clock_class_priority_map_get_highest_priority_clock_class(
544 cc_prio_map);
545 if (!clock_class) {
546 goto error;
547 }
548
65ee897d
PP
549 if (!muxer_comp->ignore_absolute &&
550 !bt_ctf_clock_class_is_absolute(clock_class)) {
958f7d11
PP
551 goto error;
552 }
553
554 switch (bt_notification_get_type(notif)) {
555 case BT_NOTIFICATION_TYPE_EVENT:
556 event = bt_notification_event_get_event(notif);
ab11110e 557 assert(event);
958f7d11
PP
558 clock_value = bt_ctf_event_get_clock_value(event,
559 clock_class);
560 break;
561 case BT_NOTIFICATION_TYPE_INACTIVITY:
562 clock_value = bt_notification_inactivity_get_clock_value(
563 notif, clock_class);
564 break;
565 default:
0fbb9a9f 566 abort();
958f7d11
PP
567 }
568
569 if (!clock_value) {
570 goto error;
571 }
572
573 ret = bt_ctf_clock_value_get_value_ns_from_epoch(clock_value, ts_ns);
574 if (ret) {
575 goto error;
576 }
577
578 goto end;
579
580error:
581 ret = -1;
582
583end:
584 bt_put(cc_prio_map);
585 bt_put(event);
586 bt_put(clock_class);
587 bt_put(clock_value);
588 return ret;
589}
590
ab11110e
PP
591/*
592 * This function finds the youngest available notification amongst the
593 * non-ended upstream notification iterators and returns the upstream
594 * notification iterator which has it, or
595 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
596 * notification.
597 *
598 * This function does NOT:
599 *
600 * * Update any upstream notification iterator.
601 * * Check for newly connected ports.
602 * * Check the upstream notification iterators to retry.
603 *
604 * On sucess, this function sets *muxer_upstream_notif_iter to the
605 * upstream notification iterator of which the current notification is
606 * the youngest, and sets *ts_ns to its time.
607 */
958f7d11
PP
608static
609enum bt_notification_iterator_status
610muxer_notif_iter_youngest_upstream_notif_iter(
611 struct muxer_comp *muxer_comp,
612 struct muxer_notif_iter *muxer_notif_iter,
613 struct muxer_upstream_notif_iter **muxer_upstream_notif_iter,
614 int64_t *ts_ns)
615{
616 size_t i;
617 int ret;
618 int64_t youngest_ts_ns = INT64_MAX;
619 enum bt_notification_iterator_status status =
620 BT_NOTIFICATION_ITERATOR_STATUS_OK;
621
622 assert(muxer_comp);
623 assert(muxer_notif_iter);
624 assert(muxer_upstream_notif_iter);
625 *muxer_upstream_notif_iter = NULL;
626
627 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
628 struct bt_notification *notif;
629 struct muxer_upstream_notif_iter *cur_muxer_upstream_notif_iter =
630 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters, i);
631 int64_t notif_ts_ns;
632
633 if (!cur_muxer_upstream_notif_iter->notif_iter) {
ab11110e 634 /* This upstream notification iterator is ended */
958f7d11
PP
635 continue;
636 }
637
089717de 638 assert(cur_muxer_upstream_notif_iter->is_valid);
958f7d11
PP
639 notif = bt_notification_iterator_get_notification(
640 cur_muxer_upstream_notif_iter->notif_iter);
641 assert(notif);
642 ret = get_notif_ts_ns(muxer_comp, notif,
643 muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
644 bt_put(notif);
645 if (ret) {
646 *muxer_upstream_notif_iter = NULL;
647 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
648 goto end;
649 }
650
651 if (notif_ts_ns <= youngest_ts_ns) {
652 *muxer_upstream_notif_iter =
653 cur_muxer_upstream_notif_iter;
654 youngest_ts_ns = notif_ts_ns;
655 *ts_ns = youngest_ts_ns;
656 }
657 }
658
659 if (!*muxer_upstream_notif_iter) {
660 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
661 *ts_ns = INT64_MIN;
662 }
663
664end:
665 return status;
666}
667
668static
089717de
PP
669enum bt_notification_iterator_status validate_muxer_upstream_notif_iter(
670 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
958f7d11 671{
089717de
PP
672 enum bt_notification_iterator_status status =
673 BT_NOTIFICATION_ITERATOR_STATUS_OK;
958f7d11 674
089717de
PP
675 if (muxer_upstream_notif_iter->is_valid ||
676 !muxer_upstream_notif_iter->notif_iter) {
ab11110e
PP
677 goto end;
678 }
679
089717de
PP
680 status = muxer_upstream_notif_iter_next(muxer_upstream_notif_iter);
681
682end:
683 return status;
684}
685
686static
687enum bt_notification_iterator_status validate_muxer_upstream_notif_iters(
688 struct muxer_notif_iter *muxer_notif_iter)
689{
690 enum bt_notification_iterator_status status =
691 BT_NOTIFICATION_ITERATOR_STATUS_OK;
692 size_t i;
693
694 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
695 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
696 g_ptr_array_index(
697 muxer_notif_iter->muxer_upstream_notif_iters,
698 i);
699
700 status = validate_muxer_upstream_notif_iter(
701 muxer_upstream_notif_iter);
702 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
703 goto end;
704 }
744ba28b
PP
705
706 /*
707 * Remove this muxer upstream notification iterator
708 * if it's ended or canceled.
709 */
710 if (!muxer_upstream_notif_iter->notif_iter) {
711 /*
712 * Use g_ptr_array_remove_fast() because the
713 * order of those elements is not important.
714 */
715 g_ptr_array_remove_index_fast(
716 muxer_notif_iter->muxer_upstream_notif_iters,
717 i);
718 i--;
719 }
089717de
PP
720 }
721
722end:
723 return status;
724}
725
726static
727struct bt_notification_iterator_next_return muxer_notif_iter_do_next(
728 struct muxer_comp *muxer_comp,
729 struct muxer_notif_iter *muxer_notif_iter)
730{
731 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter = NULL;
732 struct bt_notification_iterator_next_return next_return = {
733 .notification = NULL,
734 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
735 };
736 int64_t next_return_ts;
737
738 while (true) {
739 int ret = muxer_notif_iter_handle_newly_connected_ports(
740 muxer_notif_iter);
741
742 if (ret) {
743 next_return.status =
744 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
745 goto end;
746 }
747
748 next_return.status =
749 validate_muxer_upstream_notif_iters(muxer_notif_iter);
750 if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
751 goto end;
752 }
ab11110e 753
958f7d11 754 /*
089717de
PP
755 * At this point, we know that all the existing upstream
756 * notification iterators are valid. However the
757 * operations to validate them (during
758 * validate_muxer_upstream_notif_iters()) may have
759 * connected new ports. If no ports were connected
760 * during this operation, exit the loop.
958f7d11 761 */
089717de
PP
762 if (!muxer_notif_iter->newly_connected_priv_ports) {
763 break;
764 }
958f7d11
PP
765 }
766
089717de
PP
767 assert(!muxer_notif_iter->newly_connected_priv_ports);
768
958f7d11 769 /*
089717de
PP
770 * At this point we know that all the existing upstream
771 * notification iterators are valid. We can find the one,
772 * amongst those, of which the current notification is the
773 * youngest.
958f7d11 774 */
089717de 775 next_return.status =
958f7d11
PP
776 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
777 muxer_notif_iter, &muxer_upstream_notif_iter,
089717de
PP
778 &next_return_ts);
779 if (next_return.status < 0 ||
beed0223
MD
780 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END ||
781 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) {
958f7d11
PP
782 goto end;
783 }
784
089717de
PP
785 if (next_return_ts < muxer_notif_iter->last_returned_ts_ns) {
786 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
958f7d11
PP
787 goto end;
788 }
789
089717de
PP
790 assert(next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
791 assert(muxer_upstream_notif_iter);
792 next_return.notification = bt_notification_iterator_get_notification(
793 muxer_upstream_notif_iter->notif_iter);
794 assert(next_return.notification);
958f7d11
PP
795
796 /*
089717de
PP
797 * We invalidate the upstream notification iterator so that, the
798 * next time this function is called,
799 * validate_muxer_upstream_notif_iters() will make it valid.
958f7d11 800 */
089717de
PP
801 muxer_upstream_notif_iter->is_valid = false;
802 muxer_notif_iter->last_returned_ts_ns = next_return_ts;
958f7d11
PP
803
804end:
089717de 805 return next_return;
958f7d11
PP
806}
807
808static
ab11110e
PP
809void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
810{
ab11110e
PP
811 if (!muxer_notif_iter) {
812 return;
813 }
814
815 if (muxer_notif_iter->muxer_upstream_notif_iters) {
816 g_ptr_array_free(
817 muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
818 }
819
ab11110e
PP
820 g_list_free(muxer_notif_iter->newly_connected_priv_ports);
821 g_free(muxer_notif_iter);
822}
823
824static
825int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
958f7d11
PP
826 struct muxer_notif_iter *muxer_notif_iter)
827{
ab11110e 828 struct bt_component *comp;
544d0515
PP
829 int64_t count;
830 int64_t i;
ab11110e 831 int ret = 0;
958f7d11 832
ab11110e
PP
833 /*
834 * Add the connected input ports to this muxer notification
835 * iterator's list of newly connected ports. They will be
836 * handled by muxer_notif_iter_handle_newly_connected_ports().
837 */
958f7d11
PP
838 comp = bt_component_from_private_component(muxer_comp->priv_comp);
839 assert(comp);
544d0515
PP
840 count = bt_component_filter_get_input_port_count(comp);
841 if (count < 0) {
ab11110e
PP
842 goto end;
843 }
958f7d11
PP
844
845 for (i = 0; i < count; i++) {
846 struct bt_private_port *priv_port =
9ac68eb1 847 bt_private_component_filter_get_input_private_port_by_index(
958f7d11
PP
848 muxer_comp->priv_comp, i);
849 struct bt_port *port;
958f7d11
PP
850
851 assert(priv_port);
958f7d11 852 port = bt_port_from_private_port(priv_port);
ab11110e 853 assert(port);
958f7d11
PP
854
855 if (!bt_port_is_connected(port)) {
958f7d11 856 bt_put(priv_port);
ab11110e 857 bt_put(port);
958f7d11
PP
858 continue;
859 }
860
861 bt_put(port);
06a2cb0d 862 bt_put(priv_port);
ab11110e
PP
863 muxer_notif_iter->newly_connected_priv_ports =
864 g_list_append(
865 muxer_notif_iter->newly_connected_priv_ports,
958f7d11 866 priv_port);
ab11110e 867 if (!muxer_notif_iter->newly_connected_priv_ports) {
ab11110e
PP
868 ret = -1;
869 goto end;
958f7d11 870 }
958f7d11
PP
871 }
872
873end:
874 bt_put(comp);
875 return ret;
876}
877
958f7d11
PP
878BT_HIDDEN
879enum bt_notification_iterator_status muxer_notif_iter_init(
880 struct bt_private_notification_iterator *priv_notif_iter,
881 struct bt_private_port *output_priv_port)
882{
883 struct muxer_comp *muxer_comp = NULL;
884 struct muxer_notif_iter *muxer_notif_iter = NULL;
885 struct bt_private_component *priv_comp = NULL;
886 enum bt_notification_iterator_status status =
887 BT_NOTIFICATION_ITERATOR_STATUS_OK;
888 int ret;
889
890 priv_comp = bt_private_notification_iterator_get_private_component(
891 priv_notif_iter);
892 assert(priv_comp);
893 muxer_comp = bt_private_component_get_user_data(priv_comp);
894 assert(muxer_comp);
a09c6b95
PP
895
896 if (muxer_comp->initializing_muxer_notif_iter) {
897 /*
089717de
PP
898 * Weird, unhandled situation detected: downstream
899 * creates a muxer notification iterator while creating
900 * another muxer notification iterator (same component).
a09c6b95 901 */
958f7d11
PP
902 goto error;
903 }
904
a09c6b95
PP
905 muxer_comp->initializing_muxer_notif_iter = true;
906 muxer_notif_iter = g_new0(struct muxer_notif_iter, 1);
907 if (!muxer_notif_iter) {
ab11110e
PP
908 goto error;
909 }
910
958f7d11
PP
911 muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
912 muxer_notif_iter->muxer_upstream_notif_iters =
913 g_ptr_array_new_with_free_func(
914 (GDestroyNotify) destroy_muxer_upstream_notif_iter);
915 if (!muxer_notif_iter->muxer_upstream_notif_iters) {
916 goto error;
917 }
918
a09c6b95
PP
919 /*
920 * Add the muxer notification iterator to the component's array
921 * of muxer notification iterators here because
922 * muxer_notif_iter_init_newly_connected_ports() can cause
923 * muxer_port_connected() to be called, which adds the newly
924 * connected port to each muxer notification iterator's list of
925 * newly connected ports.
926 */
927 g_ptr_array_add(muxer_comp->muxer_notif_iters, muxer_notif_iter);
928 ret = muxer_notif_iter_init_newly_connected_ports(muxer_comp,
929 muxer_notif_iter);
930 if (ret) {
931 goto error;
932 }
933
958f7d11
PP
934 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
935 muxer_notif_iter);
936 assert(ret == 0);
958f7d11
PP
937 goto end;
938
939error:
a09c6b95
PP
940 if (g_ptr_array_index(muxer_comp->muxer_notif_iters,
941 muxer_comp->muxer_notif_iters->len - 1) == muxer_notif_iter) {
942 g_ptr_array_remove_index(muxer_comp->muxer_notif_iters,
943 muxer_comp->muxer_notif_iters->len - 1);
944 }
945
958f7d11
PP
946 destroy_muxer_notif_iter(muxer_notif_iter);
947 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
948 NULL);
949 assert(ret == 0);
950 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
951
952end:
a09c6b95 953 muxer_comp->initializing_muxer_notif_iter = false;
958f7d11
PP
954 bt_put(priv_comp);
955 return status;
956}
957
958BT_HIDDEN
959void muxer_notif_iter_finalize(
960 struct bt_private_notification_iterator *priv_notif_iter)
961{
962 struct muxer_notif_iter *muxer_notif_iter =
963 bt_private_notification_iterator_get_user_data(priv_notif_iter);
964 struct bt_private_component *priv_comp = NULL;
965 struct muxer_comp *muxer_comp = NULL;
966
967 priv_comp = bt_private_notification_iterator_get_private_component(
968 priv_notif_iter);
969 assert(priv_comp);
970 muxer_comp = bt_private_component_get_user_data(priv_comp);
971
972 if (muxer_comp) {
973 (void) g_ptr_array_remove_fast(muxer_comp->muxer_notif_iters,
974 muxer_notif_iter);
975 destroy_muxer_notif_iter(muxer_notif_iter);
976 }
977
978 bt_put(priv_comp);
979}
980
981BT_HIDDEN
982struct bt_notification_iterator_next_return muxer_notif_iter_next(
983 struct bt_private_notification_iterator *priv_notif_iter)
984{
089717de 985 struct bt_notification_iterator_next_return next_ret;
958f7d11
PP
986 struct muxer_notif_iter *muxer_notif_iter =
987 bt_private_notification_iterator_get_user_data(priv_notif_iter);
988 struct bt_private_component *priv_comp = NULL;
989 struct muxer_comp *muxer_comp = NULL;
958f7d11
PP
990
991 assert(muxer_notif_iter);
992 priv_comp = bt_private_notification_iterator_get_private_component(
993 priv_notif_iter);
994 assert(priv_comp);
995 muxer_comp = bt_private_component_get_user_data(priv_comp);
996 assert(muxer_comp);
997
998 /* Are we in an error state set elsewhere? */
999 if (unlikely(muxer_comp->error)) {
089717de
PP
1000 next_ret.notification = NULL;
1001 next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1002 goto end;
958f7d11
PP
1003 }
1004
089717de 1005 next_ret = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter);
958f7d11
PP
1006
1007end:
1008 bt_put(priv_comp);
1009 return next_ret;
1010}
1011
1012BT_HIDDEN
1013void muxer_port_connected(
1014 struct bt_private_component *priv_comp,
1015 struct bt_private_port *self_private_port,
1016 struct bt_port *other_port)
1017{
1018 struct bt_port *self_port =
1019 bt_port_from_private_port(self_private_port);
1020 struct muxer_comp *muxer_comp =
1021 bt_private_component_get_user_data(priv_comp);
1022 size_t i;
06a2cb0d 1023 int ret;
958f7d11
PP
1024
1025 assert(self_port);
1026 assert(muxer_comp);
1027
06a2cb0d
PP
1028 if (bt_port_get_type(self_port) == BT_PORT_TYPE_OUTPUT) {
1029 goto end;
958f7d11
PP
1030 }
1031
1032 for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
1033 struct muxer_notif_iter *muxer_notif_iter =
1034 g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
1035
1036 /*
ab11110e
PP
1037 * Add this port to the list of newly connected ports
1038 * for this muxer notification iterator. We append at
1039 * the end of this list while
1040 * muxer_notif_iter_handle_newly_connected_ports()
1041 * removes the nodes from the beginning.
1042 *
1043 * The list node owns the private port.
958f7d11 1044 */
ab11110e
PP
1045 muxer_notif_iter->newly_connected_priv_ports =
1046 g_list_append(
1047 muxer_notif_iter->newly_connected_priv_ports,
06a2cb0d 1048 self_private_port);
ab11110e 1049 if (!muxer_notif_iter->newly_connected_priv_ports) {
089717de 1050 /* Put reference taken by bt_get() above */
958f7d11
PP
1051 muxer_comp->error = true;
1052 goto end;
1053 }
1054 }
1055
06a2cb0d
PP
1056 /* One less available input port */
1057 muxer_comp->available_input_ports--;
1058 ret = ensure_available_input_port(priv_comp);
1059 if (ret) {
1060 /*
1061 * Only way to report an error later since this
1062 * method does not return anything.
1063 */
1064 muxer_comp->error = true;
1065 goto end;
1066 }
1067
958f7d11
PP
1068end:
1069 bt_put(self_port);
1070}
1071
1072BT_HIDDEN
1073void muxer_port_disconnected(struct bt_private_component *priv_comp,
1074 struct bt_private_port *priv_port)
1075{
1076 struct bt_port *port = bt_port_from_private_port(priv_port);
1077 struct muxer_comp *muxer_comp =
1078 bt_private_component_get_user_data(priv_comp);
1079
1080 assert(port);
1081 assert(muxer_comp);
1082
ab11110e
PP
1083 /*
1084 * There's nothing special to do when a port is disconnected
1085 * because this component deals with upstream notification
1086 * iterators which were already created thanks to connected
1087 * ports. The fact that the port is disconnected does not cancel
1088 * the upstream notification iterators created using its
089717de
PP
1089 * connection: they still exist, even if the connection is dead.
1090 * The only way to remove an upstream notification iterator is
1091 * for its "next" operation to return
1092 * BT_NOTIFICATION_ITERATOR_STATUS_END.
ab11110e 1093 */
958f7d11
PP
1094 if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
1095 /* One more available input port */
1096 muxer_comp->available_input_ports++;
1097 }
1098
1099 bt_put(port);
1100}
This page took 0.07276 seconds and 4 git commands to generate.