Add utils.muxer component class
[babeltrace.git] / plugins / utils / muxer / muxer.c
CommitLineData
958f7d11
PP
1/*
2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23#include <babeltrace/babeltrace-internal.h>
24#include <babeltrace/ctf-ir/clock-class.h>
25#include <babeltrace/ctf-ir/event.h>
26#include <babeltrace/graph/clock-class-priority-map.h>
27#include <babeltrace/graph/component-filter.h>
28#include <babeltrace/graph/component.h>
29#include <babeltrace/graph/notification-event.h>
30#include <babeltrace/graph/notification-inactivity.h>
31#include <babeltrace/graph/notification-iterator.h>
32#include <babeltrace/graph/notification.h>
33#include <babeltrace/graph/port.h>
34#include <babeltrace/graph/private-component-filter.h>
35#include <babeltrace/graph/private-component.h>
36#include <babeltrace/graph/private-component.h>
37#include <babeltrace/graph/private-connection.h>
38#include <babeltrace/graph/private-notification-iterator.h>
39#include <babeltrace/graph/private-port.h>
40#include <plugins-common.h>
41#include <glib.h>
42#include <assert.h>
43
44struct muxer_comp {
45 /* Array of struct bt_private_notification_iterator * (weak refs) */
46 GPtrArray *muxer_notif_iters;
47
48 /* Weak ref */
49 struct bt_private_component *priv_comp;
50 unsigned int next_port_num;
51 size_t available_input_ports;
52 bool error;
53};
54
/*
 * One upstream notification iterator, along with the input port on
 * which it was created.
 */
struct muxer_upstream_notif_iter {
	/*
	 * Upstream notification iterator (owned by this). It is reset
	 * to NULL once this upstream notification iterator is done.
	 */
	struct bt_notification_iterator *notif_iter;

	/* Input port of the upstream connection (owned by this) */
	struct bt_private_port *priv_port;
};
62
63
64struct muxer_notif_iter {
65 /* Array of struct muxer_upstream_notif_iter * (owned by this) */
66 GPtrArray *muxer_upstream_notif_iters;
67
68 /* Array of struct muxer_upstream_notif_iter * (weak refs) */
69 GPtrArray *muxer_upstream_notif_iters_retry;
70
71 /* Next thing to return by the "next" method */
72 struct bt_notification_iterator_next_return next_next_return;
73 int64_t next_next_return_ts_ns;
74
75 /* Last time returned in a notification */
76 int64_t last_returned_ts_ns;
77};
78
79static
80struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
81 struct muxer_notif_iter *muxer_notif_iter,
82 struct bt_notification_iterator *notif_iter,
83 struct bt_private_port *priv_port)
84{
85 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
86 g_new0(struct muxer_upstream_notif_iter, 1);
87
88 if (!muxer_upstream_notif_iter) {
89 goto end;
90 }
91
92 muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
93 muxer_upstream_notif_iter->priv_port = bt_get(priv_port);
94 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
95 muxer_upstream_notif_iter);
96
97end:
98 return muxer_upstream_notif_iter;
99}
100
101static inline
102bool muxer_notif_iter_has_upstream_notif_iter_to_retry(
103 struct muxer_notif_iter *muxer_notif_iter)
104{
105 assert(muxer_notif_iter);
106 return muxer_notif_iter->muxer_upstream_notif_iters_retry->len > 0;
107}
108
109static
110void muxer_notif_iter_add_upstream_notif_iter_to_retry(
111 struct muxer_notif_iter *muxer_notif_iter,
112 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
113{
114 assert(muxer_notif_iter);
115 assert(muxer_upstream_notif_iter);
116 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters_retry,
117 muxer_upstream_notif_iter);
118}
119
120static
121void destroy_muxer_upstream_notif_iter(
122 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
123{
124 if (!muxer_upstream_notif_iter) {
125 return;
126 }
127
128 bt_put(muxer_upstream_notif_iter->notif_iter);
129 bt_put(muxer_upstream_notif_iter->priv_port);
130 g_free(muxer_upstream_notif_iter);
131}
132
133static
134bool muxer_notif_iter_has_upstream_notif_iter_on_port(
135 struct muxer_notif_iter *muxer_notif_iter,
136 struct bt_private_port *priv_port)
137{
138 size_t i;
139 bool exists = false;
140
141 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
142 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
143 g_ptr_array_index(
144 muxer_notif_iter->muxer_upstream_notif_iters, i);
145
146 if (muxer_upstream_notif_iter->priv_port == priv_port) {
147 exists = true;
148 goto end;
149 }
150 }
151
152end:
153 return exists;
154}
155
156static
157int ensure_available_input_port(struct bt_private_component *priv_comp)
158{
159 struct muxer_comp *muxer_comp =
160 bt_private_component_get_user_data(priv_comp);
161 int ret = 0;
162 GString *port_name = NULL;
163 void *priv_port = NULL;
164
165 assert(muxer_comp);
166
167 if (muxer_comp->available_input_ports >= 1) {
168 goto end;
169 }
170
171 port_name = g_string_new("in");
172 if (!port_name) {
173 ret = -1;
174 goto end;
175 }
176
177 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
178 priv_port = bt_private_component_filter_add_input_private_port(
179 priv_comp, port_name->str);
180 if (!priv_port) {
181 ret = -1;
182 goto end;
183 }
184
185 muxer_comp->available_input_ports++;
186 muxer_comp->next_port_num++;
187
188end:
189 if (port_name) {
190 g_string_free(port_name, TRUE);
191 }
192
193 BT_PUT(priv_port);
194 return ret;
195}
196
/*
 * Removes the default input port, then the default output port, from
 * the filter component `priv_comp` when they exist.
 *
 * Returns 0 on success, or the first non-zero value returned by
 * bt_private_port_remove_from_component().
 */
