2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 #include <babeltrace/babeltrace-internal.h>
24 #include <babeltrace/ctf-ir/clock-class.h>
25 #include <babeltrace/ctf-ir/event.h>
26 #include <babeltrace/graph/clock-class-priority-map.h>
27 #include <babeltrace/graph/component-filter.h>
28 #include <babeltrace/graph/component.h>
29 #include <babeltrace/graph/notification-event.h>
30 #include <babeltrace/graph/notification-inactivity.h>
31 #include <babeltrace/graph/notification-iterator.h>
32 #include <babeltrace/graph/notification.h>
33 #include <babeltrace/graph/port.h>
34 #include <babeltrace/graph/private-component-filter.h>
35 #include <babeltrace/graph/private-component.h>
36 #include <babeltrace/graph/private-component.h>
37 #include <babeltrace/graph/private-connection.h>
38 #include <babeltrace/graph/private-notification-iterator.h>
39 #include <babeltrace/graph/private-port.h>
40 #include <plugins-common.h>
45 /* Array of struct bt_private_notification_iterator * (weak refs) */
46 GPtrArray
*muxer_notif_iters
;
49 struct bt_private_component
*priv_comp
;
50 unsigned int next_port_num
;
51 size_t available_input_ports
;
53 bool initializing_muxer_notif_iter
;
/* One upstream notification iterator, as tracked by a muxer iterator. */
struct muxer_upstream_notif_iter {
	/* Upstream iterator (owned by this); NULL once it has ended */
	struct bt_notification_iterator *notif_iter;