static
int remove_default_ports(struct bt_private_component *priv_comp)
{
	struct bt_private_port *default_port;
	int status = 0;

	default_port =
		bt_private_component_filter_get_default_input_private_port(
			priv_comp);
	if (default_port) {
		status = bt_private_port_remove_from_component(default_port);
	}

	bt_put(default_port);
	if (status) {
		/* Do not touch the output port if this removal failed */
		return status;
	}

	default_port =
		bt_private_component_filter_get_default_output_private_port(
			priv_comp);
	if (default_port) {
		status = bt_private_port_remove_from_component(default_port);
	}

	bt_put(default_port);
	return status;
}
226
/*
 * Adds the output port named "out" to the filter component
 * `priv_comp`.
 *
 * Returns 0 on success, or -1 if the port could not be added.
 */
static
int create_output_port(struct bt_private_component *priv_comp)
{
	void *out_port =
		bt_private_component_filter_add_output_private_port(
			priv_comp, "out");
	int status = out_port ? 0 : -1;

	/* The returned reference is not needed here */
	bt_put(out_port);
	return status;
}
242
243static
244void destroy_muxer_comp(struct muxer_comp *muxer_comp)
245{
246 if (!muxer_comp) {
247 return;
248 }
249
250 if (muxer_comp->muxer_notif_iters) {
251 g_ptr_array_free(muxer_comp->muxer_notif_iters, TRUE);
252 }
253
254 g_free(muxer_comp);
255}
256
257BT_HIDDEN
258enum bt_component_status muxer_init(
259 struct bt_private_component *priv_comp,
260 struct bt_value *params, void *init_data)
261{
262 int ret;
263 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
264 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
265
266 if (!muxer_comp) {
267 goto error;
268 }
269
270 muxer_comp->muxer_notif_iters = g_ptr_array_new();
271 if (!muxer_comp->muxer_notif_iters) {
272 goto error;
273 }
274
275 muxer_comp->priv_comp = priv_comp;
276 ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
277 assert(ret == 0);
278 ret = remove_default_ports(priv_comp);
279 if (ret) {
280 goto error;
281 }
282
283 ret = ensure_available_input_port(priv_comp);
284 if (ret) {
285 goto error;
286 }
287
288 ret = create_output_port(priv_comp);
289 if (ret) {
290 goto error;
291 }
292
293 goto end;
294
295error:
296 destroy_muxer_comp(muxer_comp);
297 ret = bt_private_component_set_user_data(priv_comp, NULL);
298 assert(ret == 0);
299 status = BT_COMPONENT_STATUS_ERROR;
300
301end:
302 return status;
303}
304
305BT_HIDDEN
306void muxer_finalize(struct bt_private_component *priv_comp)
307{
308 struct muxer_comp *muxer_comp =
309 bt_private_component_get_user_data(priv_comp);
310
311 destroy_muxer_comp(muxer_comp);
312}
313
/*
 * Creates a notification iterator on the connection of the input port
 * `priv_port`, which must be connected.
 *
 * On success, returns the new notification iterator (the caller owns
 * the reference) and sets `*ret` to 0. On failure, returns NULL and
 * sets `*ret` to -1.
 */