	/* Input port on which the iterator was created (owned by this) */
	struct bt_private_port *priv_port;
};
64 struct muxer_notif_iter
{
66 * Array of struct muxer_upstream_notif_iter * (owned by this).
68 * NOTE: This array is searched in linearly to find the youngest
69 * current notification. Keep this until benchmarks confirm that
70 * another data structure is faster than this for our typical
73 GPtrArray
*muxer_upstream_notif_iters
;
75 /* Array of struct muxer_upstream_notif_iter * (weak refs) */
76 GList
*muxer_upstream_notif_iters_to_retry
;
79 * List of "recently" connected input ports (owned by this) to
80 * handle by this muxer notification iterator.
81 * muxer_port_connected() adds entries to this list, and the
82 * entries are removed when a notification iterator is created
83 * on the port's connection and put into
84 * muxer_upstream_notif_iters above by
85 * muxer_notif_iter_handle_newly_connected_ports().
87 GList
*newly_connected_priv_ports
;
89 /* Next thing to return by the "next" method */
90 struct bt_notification_iterator_next_return next_next_return
;
91 int64_t next_next_return_ts_ns
;
93 /* Last time returned in a notification */
94 int64_t last_returned_ts_ns
;
98 void destroy_muxer_upstream_notif_iter(
99 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
)
101 if (!muxer_upstream_notif_iter
) {
105 bt_put(muxer_upstream_notif_iter
->notif_iter
);
106 bt_put(muxer_upstream_notif_iter
->priv_port
);
107 g_free(muxer_upstream_notif_iter
);
111 struct muxer_upstream_notif_iter
*muxer_notif_iter_add_upstream_notif_iter(
112 struct muxer_notif_iter
*muxer_notif_iter
,
113 struct bt_notification_iterator
*notif_iter
,
114 struct bt_private_port
*priv_port
)
116 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
=
117 g_new0(struct muxer_upstream_notif_iter
, 1);
119 if (!muxer_upstream_notif_iter
) {
123 muxer_upstream_notif_iter
->notif_iter
= bt_get(notif_iter
);
124 muxer_upstream_notif_iter
->priv_port
= bt_get(priv_port
);
125 g_ptr_array_add(muxer_notif_iter
->muxer_upstream_notif_iters
,
126 muxer_upstream_notif_iter
);
129 return muxer_upstream_notif_iter
;
133 bool muxer_notif_iter_has_upstream_notif_iter_to_retry(
134 struct muxer_notif_iter
*muxer_notif_iter
)
136 assert(muxer_notif_iter
);
137 return muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
!= NULL
;
141 void muxer_notif_iter_add_upstream_notif_iter_to_retry(
142 struct muxer_notif_iter
*muxer_notif_iter
,
143 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
)
145 assert(muxer_notif_iter
);
146 assert(muxer_upstream_notif_iter
);
147 muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
=
149 muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
,
150 muxer_upstream_notif_iter
);
154 int ensure_available_input_port(struct bt_private_component
*priv_comp
)
156 struct muxer_comp
*muxer_comp
=
157 bt_private_component_get_user_data(priv_comp
);
159 GString
*port_name
= NULL
;
160 void *priv_port
= NULL
;
164 if (muxer_comp
->available_input_ports
>= 1) {
168 port_name
= g_string_new("in");
174 g_string_append_printf(port_name
, "%u", muxer_comp
->next_port_num
);
175 priv_port
= bt_private_component_filter_add_input_private_port(
176 priv_comp
, port_name
->str
, NULL
);
182 muxer_comp
->available_input_ports
++;
183 muxer_comp
->next_port_num
++;
187 g_string_free(port_name
, TRUE
);
/*
 * Removes the default input and output ports of the filter component,
 * if they exist.
 *
 * Returns 0 on success, or the non-zero status of the first failing
 * removal.
 */
static
int remove_default_ports(struct bt_private_component *priv_comp)
{
	struct bt_private_port *priv_port;
	int ret = 0;

	/* Default input port */
	priv_port = bt_private_component_filter_get_default_input_private_port(
		priv_comp);
	if (priv_port) {
		ret = bt_private_port_remove_from_component(priv_port);
		if (ret) {
			goto end;
		}
	}

	bt_put(priv_port);

	/* Default output port */
	priv_port = bt_private_component_filter_get_default_output_private_port(
		priv_comp);
	if (priv_port) {
		ret = bt_private_port_remove_from_component(priv_port);
	}

end:
	bt_put(priv_port);
	return ret;
}
/*
 * Adds the single output port, named "out", to the filter component.
 *
 * Returns 0 on success, or -1 if the port could not be added.
 */
static
int create_output_port(struct bt_private_component *priv_comp)
{
	void *priv_port = bt_private_component_filter_add_output_private_port(
		priv_comp, "out", NULL);
	const int ret = priv_port ? 0 : -1;

	/* The component keeps the port alive; drop our own reference */
	bt_put(priv_port);
	return ret;
}
241 void destroy_muxer_comp(struct muxer_comp
*muxer_comp
)
247 if (muxer_comp
->muxer_notif_iters
) {
248 g_ptr_array_free(muxer_comp
->muxer_notif_iters
, TRUE
);
255 enum bt_component_status
muxer_init(
256 struct bt_private_component
*priv_comp
,
257 struct bt_value
*params
, void *init_data
)
260 enum bt_component_status status
= BT_COMPONENT_STATUS_OK
;
261 struct muxer_comp
*muxer_comp
= g_new0(struct muxer_comp
, 1);
267 muxer_comp
->muxer_notif_iters
= g_ptr_array_new();
268 if (!muxer_comp
->muxer_notif_iters
) {
272 muxer_comp
->priv_comp
= priv_comp
;
273 ret
= bt_private_component_set_user_data(priv_comp
, muxer_comp
);
275 ret
= remove_default_ports(priv_comp
);
280 ret
= ensure_available_input_port(priv_comp
);
285 ret
= create_output_port(priv_comp
);
293 destroy_muxer_comp(muxer_comp
);
294 ret
= bt_private_component_set_user_data(priv_comp
, NULL
);
296 status
= BT_COMPONENT_STATUS_ERROR
;
303 void muxer_finalize(struct bt_private_component
*priv_comp
)
305 struct muxer_comp
*muxer_comp
=
306 bt_private_component_get_user_data(priv_comp
);
308 destroy_muxer_comp(muxer_comp
);
/*
 * Creates a notification iterator on the connection of the connected
 * input port `priv_port`.
 *
 * On success, returns the new iterator (reference owned by the caller)
 * and sets *ret to 0. On failure, returns NULL and sets *ret to -1.
 */
static
struct bt_notification_iterator *create_notif_iter_on_input_port(
		struct bt_private_port *priv_port, int *ret)
{
	struct bt_port *port = bt_port_from_private_port(priv_port);
	struct bt_private_connection *priv_conn = NULL;
	struct bt_notification_iterator *notif_iter = NULL;

	assert(ret);
	assert(port);
	assert(bt_port_is_connected(port));
	*ret = -1;

	priv_conn = bt_private_port_get_private_connection(priv_port);
	if (!priv_conn) {
		goto end;
	}

	// TODO: Advance the iterator to >= the time of the latest
	//       returned notification by the muxer notification
	//       iterator which creates it.
	/*
	 * NOTE(review): confirm the argument list of this call against
	 * the connection API header in use.
	 */
	notif_iter = bt_private_connection_create_notification_iterator(
		priv_conn);
	if (notif_iter) {
		*ret = 0;
	}

end:
	bt_put(port);
	bt_put(priv_conn);
	return notif_iter;
}
347 int muxer_upstream_notif_iter_next(struct muxer_notif_iter
*muxer_notif_iter
,
348 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
)
351 enum bt_notification_iterator_status next_status
;
353 next_status
= bt_notification_iterator_next(
354 muxer_upstream_notif_iter
->notif_iter
);
356 switch (next_status
) {
357 case BT_NOTIFICATION_ITERATOR_STATUS_OK
:
358 /* Everything okay */
360 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
:
361 muxer_notif_iter_add_upstream_notif_iter_to_retry(
362 muxer_notif_iter
, muxer_upstream_notif_iter
);
364 case BT_NOTIFICATION_ITERATOR_STATUS_END
:
366 * Notification iterator reached the end: release it. It
367 * won't be considered again to find the youngest
370 BT_PUT(muxer_upstream_notif_iter
->notif_iter
);
373 /* Error or unsupported status code */
382 int muxer_notif_iter_handle_newly_connected_ports(struct muxer_comp
*muxer_comp
,
383 struct muxer_notif_iter
*muxer_notif_iter
)
385 struct bt_component
*comp
= NULL
;
388 comp
= bt_component_from_private_component(muxer_comp
->priv_comp
);
392 * Here we create one upstream notification iterator for each
393 * newly connected port. The list of newly connected ports to
394 * handle here is updated by muxer_port_connected().
396 * An initial "next" operation is performed on each new upstream
397 * notification iterator. The possible return values of this
398 * initial "next" operation are:
400 * * BT_NOTIFICATION_ITERATOR_STATUS_OK: Perfect, we have a
401 * current notification.
403 * * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: No notification so
404 * far, but the muxer upstream notification iterator is added
405 * to the list of upstream notification iterators to retry
406 * before finding the next youngest notification.
408 * * BT_NOTIFICATION_ITERATOR_STATUS_END: No notification, and
409 * we immediately release the upstream notification iterator
410 * because it's useless.
412 * A possible side effect of this initial "next" operation, on
413 * each notification iterator, is the connection of a new port.
414 * In this case the list of newly connected ports is updated and
415 * this loop continues.
417 * Once this loop finishes successfully, the set of upstream
418 * notification iterators is considered _stable_, that is, it is
419 * safe, if no notification iterators must be retried, to select
420 * the youngest notification amongst them to be returned by the
421 * next "next" method call.
424 GList
*node
= muxer_notif_iter
->newly_connected_priv_ports
;
425 struct bt_private_port
*priv_port
;
426 struct bt_port
*port
;
427 struct bt_notification_iterator
*upstream_notif_iter
= NULL
;
428 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
;
434 priv_port
= node
->data
;
435 port
= bt_port_from_private_port(priv_port
);
438 if (!bt_port_is_connected(port
)) {
440 * Looks like this port is not connected
441 * anymore: we can't create an upstream
442 * notification iterator on its connection in
449 upstream_notif_iter
= create_notif_iter_on_input_port(priv_port
,
452 assert(!upstream_notif_iter
);
457 muxer_upstream_notif_iter
=
458 muxer_notif_iter_add_upstream_notif_iter(
459 muxer_notif_iter
, upstream_notif_iter
,
462 BT_PUT(upstream_notif_iter
);
463 if (!muxer_upstream_notif_iter
) {
467 ret
= muxer_upstream_notif_iter_next(muxer_notif_iter
,
468 muxer_upstream_notif_iter
);
474 bt_put(upstream_notif_iter
);
477 muxer_notif_iter
->newly_connected_priv_ports
=
479 muxer_notif_iter
->newly_connected_priv_ports
,
496 int get_notif_ts_ns(struct muxer_comp
*muxer_comp
,
497 struct bt_notification
*notif
, int64_t last_returned_ts_ns
,
500 struct bt_clock_class_priority_map
*cc_prio_map
= NULL
;
501 struct bt_ctf_clock_class
*clock_class
= NULL
;
502 struct bt_ctf_clock_value
*clock_value
= NULL
;
503 struct bt_ctf_event
*event
= NULL
;
509 switch (bt_notification_get_type(notif
)) {
510 case BT_NOTIFICATION_TYPE_EVENT
:
512 bt_notification_event_get_clock_class_priority_map(
516 case BT_NOTIFICATION_TYPE_INACTIVITY
:
518 bt_notification_inactivity_get_clock_class_priority_map(
523 * All the other notifications have a higher
526 *ts_ns
= last_returned_ts_ns
;
535 * If the clock class priority map is empty, then we consider
536 * that this notification has no time. In this case it's always
539 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map
) == 0) {
540 *ts_ns
= last_returned_ts_ns
;
545 bt_clock_class_priority_map_get_highest_priority_clock_class(
551 if (!bt_ctf_clock_class_is_absolute(clock_class
)) {
552 // TODO: Allow this with an explicit parameter
556 switch (bt_notification_get_type(notif
)) {
557 case BT_NOTIFICATION_TYPE_EVENT
:
558 event
= bt_notification_event_get_event(notif
);
560 clock_value
= bt_ctf_event_get_clock_value(event
,
563 case BT_NOTIFICATION_TYPE_INACTIVITY
:
564 clock_value
= bt_notification_inactivity_get_clock_value(
575 ret
= bt_ctf_clock_value_get_value_ns_from_epoch(clock_value
, ts_ns
);
594 * This function finds the youngest available notification amongst the
595 * non-ended upstream notification iterators and returns the upstream
596 * notification iterator which has it, or
597 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
600 * This function does NOT:
602 * * Update any upstream notification iterator.
603 * * Check for newly connected ports.
604 * * Check the upstream notification iterators to retry.
606 * On sucess, this function sets *muxer_upstream_notif_iter to the
607 * upstream notification iterator of which the current notification is
608 * the youngest, and sets *ts_ns to its time.
611 enum bt_notification_iterator_status
612 muxer_notif_iter_youngest_upstream_notif_iter(
613 struct muxer_comp
*muxer_comp
,
614 struct muxer_notif_iter
*muxer_notif_iter
,
615 struct muxer_upstream_notif_iter
**muxer_upstream_notif_iter
,
620 int64_t youngest_ts_ns
= INT64_MAX
;
621 enum bt_notification_iterator_status status
=
622 BT_NOTIFICATION_ITERATOR_STATUS_OK
;
625 assert(muxer_notif_iter
);
626 assert(muxer_upstream_notif_iter
);
627 *muxer_upstream_notif_iter
= NULL
;
629 for (i
= 0; i
< muxer_notif_iter
->muxer_upstream_notif_iters
->len
; i
++) {
630 struct bt_notification
*notif
;
631 struct muxer_upstream_notif_iter
*cur_muxer_upstream_notif_iter
=
632 g_ptr_array_index(muxer_notif_iter
->muxer_upstream_notif_iters
, i
);
635 if (!cur_muxer_upstream_notif_iter
->notif_iter
) {
636 /* This upstream notification iterator is ended */
640 notif
= bt_notification_iterator_get_notification(
641 cur_muxer_upstream_notif_iter
->notif_iter
);
643 ret
= get_notif_ts_ns(muxer_comp
, notif
,
644 muxer_notif_iter
->last_returned_ts_ns
, ¬if_ts_ns
);
647 *muxer_upstream_notif_iter
= NULL
;
648 status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
652 if (notif_ts_ns
<= youngest_ts_ns
) {
653 *muxer_upstream_notif_iter
=
654 cur_muxer_upstream_notif_iter
;
655 youngest_ts_ns
= notif_ts_ns
;
656 *ts_ns
= youngest_ts_ns
;
660 if (!*muxer_upstream_notif_iter
) {
661 status
= BT_NOTIFICATION_ITERATOR_STATUS_END
;
670 int muxer_notif_iter_set_next_next_return(struct muxer_comp
*muxer_comp
,
671 struct muxer_notif_iter
*muxer_notif_iter
)
673 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
;
674 enum bt_notification_iterator_status notif_iter_status
;
678 * Previous operations might have connected ports. They must be
679 * considered when finding the youngest notification because
680 * their upstream notification iterator does not exist yet.
682 ret
= muxer_notif_iter_handle_newly_connected_ports(muxer_comp
,
685 muxer_notif_iter
->next_next_return
.status
=
686 BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
687 BT_PUT(muxer_notif_iter
->next_next_return
.notification
);
691 assert(!muxer_notif_iter
->newly_connected_priv_ports
);
693 if (muxer_notif_iter_has_upstream_notif_iter_to_retry(
696 * At least one upstream notification iterator to retry:
697 * try again later, because we cannot find the youngest
698 * notification if we don't have the current
699 * notification of each upstream notification iterator.
701 muxer_notif_iter
->next_next_return
.status
=
702 BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
;
703 BT_PUT(muxer_notif_iter
->next_next_return
.notification
);
708 * At this point we know that all our connected ports have an
709 * upstream notification iterator, and that all those iterators
710 * have a current notification (stable state). It is safe to
711 * find the youngest notification. It is possible that calling
712 * "next" on its iterator will connect new ports. This will be
713 * handled by the next call to
714 * muxer_notif_iter_set_next_next_return().
717 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp
,
718 muxer_notif_iter
, &muxer_upstream_notif_iter
,
719 &muxer_notif_iter
->next_next_return_ts_ns
);
720 if (notif_iter_status
== BT_NOTIFICATION_ITERATOR_STATUS_END
) {
721 /* No more active upstream notification iterator */
722 muxer_notif_iter
->next_next_return
.status
=
723 BT_NOTIFICATION_ITERATOR_STATUS_END
;
724 BT_PUT(muxer_notif_iter
->next_next_return
.notification
);
728 if (notif_iter_status
< 0) {
733 assert(notif_iter_status
== BT_NOTIFICATION_ITERATOR_STATUS_OK
);
734 BT_PUT(muxer_notif_iter
->next_next_return
.notification
);
735 muxer_notif_iter
->next_next_return
.notification
=
736 bt_notification_iterator_get_notification(
737 muxer_upstream_notif_iter
->notif_iter
);
738 assert(muxer_notif_iter
->next_next_return
.notification
);
739 muxer_notif_iter
->next_next_return
.status
=
740 BT_NOTIFICATION_ITERATOR_STATUS_OK
;
741 ret
= muxer_upstream_notif_iter_next(muxer_notif_iter
,
742 muxer_upstream_notif_iter
);
748 * Here we have the next "next" return value. It won't change
749 * until it is returned by the next call to our "next" method.
750 * If its time is less than the time of the last notification
751 * that our "next" method returned, then fail because the
752 * muxer's output wouldn't be monotonic.
754 if (muxer_notif_iter
->next_next_return_ts_ns
<
755 muxer_notif_iter
->last_returned_ts_ns
) {
761 * We are now sure that the next "next" return value will not
762 * change until it is returned by this muxer notification
763 * iterator (unless there's a fatal error). It is now safe to
764 * set the last returned time to this one.
766 muxer_notif_iter
->last_returned_ts_ns
=
767 muxer_notif_iter
->next_next_return_ts_ns
;
774 void destroy_muxer_notif_iter(struct muxer_notif_iter
*muxer_notif_iter
)
778 if (!muxer_notif_iter
) {
782 if (muxer_notif_iter
->muxer_upstream_notif_iters
) {
784 muxer_notif_iter
->muxer_upstream_notif_iters
, TRUE
);
787 if (muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
) {
788 g_list_free(muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
);
791 for (node
= muxer_notif_iter
->newly_connected_priv_ports
;
792 node
; node
= g_list_next(node
)) {
796 g_list_free(muxer_notif_iter
->newly_connected_priv_ports
);
797 g_free(muxer_notif_iter
);
801 int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp
*muxer_comp
,
802 struct muxer_notif_iter
*muxer_notif_iter
)
804 struct bt_component
*comp
;
810 * Add the connected input ports to this muxer notification
811 * iterator's list of newly connected ports. They will be
812 * handled by muxer_notif_iter_handle_newly_connected_ports().
814 comp
= bt_component_from_private_component(muxer_comp
->priv_comp
);
816 count
= bt_component_filter_get_input_port_count(comp
);
821 for (i
= 0; i
< count
; i
++) {
822 struct bt_private_port
*priv_port
=
823 bt_private_component_filter_get_input_private_port_by_index(
824 muxer_comp
->priv_comp
, i
);
825 struct bt_port
*port
;
828 port
= bt_port_from_private_port(priv_port
);
831 if (!bt_port_is_connected(port
)) {
838 muxer_notif_iter
->newly_connected_priv_ports
=
840 muxer_notif_iter
->newly_connected_priv_ports
,
842 if (!muxer_notif_iter
->newly_connected_priv_ports
) {
855 enum bt_notification_iterator_status
muxer_notif_iter_init(
856 struct bt_private_notification_iterator
*priv_notif_iter
,
857 struct bt_private_port
*output_priv_port
)
859 struct muxer_comp
*muxer_comp
= NULL
;
860 struct muxer_notif_iter
*muxer_notif_iter
= NULL
;
861 struct bt_private_component
*priv_comp
= NULL
;
862 enum bt_notification_iterator_status status
=
863 BT_NOTIFICATION_ITERATOR_STATUS_OK
;
866 priv_comp
= bt_private_notification_iterator_get_private_component(
869 muxer_comp
= bt_private_component_get_user_data(priv_comp
);
872 if (muxer_comp
->initializing_muxer_notif_iter
) {
874 * Weird, unhandled situation: downstream creates a
875 * muxer notification iterator while creating another
876 * muxer notification iterator (same component).
881 muxer_comp
->initializing_muxer_notif_iter
= true;
882 muxer_notif_iter
= g_new0(struct muxer_notif_iter
, 1);
883 if (!muxer_notif_iter
) {
887 muxer_notif_iter
->last_returned_ts_ns
= INT64_MIN
;
888 muxer_notif_iter
->muxer_upstream_notif_iters
=
889 g_ptr_array_new_with_free_func(
890 (GDestroyNotify
) destroy_muxer_upstream_notif_iter
);
891 if (!muxer_notif_iter
->muxer_upstream_notif_iters
) {
896 * Add the muxer notification iterator to the component's array
897 * of muxer notification iterators here because
898 * muxer_notif_iter_init_newly_connected_ports() can cause
899 * muxer_port_connected() to be called, which adds the newly
900 * connected port to each muxer notification iterator's list of
901 * newly connected ports.
903 g_ptr_array_add(muxer_comp
->muxer_notif_iters
, muxer_notif_iter
);
904 ret
= muxer_notif_iter_init_newly_connected_ports(muxer_comp
,
910 /* Set the initial "next" return value */
911 ret
= muxer_notif_iter_set_next_next_return(muxer_comp
,
917 ret
= bt_private_notification_iterator_set_user_data(priv_notif_iter
,
923 if (g_ptr_array_index(muxer_comp
->muxer_notif_iters
,
924 muxer_comp
->muxer_notif_iters
->len
- 1) == muxer_notif_iter
) {
925 g_ptr_array_remove_index(muxer_comp
->muxer_notif_iters
,
926 muxer_comp
->muxer_notif_iters
->len
- 1);
929 destroy_muxer_notif_iter(muxer_notif_iter
);
930 ret
= bt_private_notification_iterator_set_user_data(priv_notif_iter
,
933 status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
936 muxer_comp
->initializing_muxer_notif_iter
= false;
942 void muxer_notif_iter_finalize(
943 struct bt_private_notification_iterator
*priv_notif_iter
)
945 struct muxer_notif_iter
*muxer_notif_iter
=
946 bt_private_notification_iterator_get_user_data(priv_notif_iter
);
947 struct bt_private_component
*priv_comp
= NULL
;
948 struct muxer_comp
*muxer_comp
= NULL
;
950 priv_comp
= bt_private_notification_iterator_get_private_component(
953 muxer_comp
= bt_private_component_get_user_data(priv_comp
);
956 (void) g_ptr_array_remove_fast(muxer_comp
->muxer_notif_iters
,
958 destroy_muxer_notif_iter(muxer_notif_iter
);
965 struct bt_notification_iterator_next_return
muxer_notif_iter_next(
966 struct bt_private_notification_iterator
*priv_notif_iter
)
968 struct bt_notification_iterator_next_return next_ret
= {
969 .notification
= NULL
,
971 struct muxer_notif_iter
*muxer_notif_iter
=
972 bt_private_notification_iterator_get_user_data(priv_notif_iter
);
973 struct bt_private_component
*priv_comp
= NULL
;
974 struct muxer_comp
*muxer_comp
= NULL
;
978 assert(muxer_notif_iter
);
979 priv_comp
= bt_private_notification_iterator_get_private_component(
982 muxer_comp
= bt_private_component_get_user_data(priv_comp
);
985 /* Are we in an error state set elsewhere? */
986 if (unlikely(muxer_comp
->error
)) {
991 * If we have upstream notification iterators to retry, retry
992 * them now. Each one we find which now has a notification or
993 * is in "end" state, we set it to NULL in this array. Then
994 * we remove all the NULL values from this array.
996 retry_node
= muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
;
998 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
=
1000 enum bt_notification_iterator_status status
;
1001 GList
*next_retry_node
= g_list_next(retry_node
);
1003 assert(muxer_upstream_notif_iter
->notif_iter
);
1004 status
= bt_notification_iterator_next(
1005 muxer_upstream_notif_iter
->notif_iter
);
1010 if (status
== BT_NOTIFICATION_ITERATOR_STATUS_END
) {
1012 * This upstream notification iterator is done.
1013 * Put the iterator and remove node from list.
1015 BT_PUT(muxer_upstream_notif_iter
->notif_iter
);
1016 muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
=
1018 muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
,
1020 retry_node
= next_retry_node
;
1024 assert(status
== BT_NOTIFICATION_ITERATOR_STATUS_OK
||
1025 status
== BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
);
1027 if (status
== BT_NOTIFICATION_ITERATOR_STATUS_OK
) {
1029 * This upstream notification iterator now has.
1030 * a notification. Remove it from this list.
1032 muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
=
1034 muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
,
1038 retry_node
= next_retry_node
;
1041 /* Take our next "next" next return value */
1042 next_ret
= muxer_notif_iter
->next_next_return
;
1043 muxer_notif_iter
->next_next_return
.status
=
1044 BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
1045 muxer_notif_iter
->next_next_return
.notification
= NULL
;
1047 /* Set the next "next" return value */
1048 ret
= muxer_notif_iter_set_next_next_return(muxer_comp
,
1058 * Technically we already have a next "next" return value which
1059 * is ready for this call, but we're failing within this call,
1060 * so discard this buffer and return the error ASAP.
1062 BT_PUT(next_ret
.notification
);
1063 next_ret
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
1071 void muxer_port_connected(
1072 struct bt_private_component
*priv_comp
,
1073 struct bt_private_port
*self_private_port
,
1074 struct bt_port
*other_port
)
1076 struct bt_port
*self_port
=
1077 bt_port_from_private_port(self_private_port
);
1078 struct muxer_comp
*muxer_comp
=
1079 bt_private_component_get_user_data(priv_comp
);
1085 if (bt_port_get_type(self_port
) == BT_PORT_TYPE_INPUT
) {
1088 /* One less available input port */
1089 muxer_comp
->available_input_ports
--;
1090 ret
= ensure_available_input_port(priv_comp
);
1092 muxer_comp
->error
= true;
1097 for (i
= 0; i
< muxer_comp
->muxer_notif_iters
->len
; i
++) {
1098 struct muxer_notif_iter
*muxer_notif_iter
=
1099 g_ptr_array_index(muxer_comp
->muxer_notif_iters
, i
);
1102 * Add this port to the list of newly connected ports
1103 * for this muxer notification iterator. We append at
1104 * the end of this list while
1105 * muxer_notif_iter_handle_newly_connected_ports()
1106 * removes the nodes from the beginning.
1108 * The list node owns the private port.
1110 muxer_notif_iter
->newly_connected_priv_ports
=
1112 muxer_notif_iter
->newly_connected_priv_ports
,
1113 bt_get(self_private_port
));
1114 if (!muxer_notif_iter
->newly_connected_priv_ports
) {
1115 bt_put(self_private_port
);
1116 muxer_comp
->error
= true;
1126 void muxer_port_disconnected(struct bt_private_component
*priv_comp
,
1127 struct bt_private_port
*priv_port
)
1129 struct bt_port
*port
= bt_port_from_private_port(priv_port
);
1130 struct muxer_comp
*muxer_comp
=
1131 bt_private_component_get_user_data(priv_comp
);
1137 * There's nothing special to do when a port is disconnected
1138 * because this component deals with upstream notification
1139 * iterators which were already created thanks to connected
1140 * ports. The fact that the port is disconnected does not cancel
1141 * the upstream notification iterators created using its
1142 * connection: they still exist. The only way to remove an
1143 * upstream notification iterator is for its "next" operation to
1144 * return BT_NOTIFICATION_ITERATOR_STATUS_END.
1146 if (bt_port_get_type(port
) == BT_PORT_TYPE_INPUT
) {
1147 /* One more available input port */
1148 muxer_comp
->available_input_ports
++;