static
struct bt_notification_iterator *create_notif_iter_on_input_port(
		struct bt_private_port *priv_port, int *ret)
{
	struct bt_port *port = bt_port_from_private_port(priv_port);
	struct bt_notification_iterator *new_iter = NULL;
	struct bt_private_connection *conn;

	assert(ret);
	assert(port);
	assert(bt_port_is_connected(port));

	conn = bt_private_port_get_private_connection(priv_port);
	if (conn) {
		new_iter = bt_private_connection_create_notification_iterator(
			conn);
	}

	/* No connection or no iterator: both are reported as a failure */
	*ret = new_iter ? 0 : -1;
	bt_put(port);
	bt_put(conn);
	return new_iter;
}
345
346static
347int get_notif_ts_ns(struct muxer_comp *muxer_comp,
348 struct bt_notification *notif, int64_t last_returned_ts_ns,
349 int64_t *ts_ns)
350{
351 struct bt_clock_class_priority_map *cc_prio_map = NULL;
352 struct bt_ctf_clock_class *clock_class = NULL;
353 struct bt_ctf_clock_value *clock_value = NULL;
354 struct bt_ctf_event *event = NULL;
355 int ret = 0;
356
357 assert(notif);
358 assert(ts_ns);
359
360 switch (bt_notification_get_type(notif)) {
361 case BT_NOTIFICATION_TYPE_EVENT:
362 cc_prio_map =
363 bt_notification_event_get_clock_class_priority_map(
364 notif);
365 break;
366
367 case BT_NOTIFICATION_TYPE_INACTIVITY:
368 cc_prio_map =
369 bt_notification_event_get_clock_class_priority_map(
370 notif);
371 break;
372 default:
373 /*
374 * All the other notifications have a higher
375 * priority.
376 */
377 *ts_ns = last_returned_ts_ns;
378 goto end;
379 }
380
381 if (!cc_prio_map) {
382 goto error;
383 }
384
385 /*
386 * If the clock class priority map is empty, then we consider
387 * that this notification has no time. In this case it's always
388 * the youngest.
389 */
390 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map) == 0) {
391 *ts_ns = last_returned_ts_ns;
392 goto end;
393 }
394
395 clock_class =
396 bt_clock_class_priority_map_get_highest_priority_clock_class(
397 cc_prio_map);
398 if (!clock_class) {
399 goto error;
400 }
401
402 if (!bt_ctf_clock_class_get_is_absolute(clock_class)) {
403 goto error;
404 }
405
406 switch (bt_notification_get_type(notif)) {
407 case BT_NOTIFICATION_TYPE_EVENT:
408 event = bt_notification_event_get_event(notif);
409 if (!event) {
410 goto error;
411 }
412
413 clock_value = bt_ctf_event_get_clock_value(event,
414 clock_class);
415 break;
416 case BT_NOTIFICATION_TYPE_INACTIVITY:
417 clock_value = bt_notification_inactivity_get_clock_value(
418 notif, clock_class);
419 break;
420 default:
421 assert(false);
422 }
423
424 if (!clock_value) {
425 goto error;
426 }
427
428 ret = bt_ctf_clock_value_get_value_ns_from_epoch(clock_value, ts_ns);
429 if (ret) {
430 goto error;
431 }
432
433 goto end;
434
435error:
436 ret = -1;
437
438end:
439 bt_put(cc_prio_map);
440 bt_put(event);
441 bt_put(clock_class);
442 bt_put(clock_value);
443 return ret;
444}
445
446static
447enum bt_notification_iterator_status
448muxer_notif_iter_youngest_upstream_notif_iter(
449 struct muxer_comp *muxer_comp,
450 struct muxer_notif_iter *muxer_notif_iter,
451 struct muxer_upstream_notif_iter **muxer_upstream_notif_iter,
452 int64_t *ts_ns)
453{
454 size_t i;
455 int ret;
456 int64_t youngest_ts_ns = INT64_MAX;
457 enum bt_notification_iterator_status status =
458 BT_NOTIFICATION_ITERATOR_STATUS_OK;
459
460 assert(muxer_comp);
461 assert(muxer_notif_iter);
462 assert(muxer_upstream_notif_iter);
463 *muxer_upstream_notif_iter = NULL;
464
465 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
466 struct bt_notification *notif;
467 struct muxer_upstream_notif_iter *cur_muxer_upstream_notif_iter =
468 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters, i);
469 int64_t notif_ts_ns;
470
471 if (!cur_muxer_upstream_notif_iter->notif_iter) {
472 /* This upstream notification iterator is done */
473 continue;
474 }
475
476 notif = bt_notification_iterator_get_notification(
477 cur_muxer_upstream_notif_iter->notif_iter);
478 assert(notif);
479 ret = get_notif_ts_ns(muxer_comp, notif,
480 muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
481 bt_put(notif);
482 if (ret) {
483 *muxer_upstream_notif_iter = NULL;
484 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
485 goto end;
486 }
487
488 if (notif_ts_ns <= youngest_ts_ns) {
489 *muxer_upstream_notif_iter =
490 cur_muxer_upstream_notif_iter;
491 youngest_ts_ns = notif_ts_ns;
492 *ts_ns = youngest_ts_ns;
493 }
494 }
495
496 if (!*muxer_upstream_notif_iter) {
497 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
498 *ts_ns = INT64_MIN;
499 }
500
501end:
502 return status;
503}
504
505static
506int muxer_notif_iter_set_next_next_return(struct muxer_comp *muxer_comp,
507 struct muxer_notif_iter *muxer_notif_iter)
508{
509 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
510 struct bt_notification *notif = NULL;
511 enum bt_notification_iterator_status notif_iter_status;
512 int ret = 0;
513
514 if (muxer_notif_iter_has_upstream_notif_iter_to_retry(muxer_notif_iter)) {
515 /*
516 * At least one upstream notification iterator to retry:
517 * try again later.
518 */
519 muxer_notif_iter->next_next_return.status =
520 BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
521 BT_PUT(muxer_notif_iter->next_next_return.notification);
522 goto end;
523 }
524
525 /*
526 * Pick the current youngest notification and advance this
527 * upstream notification iterator.
528 */
529 notif_iter_status =
530 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
531 muxer_notif_iter, &muxer_upstream_notif_iter,
532 &muxer_notif_iter->next_next_return_ts_ns);
533 if (notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
534 /* No more active upstream notification iterator */
535 muxer_notif_iter->next_next_return.status =
536 BT_NOTIFICATION_ITERATOR_STATUS_END;
537 BT_PUT(muxer_notif_iter->next_next_return.notification);
538 goto end;
539 }
540
541 if (notif_iter_status < 0) {
542 ret = -1;
543 goto end;
544 }
545
546 assert(muxer_upstream_notif_iter);
547 notif = bt_notification_iterator_get_notification(
548 muxer_upstream_notif_iter->notif_iter);
549 assert(notif);
550 muxer_notif_iter->next_next_return.status =
551 BT_NOTIFICATION_ITERATOR_STATUS_OK;
552 BT_MOVE(muxer_notif_iter->next_next_return.notification, notif);
553 notif_iter_status = bt_notification_iterator_next(
554 muxer_upstream_notif_iter->notif_iter);
555 if (notif_iter_status < 0) {
556 ret = -1;
557 goto end;
558 }
559
560 if (notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
561 /* This upstream notification iterator is done */
562 BT_PUT(muxer_upstream_notif_iter->notif_iter);
563 goto ensure_monotonic;
564 }
565
566 assert(notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_OK ||
567 notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_AGAIN);
568
569 if (notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_AGAIN) {
570 muxer_notif_iter_add_upstream_notif_iter_to_retry(
571 muxer_notif_iter, muxer_upstream_notif_iter);
572 }
573
574ensure_monotonic:
575 /*
576 * Here we have the next "next" return value. It won't change
577 * until it is returned by the next call to our "next" method.
578 * If its time is less than the time of the last notification
579 * that our "next" method returned, then fail because the
580 * muxer's output wouldn't be monotonic.
581 */
582 if (muxer_notif_iter->next_next_return_ts_ns <
583 muxer_notif_iter->last_returned_ts_ns) {
584 ret = -1;
585 goto end;
586 }
587
588 /*
589 * We are now sure that the next "next" return value will not
590 * change until it is returned by this muxer notification
591 * iterator. It is now safe to set the last returned time
592 * to this one.
593 */
594 muxer_notif_iter->last_returned_ts_ns =
595 muxer_notif_iter->next_next_return_ts_ns;
596
597end:
598 return ret;
599}
600
601static
602int muxer_notif_iter_update_upstream_notif_iters(struct muxer_comp *muxer_comp,
603 struct muxer_notif_iter *muxer_notif_iter)
604{
605 struct bt_component *comp = NULL;
606 int ret = 0;
607 uint64_t count;
608 size_t i;
609
610 comp = bt_component_from_private_component(muxer_comp->priv_comp);
611 assert(comp);
612 ret = bt_component_filter_get_input_port_count(comp, &count);
613 assert(ret == 0);
614
615 for (i = 0; i < count; i++) {
616 struct bt_private_port *priv_port =
617 bt_private_component_filter_get_input_private_port_at_index(
618 muxer_comp->priv_comp, i);
619 struct bt_port *port;
620 struct bt_notification_iterator *upstream_notif_iter;
621 enum bt_notification_iterator_status next_status;
622 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
623
624 assert(priv_port);
625
626 if (muxer_notif_iter_has_upstream_notif_iter_on_port(
627 muxer_notif_iter, priv_port)) {
628 bt_put(priv_port);
629 continue;
630 }
631
632 port = bt_port_from_private_port(priv_port);
633
634 if (!bt_port_is_connected(port)) {
635 bt_put(port);
636 bt_put(priv_port);
637 continue;
638 }
639
640 bt_put(port);
641 upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
642 &ret);
643 if (ret) {
644 assert(!upstream_notif_iter);
645 bt_put(priv_port);
646 goto error;
647 }
648
649 next_status = bt_notification_iterator_next(
650 upstream_notif_iter);
651 if (next_status < 0) {
652 bt_put(priv_port);
653 bt_put(upstream_notif_iter);
654 ret = next_status;
655 goto error;
656 }
657
658 if (next_status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
659 /* Already the end: do not even keep it */
660 bt_put(priv_port);
661 bt_put(upstream_notif_iter);
662 continue;
663 }
664
665 assert(next_status == BT_NOTIFICATION_ITERATOR_STATUS_OK ||
666 next_status == BT_NOTIFICATION_ITERATOR_STATUS_AGAIN);
667 muxer_upstream_notif_iter =
668 muxer_notif_iter_add_upstream_notif_iter(
669 muxer_notif_iter, upstream_notif_iter,
670 priv_port);
671 if (!muxer_upstream_notif_iter) {
672 bt_put(priv_port);
673 bt_put(upstream_notif_iter);
674 goto error;
675 }
676
677 if (next_status == BT_NOTIFICATION_ITERATOR_STATUS_AGAIN) {
678 muxer_notif_iter_add_upstream_notif_iter_to_retry(
679 muxer_notif_iter, muxer_upstream_notif_iter);
680 }
681
682 bt_put(priv_port);
683 bt_put(upstream_notif_iter);
684 }
685
686 goto end;
687
688error:
689 if (ret >= 0) {
690 ret = -1;
691 }
692
693end:
694 bt_put(comp);
695 return ret;
696}
697
698static
699void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
700{
701 if (!muxer_notif_iter) {
702 return;
703 }
704
705 if (muxer_notif_iter->muxer_upstream_notif_iters) {
706 g_ptr_array_free(
707 muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
708 }
709
710 if (muxer_notif_iter->muxer_upstream_notif_iters_retry) {
711 g_ptr_array_free(
712 muxer_notif_iter->muxer_upstream_notif_iters_retry,
713 TRUE);
714 }
715
716 g_free(muxer_notif_iter);
717}
718
719BT_HIDDEN
720enum bt_notification_iterator_status muxer_notif_iter_init(
721 struct bt_private_notification_iterator *priv_notif_iter,
722 struct bt_private_port *output_priv_port)
723{
724 struct muxer_comp *muxer_comp = NULL;
725 struct muxer_notif_iter *muxer_notif_iter = NULL;
726 struct bt_private_component *priv_comp = NULL;
727 enum bt_notification_iterator_status status =
728 BT_NOTIFICATION_ITERATOR_STATUS_OK;
729 int ret;
730
731 priv_comp = bt_private_notification_iterator_get_private_component(
732 priv_notif_iter);
733 assert(priv_comp);
734 muxer_comp = bt_private_component_get_user_data(priv_comp);
735 assert(muxer_comp);
736 muxer_notif_iter = g_new0(struct muxer_notif_iter, 1);
737 if (!muxer_notif_iter) {
738 goto error;
739 }
740
741 muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
742 muxer_notif_iter->muxer_upstream_notif_iters =
743 g_ptr_array_new_with_free_func(
744 (GDestroyNotify) destroy_muxer_upstream_notif_iter);
745 if (!muxer_notif_iter->muxer_upstream_notif_iters) {
746 goto error;
747 }
748
749 muxer_notif_iter->muxer_upstream_notif_iters_retry = g_ptr_array_new();
750 if (!muxer_notif_iter->muxer_upstream_notif_iters_retry) {
751 goto error;
752 }
753
754 /*
755 * Initial upstream notification iterator update: this creates
756 * one upstream notification iterator for each connected port
757 * without an upstream notification iterator (for this muxer
758 * notification iterator).
759 *
760 * At this point the next "next" return value is not set yet.
761 */
762 ret = muxer_notif_iter_update_upstream_notif_iters(muxer_comp,
763 muxer_notif_iter);
764 if (ret) {
765 goto error;
766 }
767
768 /*
769 * Set the initial "next" return value.
770 */
771 ret = muxer_notif_iter_set_next_next_return(muxer_comp,
772 muxer_notif_iter);
773 if (ret) {
774 goto error;
775 }
776
777 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
778 muxer_notif_iter);
779 assert(ret == 0);
780 g_ptr_array_add(muxer_comp->muxer_notif_iters, muxer_notif_iter);
781 goto end;
782
783error:
784 destroy_muxer_notif_iter(muxer_notif_iter);
785 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
786 NULL);
787 assert(ret == 0);
788 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
789
790end:
791 bt_put(priv_comp);
792 return status;
793}
794
795BT_HIDDEN
796void muxer_notif_iter_finalize(
797 struct bt_private_notification_iterator *priv_notif_iter)
798{
799 struct muxer_notif_iter *muxer_notif_iter =
800 bt_private_notification_iterator_get_user_data(priv_notif_iter);
801 struct bt_private_component *priv_comp = NULL;
802 struct muxer_comp *muxer_comp = NULL;
803
804 priv_comp = bt_private_notification_iterator_get_private_component(
805 priv_notif_iter);
806 assert(priv_comp);
807 muxer_comp = bt_private_component_get_user_data(priv_comp);
808
809 if (muxer_comp) {
810 (void) g_ptr_array_remove_fast(muxer_comp->muxer_notif_iters,
811 muxer_notif_iter);
812 destroy_muxer_notif_iter(muxer_notif_iter);
813 }
814
815 bt_put(priv_comp);
816}
817
818BT_HIDDEN
819struct bt_notification_iterator_next_return muxer_notif_iter_next(
820 struct bt_private_notification_iterator *priv_notif_iter)
821{
822 struct bt_notification_iterator_next_return next_ret = {
823 .notification = NULL,
824 };
825 struct muxer_notif_iter *muxer_notif_iter =
826 bt_private_notification_iterator_get_user_data(priv_notif_iter);
827 struct bt_private_component *priv_comp = NULL;
828 struct muxer_comp *muxer_comp = NULL;
829 size_t i;
830 int ret;
831
832 assert(muxer_notif_iter);
833 priv_comp = bt_private_notification_iterator_get_private_component(
834 priv_notif_iter);
835 assert(priv_comp);
836 muxer_comp = bt_private_component_get_user_data(priv_comp);
837 assert(muxer_comp);
838
839 /* Are we in an error state set elsewhere? */
840 if (unlikely(muxer_comp->error)) {
841 goto error;
842 }
843
844 /*
845 * If we have upstream notification iterators to retry, retry
846 * them now. Each one we find which now has a notification or
847 * is in "end" state, we set it to NULL in this array. Then
848 * we remove all the NULL values from this array.
849 */
850 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters_retry->len; i++) {
851 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
852 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters_retry, i);
853 enum bt_notification_iterator_status status;
854
855 assert(muxer_upstream_notif_iter->notif_iter);
856 status = bt_notification_iterator_next(
857 muxer_upstream_notif_iter->notif_iter);
858 if (status < 0) {
859 /*
860 * Technically we have a next "next" return
861 * value which is ready for this call, but we're
862 * failing within this call, so discard this
863 * buffer.
864 */
865 goto error;
866 }
867
868 if (status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
869 /*
870 * This upstream notification iterator is done.
871 * Set it to NULL so that it's removed later.
872 */
873 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters_retry,
874 i) = NULL;
875 BT_PUT(muxer_upstream_notif_iter->notif_iter);
876 continue;
877 }
878
879 assert(status == BT_NOTIFICATION_ITERATOR_STATUS_OK ||
880 status == BT_NOTIFICATION_ITERATOR_STATUS_AGAIN);
881
882 if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
883 /*
884 * This upstream notification iterator now has.
885 * a notification. Remove it from this array.
886 */
887 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters_retry,
888 i) = NULL;
889 continue;
890 }
891 }
892
893 /*
894 * Remove NULL values from the array of upstream notification
895 * iterators to retry.
896 */
897 while (g_ptr_array_remove_fast(
898 muxer_notif_iter->muxer_upstream_notif_iters_retry, NULL));
899
900 /* Take our next "next" next return value */
901 next_ret = muxer_notif_iter->next_next_return;
902 muxer_notif_iter->next_next_return.status =
903 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
904 muxer_notif_iter->next_next_return.notification = NULL;
905
906 /* Set the next "next" return value */
907 ret = muxer_notif_iter_set_next_next_return(muxer_comp,
908 muxer_notif_iter);
909 if (ret) {
910 goto error;
911 }
912
913 goto end;
914
915error:
916 BT_PUT(next_ret.notification);
917 next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
918
919end:
920 bt_put(priv_comp);
921 return next_ret;
922}
923
924BT_HIDDEN
925void muxer_port_connected(
926 struct bt_private_component *priv_comp,
927 struct bt_private_port *self_private_port,
928 struct bt_port *other_port)
929{
930 struct bt_port *self_port =
931 bt_port_from_private_port(self_private_port);
932 struct muxer_comp *muxer_comp =
933 bt_private_component_get_user_data(priv_comp);
934 size_t i;
935 int ret;
936
937 assert(self_port);
938 assert(muxer_comp);
939
940 if (bt_port_get_type(self_port) == BT_PORT_TYPE_INPUT) {
941 int ret;
942
943 /* One less available input port */
944 muxer_comp->available_input_ports--;
945 ret = ensure_available_input_port(priv_comp);
946 if (ret) {
947 muxer_comp->error = true;
948 goto end;
949 }
950 }
951
952 for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
953 struct muxer_notif_iter *muxer_notif_iter =
954 g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
955
956 /*
957 * Here we update the list of upstream notification
958 * iterators, but we do NOT call
959 * muxer_notif_iter_set_next_next_return() because we
960 * already have a next "next" return value at this point
961 * (right after the muxer notification iterator
962 * initialization, and always after).
963 */
964 ret = muxer_notif_iter_update_upstream_notif_iters(muxer_comp,
965 muxer_notif_iter);
966 if (ret) {
967 muxer_comp->error = true;
968 goto end;
969 }
970 }
971
972end:
973 bt_put(self_port);
974}
975
976BT_HIDDEN
977void muxer_port_disconnected(struct bt_private_component *priv_comp,
978 struct bt_private_port *priv_port)
979{
980 struct bt_port *port = bt_port_from_private_port(priv_port);
981 struct muxer_comp *muxer_comp =
982 bt_private_component_get_user_data(priv_comp);
983
984 assert(port);
985 assert(muxer_comp);
986
987 if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
988 /* One more available input port */
989 muxer_comp->available_input_ports++;
990 }
991
992 bt_put(port);
993}
This page took 0.061851 seconds and 4 git commands to generate.