2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 #include <babeltrace/babeltrace-internal.h>
24 #include <babeltrace/ctf-ir/clock-class.h>
25 #include <babeltrace/ctf-ir/event.h>
26 #include <babeltrace/graph/clock-class-priority-map.h>
27 #include <babeltrace/graph/component-filter.h>
28 #include <babeltrace/graph/component.h>
29 #include <babeltrace/graph/notification-event.h>
30 #include <babeltrace/graph/notification-inactivity.h>
31 #include <babeltrace/graph/notification-iterator.h>
32 #include <babeltrace/graph/notification.h>
33 #include <babeltrace/graph/port.h>
34 #include <babeltrace/graph/private-component-filter.h>
35 #include <babeltrace/graph/private-component.h>
36 #include <babeltrace/graph/private-component.h>
37 #include <babeltrace/graph/private-connection.h>
38 #include <babeltrace/graph/private-notification-iterator.h>
39 #include <babeltrace/graph/private-port.h>
40 #include <plugins-common.h>
/*
 * Per-component state of the muxer (fields of `struct muxer_comp`).
 * NOTE(review): the struct opener/closer and the `error` flag which
 * muxer_notif_iter_next() and muxer_port_connected() access are not
 * visible in this extract -- confirm against the pristine file.
 */
45 /* Array of struct muxer_notif_iter * (weak refs) */
46 GPtrArray
*muxer_notif_iters
;
/* Private component which has this state as its user data (no reference taken) */
49 struct bt_private_component
*priv_comp
;
/* Numeric suffix appended to "in" to name the next input port */
50 unsigned int next_port_num
;
/* Incremented when an input port is added or disconnected, decremented when one is connected */
51 size_t available_input_ports
;
/*
 * Wrapper around one upstream notification iterator and the input
 * port on which it was created. Instances are created by
 * muxer_notif_iter_add_upstream_notif_iter() and released by
 * destroy_muxer_upstream_notif_iter().
 */
55 struct muxer_upstream_notif_iter
{
56 /* Owned by this, NULL if ended */
57 struct bt_notification_iterator
*notif_iter
;
/* Owned by this: reference taken on creation, put on destruction */
60 struct bt_private_port
*priv_port
;
/*
 * State of one muxer notification iterator: its upstream notification
 * iterators, the ones which must be retried, the ports which still
 * need an upstream notification iterator, and the buffered return
 * value of the next "next" method call.
 */
63 struct muxer_notif_iter
{
65 * Array of struct muxer_upstream_notif_iter * (owned by this).
67 * NOTE: This array is searched in linearly to find the youngest
68 * current notification. Keep this until benchmarks confirm that
69 * another data structure is faster than this for our typical
72 GPtrArray
*muxer_upstream_notif_iters
;
74 /* List of struct muxer_upstream_notif_iter * (weak refs) */
75 GList
*muxer_upstream_notif_iters_to_retry
;
78 * List of "recently" connected input ports (owned by this) to
79 * handle by this muxer notification iterator.
80 * muxer_port_connected() adds entries to this list, and the
81 * entries are removed when a notification iterator is created
82 * on the port's connection and put into
83 * muxer_upstream_notif_iters above by
84 * muxer_notif_iter_handle_newly_connected_ports().
86 GList
*newly_connected_priv_ports
;
88 /* Next thing to return by the "next" method */
89 struct bt_notification_iterator_next_return next_next_return
;
/* Time (ns) associated with next_next_return */
90 int64_t next_next_return_ts_ns
;
92 /* Last time returned in a notification (starts at INT64_MIN) */
93 int64_t last_returned_ts_ns
;
97 void destroy_muxer_upstream_notif_iter(
98 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
)
100 if (!muxer_upstream_notif_iter
) {
104 bt_put(muxer_upstream_notif_iter
->notif_iter
);
105 bt_put(muxer_upstream_notif_iter
->priv_port
);
106 g_free(muxer_upstream_notif_iter
);
110 struct muxer_upstream_notif_iter
*muxer_notif_iter_add_upstream_notif_iter(
111 struct muxer_notif_iter
*muxer_notif_iter
,
112 struct bt_notification_iterator
*notif_iter
,
113 struct bt_private_port
*priv_port
)
115 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
=
116 g_new0(struct muxer_upstream_notif_iter
, 1);
118 if (!muxer_upstream_notif_iter
) {
122 muxer_upstream_notif_iter
->notif_iter
= bt_get(notif_iter
);
123 muxer_upstream_notif_iter
->priv_port
= bt_get(priv_port
);
124 g_ptr_array_add(muxer_notif_iter
->muxer_upstream_notif_iters
,
125 muxer_upstream_notif_iter
);
128 return muxer_upstream_notif_iter
;
132 bool muxer_notif_iter_has_upstream_notif_iter_to_retry(
133 struct muxer_notif_iter
*muxer_notif_iter
)
135 assert(muxer_notif_iter
);
136 return muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
!= NULL
;
/*
 * Records `muxer_upstream_notif_iter` in the list of upstream
 * notification iterators to retry of `muxer_notif_iter`. Both
 * parameters must be non-NULL (asserted). The list head is replaced by
 * the value returned by the list operation.
 * NOTE(review): the GList function called here is not visible in this
 * extract -- confirm against the pristine file.
 */
140 void muxer_notif_iter_add_upstream_notif_iter_to_retry(
141 struct muxer_notif_iter
*muxer_notif_iter
,
142 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
)
144 assert(muxer_notif_iter
);
145 assert(muxer_upstream_notif_iter
);
146 muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
=
148 muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
,
149 muxer_upstream_notif_iter
);
/*
 * Makes sure that the muxer component has an available input port.
 *
 * The number of available input ports is checked first (`>= 1`). When
 * a port is needed, it is named "in" followed by the component's
 * next_port_num and added to the filter component; both
 * available_input_ports and next_port_num are then incremented.
 * NOTE(review): the early exit and the error paths are not visible in
 * this extract -- confirm against the pristine file.
 */
153 int ensure_available_input_port(struct bt_private_component
*priv_comp
)
155 struct muxer_comp
*muxer_comp
=
156 bt_private_component_get_user_data(priv_comp
);
158 GString
*port_name
= NULL
;
159 void *priv_port
= NULL
;
163 if (muxer_comp
->available_input_ports
>= 1) {
167 port_name
= g_string_new("in");
173 g_string_append_printf(port_name
, "%u", muxer_comp
->next_port_num
);
174 priv_port
= bt_private_component_filter_add_input_private_port(
175 priv_comp
, port_name
->str
);
181 muxer_comp
->available_input_ports
++;
182 muxer_comp
->next_port_num
++;
/* TRUE: also free the character data of the GString */
186 g_string_free(port_name
, TRUE
);
/*
 * Removes the default input port and the default output port of the
 * filter component from the component, in this order. `ret` receives
 * the result of each removal.
 * NOTE(review): the checks on `priv_port`/`ret` and the release of the
 * port references are not visible in this extract.
 */
194 int remove_default_ports(struct bt_private_component
*priv_comp
)
196 struct bt_private_port
*priv_port
;
199 priv_port
= bt_private_component_filter_get_default_input_private_port(
202 ret
= bt_private_port_remove_from_component(priv_port
);
209 priv_port
= bt_private_component_filter_get_default_output_private_port(
212 ret
= bt_private_port_remove_from_component(priv_port
);
/*
 * Adds an output port to the filter component.
 * NOTE(review): the arguments of the call (including the port name) and
 * the handling of its result are not visible in this extract.
 */
224 int create_output_port(struct bt_private_component
*priv_comp
)
229 priv_port
= bt_private_component_filter_add_output_private_port(
/*
 * Destroys the muxer component's state. The array of muxer
 * notification iterators is freed only if it exists (muxer_init() can
 * fail before creating it).
 * NOTE(review): any NULL guard on `muxer_comp` and the release of
 * `muxer_comp` itself are not visible in this extract.
 */
240 void destroy_muxer_comp(struct muxer_comp
*muxer_comp
)
246 if (muxer_comp
->muxer_notif_iters
) {
247 g_ptr_array_free(muxer_comp
->muxer_notif_iters
, TRUE
);
/*
 * Initialization method of the muxer component class.
 *
 * Allocates the zeroed component state, creates its array of muxer
 * notification iterators, sets the state as the user data of
 * `priv_comp`, then removes the default ports, makes sure one input
 * port is available and creates the output port.
 *
 * On the failure path, the state is destroyed, the user data of
 * `priv_comp` is reset to NULL and the status becomes
 * BT_COMPONENT_STATUS_ERROR; the status is BT_COMPONENT_STATUS_OK
 * otherwise.
 * NOTE(review): `params` and `init_data` are not used by any visible
 * line; the individual `ret` checks are not visible in this extract.
 */
254 enum bt_component_status
muxer_init(
255 struct bt_private_component
*priv_comp
,
256 struct bt_value
*params
, void *init_data
)
259 enum bt_component_status status
= BT_COMPONENT_STATUS_OK
;
260 struct muxer_comp
*muxer_comp
= g_new0(struct muxer_comp
, 1);
266 muxer_comp
->muxer_notif_iters
= g_ptr_array_new();
267 if (!muxer_comp
->muxer_notif_iters
) {
/* Stored as is: no reference is taken on the private component */
271 muxer_comp
->priv_comp
= priv_comp
;
272 ret
= bt_private_component_set_user_data(priv_comp
, muxer_comp
);
274 ret
= remove_default_ports(priv_comp
);
279 ret
= ensure_available_input_port(priv_comp
);
284 ret
= create_output_port(priv_comp
);
292 destroy_muxer_comp(muxer_comp
);
/* Do not leave a dangling user data pointer behind */
293 ret
= bt_private_component_set_user_data(priv_comp
, NULL
);
295 status
= BT_COMPONENT_STATUS_ERROR
;
/*
 * Finalization method of the muxer component class: destroys the muxer
 * state which is attached to `priv_comp` as its user data.
 */
void muxer_finalize(struct bt_private_component *priv_comp)
{
	struct muxer_comp *comp_state;

	comp_state = bt_private_component_get_user_data(priv_comp);
	destroy_muxer_comp(comp_state);
}
/*
 * Creates a notification iterator on the connection of the input port
 * `priv_port`, which must be connected (asserted).
 *
 * `ret` is an output parameter for the operation's result.
 * NOTE(review): how `*ret` is set, the arguments of the iterator
 * creation call and the release of `port`/`priv_conn` are not visible
 * in this extract.
 */
311 struct bt_notification_iterator
*create_notif_iter_on_input_port(
312 struct bt_private_port
*priv_port
, int *ret
)
314 struct bt_port
*port
= bt_port_from_private_port(priv_port
);
315 struct bt_notification_iterator
*notif_iter
= NULL
;
316 struct bt_private_connection
*priv_conn
= NULL
;
322 assert(bt_port_is_connected(port
));
323 priv_conn
= bt_private_port_get_private_connection(priv_port
);
329 // TODO: Advance the iterator to >= the time of the latest
330 // returned notification by the muxer notification
331 // iterator which creates it.
332 notif_iter
= bt_private_connection_create_notification_iterator(
/*
 * Performs one "next" operation on the upstream notification iterator
 * wrapped by `muxer_upstream_notif_iter` and reacts to its status:
 *
 * * OK: nothing else to do.
 * * AGAIN: the wrapper is added to the list of upstream notification
 *   iterators to retry of `muxer_notif_iter`.
 * * END: the upstream notification iterator is released with BT_PUT(),
 *   which marks the wrapper as ended.
 * * Anything else: error or unsupported status.
 *
 * NOTE(review): the value returned in each case is not visible in this
 * extract.
 */
346 int muxer_upstream_notif_iter_next(struct muxer_notif_iter
*muxer_notif_iter
,
347 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
)
350 enum bt_notification_iterator_status next_status
;
352 next_status
= bt_notification_iterator_next(
353 muxer_upstream_notif_iter
->notif_iter
);
355 switch (next_status
) {
356 case BT_NOTIFICATION_ITERATOR_STATUS_OK
:
357 /* Everything okay */
359 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
:
360 muxer_notif_iter_add_upstream_notif_iter_to_retry(
361 muxer_notif_iter
, muxer_upstream_notif_iter
);
363 case BT_NOTIFICATION_ITERATOR_STATUS_END
:
365 * Notification iterator reached the end: release it. It
366 * won't be considered again to find the youngest
369 BT_PUT(muxer_upstream_notif_iter
->notif_iter
);
372 /* Error or unsupported status code */
/*
 * Processes the newly connected ports of `muxer_notif_iter`, starting
 * from the head of its newly_connected_priv_ports list.
 *
 * For each listed port which is still connected, an upstream
 * notification iterator is created on its connection, wrapped and
 * registered with muxer_notif_iter_add_upstream_notif_iter(), and
 * advanced once with muxer_upstream_notif_iter_next(). A port which is
 * not connected anymore gets no upstream notification iterator.
 *
 * The wrapper takes its own reference on the upstream notification
 * iterator, which is why the local reference is put right after the
 * wrapper is created.
 * NOTE(review): the loop construct, the error paths, the return value
 * and the call which removes the handled node from the list are not
 * visible in this extract -- confirm against the pristine file.
 */
381 int muxer_notif_iter_handle_newly_connected_ports(struct muxer_comp
*muxer_comp
,
382 struct muxer_notif_iter
*muxer_notif_iter
)
384 struct bt_component
*comp
= NULL
;
387 comp
= bt_component_from_private_component(muxer_comp
->priv_comp
);
391 * Here we create one upstream notification iterator for each
392 * newly connected port. The list of newly connected ports to
393 * handle here is updated by muxer_port_connected().
395 * An initial "next" operation is performed on each new upstream
396 * notification iterator. The possible return values of this
397 * initial "next" operation are:
399 * * BT_NOTIFICATION_ITERATOR_STATUS_OK: Perfect, we have a
400 * current notification.
402 * * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: No notification so
403 * far, but the muxer upstream notification iterator is added
404 * to the list of upstream notification iterators to retry
405 * before finding the next youngest notification.
407 * * BT_NOTIFICATION_ITERATOR_STATUS_END: No notification, and
408 * we immediately release the upstream notification iterator
409 * because it's useless.
411 * A possible side effect of this initial "next" operation, on
412 * each notification iterator, is the connection of a new port.
413 * In this case the list of newly connected ports is updated and
414 * this loop continues.
416 * Once this loop finishes successfully, the set of upstream
417 * notification iterators is considered _stable_, that is, it is
418 * safe, if no notification iterators must be retried, to select
419 * the youngest notification amongst them to be returned by the
420 * next "next" method call.
423 GList
*node
= muxer_notif_iter
->newly_connected_priv_ports
;
424 struct bt_private_port
*priv_port
;
425 struct bt_port
*port
;
426 struct bt_notification_iterator
*upstream_notif_iter
= NULL
;
427 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
;
433 priv_port
= node
->data
;
434 port
= bt_port_from_private_port(priv_port
);
437 if (!bt_port_is_connected(port
)) {
439 * Looks like this port is not connected
440 * anymore: we can't create an upstream
441 * notification iterator on its connection in
448 upstream_notif_iter
= create_notif_iter_on_input_port(priv_port
,
/* No iterator is expected when the creation failed */
451 assert(!upstream_notif_iter
);
456 muxer_upstream_notif_iter
=
457 muxer_notif_iter_add_upstream_notif_iter(
458 muxer_notif_iter
, upstream_notif_iter
,
/* The wrapper has its own reference now */
461 BT_PUT(upstream_notif_iter
);
462 if (!muxer_upstream_notif_iter
) {
466 ret
= muxer_upstream_notif_iter_next(muxer_notif_iter
,
467 muxer_upstream_notif_iter
);
473 bt_put(upstream_notif_iter
);
476 muxer_notif_iter
->newly_connected_priv_ports
=
478 muxer_notif_iter
->newly_connected_priv_ports
,
495 int get_notif_ts_ns(struct muxer_comp
*muxer_comp
,
496 struct bt_notification
*notif
, int64_t last_returned_ts_ns
,
499 struct bt_clock_class_priority_map
*cc_prio_map
= NULL
;
500 struct bt_ctf_clock_class
*clock_class
= NULL
;
501 struct bt_ctf_clock_value
*clock_value
= NULL
;
502 struct bt_ctf_event
*event
= NULL
;
508 switch (bt_notification_get_type(notif
)) {
509 case BT_NOTIFICATION_TYPE_EVENT
:
511 bt_notification_event_get_clock_class_priority_map(
515 case BT_NOTIFICATION_TYPE_INACTIVITY
:
517 bt_notification_event_get_clock_class_priority_map(
522 * All the other notifications have a higher
525 *ts_ns
= last_returned_ts_ns
;
534 * If the clock class priority map is empty, then we consider
535 * that this notification has no time. In this case it's always
538 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map
) == 0) {
539 *ts_ns
= last_returned_ts_ns
;
544 bt_clock_class_priority_map_get_highest_priority_clock_class(
550 if (!bt_ctf_clock_class_get_is_absolute(clock_class
)) {
551 // TODO: Allow this with an explicit parameter
555 switch (bt_notification_get_type(notif
)) {
556 case BT_NOTIFICATION_TYPE_EVENT
:
557 event
= bt_notification_event_get_event(notif
);
559 clock_value
= bt_ctf_event_get_clock_value(event
,
562 case BT_NOTIFICATION_TYPE_INACTIVITY
:
563 clock_value
= bt_notification_inactivity_get_clock_value(
574 ret
= bt_ctf_clock_value_get_value_ns_from_epoch(clock_value
, ts_ns
);
593 * This function finds the youngest available notification amongst the
594 * non-ended upstream notification iterators and returns the upstream
595 * notification iterator which has it, or
596 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
599 * This function does NOT:
601 * * Update any upstream notification iterator.
602 * * Check for newly connected ports.
603 * * Check the upstream notification iterators to retry.
605 * On success, this function sets *muxer_upstream_notif_iter to the
606 * upstream notification iterator of which the current notification is
607 * the youngest, and sets *ts_ns to its time.
610 enum bt_notification_iterator_status
611 muxer_notif_iter_youngest_upstream_notif_iter(
612 struct muxer_comp
*muxer_comp
,
613 struct muxer_notif_iter
*muxer_notif_iter
,
614 struct muxer_upstream_notif_iter
**muxer_upstream_notif_iter
,
619 int64_t youngest_ts_ns
= INT64_MAX
;
620 enum bt_notification_iterator_status status
=
621 BT_NOTIFICATION_ITERATOR_STATUS_OK
;
624 assert(muxer_notif_iter
);
625 assert(muxer_upstream_notif_iter
);
626 *muxer_upstream_notif_iter
= NULL
;
628 for (i
= 0; i
< muxer_notif_iter
->muxer_upstream_notif_iters
->len
; i
++) {
629 struct bt_notification
*notif
;
630 struct muxer_upstream_notif_iter
*cur_muxer_upstream_notif_iter
=
631 g_ptr_array_index(muxer_notif_iter
->muxer_upstream_notif_iters
, i
);
634 if (!cur_muxer_upstream_notif_iter
->notif_iter
) {
635 /* This upstream notification iterator is ended */
639 notif
= bt_notification_iterator_get_notification(
640 cur_muxer_upstream_notif_iter
->notif_iter
);
642 ret
= get_notif_ts_ns(muxer_comp
, notif
,
643 muxer_notif_iter
->last_returned_ts_ns
, ¬if_ts_ns
);
646 *muxer_upstream_notif_iter
= NULL
;
647 status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
651 if (notif_ts_ns
<= youngest_ts_ns
) {
652 *muxer_upstream_notif_iter
=
653 cur_muxer_upstream_notif_iter
;
654 youngest_ts_ns
= notif_ts_ns
;
655 *ts_ns
= youngest_ts_ns
;
659 if (!*muxer_upstream_notif_iter
) {
660 status
= BT_NOTIFICATION_ITERATOR_STATUS_END
;
/*
 * Computes what the next call to the "next" method returns and stores
 * it in muxer_notif_iter->next_next_return.
 *
 * Visible steps:
 *
 * 1. Handle the newly connected ports. On the failure path the stored
 *    status becomes ERROR and the stored notification is released.
 * 2. If an upstream notification iterator must be retried, the stored
 *    status becomes AGAIN and the stored notification is released.
 * 3. Otherwise, find the upstream notification iterator which has the
 *    youngest notification. If there's none left, the stored status
 *    becomes END. Otherwise a reference on its current notification is
 *    stored with status OK, and this upstream notification iterator is
 *    advanced.
 * 4. The time of the stored notification is compared to the last
 *    returned time (monotonicity check), then becomes the new last
 *    returned time.
 *
 * NOTE(review): the conditions guarding steps 1 and 4, the error
 * paths and the return value are not visible in this extract.
 */
669 int muxer_notif_iter_set_next_next_return(struct muxer_comp
*muxer_comp
,
670 struct muxer_notif_iter
*muxer_notif_iter
)
672 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
;
673 enum bt_notification_iterator_status notif_iter_status
;
677 * Previous operations might have connected ports. They must be
678 * considered when finding the youngest notification because
679 * their upstream notification iterator does not exist yet.
681 ret
= muxer_notif_iter_handle_newly_connected_ports(muxer_comp
,
684 muxer_notif_iter
->next_next_return
.status
=
685 BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
686 BT_PUT(muxer_notif_iter
->next_next_return
.notification
);
/* All the newly connected ports were handled at this point */
690 assert(!muxer_notif_iter
->newly_connected_priv_ports
);
692 if (muxer_notif_iter_has_upstream_notif_iter_to_retry(
695 * At least one upstream notification iterator to retry:
696 * try again later, because we cannot find the youngest
697 * notification if we don't have the current
698 * notification of each upstream notification iterator.
700 muxer_notif_iter
->next_next_return
.status
=
701 BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
;
702 BT_PUT(muxer_notif_iter
->next_next_return
.notification
);
707 * At this point we know that all our connected ports have an
708 * upstream notification iterator, and that all those iterators
709 * have a current notification (stable state). It is safe to
710 * find the youngest notification. It is possible that calling
711 * "next" on its iterator will connect new ports. This will be
712 * handled by the next call to
713 * muxer_notif_iter_set_next_next_return().
716 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp
,
717 muxer_notif_iter
, &muxer_upstream_notif_iter
,
718 &muxer_notif_iter
->next_next_return_ts_ns
);
719 if (notif_iter_status
== BT_NOTIFICATION_ITERATOR_STATUS_END
) {
720 /* No more active upstream notification iterator */
721 muxer_notif_iter
->next_next_return
.status
=
722 BT_NOTIFICATION_ITERATOR_STATUS_END
;
723 BT_PUT(muxer_notif_iter
->next_next_return
.notification
);
727 if (notif_iter_status
< 0) {
732 assert(notif_iter_status
== BT_NOTIFICATION_ITERATOR_STATUS_OK
);
/* Release any previously stored notification before replacing it */
733 BT_PUT(muxer_notif_iter
->next_next_return
.notification
);
734 muxer_notif_iter
->next_next_return
.notification
=
735 bt_notification_iterator_get_notification(
736 muxer_upstream_notif_iter
->notif_iter
);
737 assert(muxer_notif_iter
->next_next_return
.notification
);
738 muxer_notif_iter
->next_next_return
.status
=
739 BT_NOTIFICATION_ITERATOR_STATUS_OK
;
/* Advance the selected upstream notification iterator for the next round */
740 ret
= muxer_upstream_notif_iter_next(muxer_notif_iter
,
741 muxer_upstream_notif_iter
);
747 * Here we have the next "next" return value. It won't change
748 * until it is returned by the next call to our "next" method.
749 * If its time is less than the time of the last notification
750 * that our "next" method returned, then fail because the
751 * muxer's output wouldn't be monotonic.
753 if (muxer_notif_iter
->next_next_return_ts_ns
<
754 muxer_notif_iter
->last_returned_ts_ns
) {
760 * We are now sure that the next "next" return value will not
761 * change until it is returned by this muxer notification
762 * iterator (unless there's a fatal error). It is now safe to
763 * set the last returned time to this one.
765 muxer_notif_iter
->last_returned_ts_ns
=
766 muxer_notif_iter
->next_next_return_ts_ns
;
/*
 * Destroys a muxer notification iterator's state: its array of
 * upstream notification iterators (if any), its list of upstream
 * notification iterators to retry (list nodes only, the entries are
 * weak references), its list of newly connected ports, and finally
 * the structure itself. `muxer_notif_iter` is checked for NULL first.
 * NOTE(review): the body of the NULL check, the call which frees the
 * array and the body of the loop over the newly connected ports are
 * not visible in this extract.
 */
773 void destroy_muxer_notif_iter(struct muxer_notif_iter
*muxer_notif_iter
)
777 if (!muxer_notif_iter
) {
781 if (muxer_notif_iter
->muxer_upstream_notif_iters
) {
783 muxer_notif_iter
->muxer_upstream_notif_iters
, TRUE
);
786 if (muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
) {
787 g_list_free(muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
);
790 for (node
= muxer_notif_iter
->newly_connected_priv_ports
;
791 node
; node
= g_list_next(node
)) {
795 g_list_free(muxer_notif_iter
->newly_connected_priv_ports
);
796 g_free(muxer_notif_iter
);
/*
 * Fills the list of newly connected ports of `muxer_notif_iter` from
 * the input ports of the muxer component: each input port is visited
 * by index and its connection state is checked before the list is
 * updated.
 * NOTE(review): what happens to a port which is not connected, the
 * list function called, the reference handling and the return value
 * are not visible in this extract.
 */
800 int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp
*muxer_comp
,
801 struct muxer_notif_iter
*muxer_notif_iter
)
803 struct bt_component
*comp
;
809 * Add the connected input ports to this muxer notification
810 * iterator's list of newly connected ports. They will be
811 * handled by muxer_notif_iter_handle_newly_connected_ports().
813 comp
= bt_component_from_private_component(muxer_comp
->priv_comp
);
815 ret
= bt_component_filter_get_input_port_count(comp
, &count
);
820 for (i
= 0; i
< count
; i
++) {
821 struct bt_private_port
*priv_port
=
822 bt_private_component_filter_get_input_private_port_at_index(
823 muxer_comp
->priv_comp
, i
);
824 struct bt_port
*port
;
827 port
= bt_port_from_private_port(priv_port
);
830 if (!bt_port_is_connected(port
)) {
837 muxer_notif_iter
->newly_connected_priv_ports
=
839 muxer_notif_iter
->newly_connected_priv_ports
,
/* A NULL list head after adding an entry is treated as a failure */
841 if (!muxer_notif_iter
->newly_connected_priv_ports
) {
/*
 * Initialization method of the muxer notification iterator.
 *
 * Allocates the zeroed iterator state, records the input ports which
 * are already connected, sets the last returned time to INT64_MIN,
 * creates the array of upstream notification iterators (which destroys
 * its elements with destroy_muxer_upstream_notif_iter()), computes the
 * initial "next" return value, sets the state as the user data of
 * `priv_notif_iter` and registers it in the component's array of muxer
 * notification iterators.
 *
 * On the failure path, the state is destroyed, the user data is set
 * again and the status becomes BT_NOTIFICATION_ITERATOR_STATUS_ERROR.
 * NOTE(review): `output_priv_port` is not used by any visible line;
 * the individual `ret` checks and the values passed as user data are
 * not visible in this extract.
 */
854 enum bt_notification_iterator_status
muxer_notif_iter_init(
855 struct bt_private_notification_iterator
*priv_notif_iter
,
856 struct bt_private_port
*output_priv_port
)
858 struct muxer_comp
*muxer_comp
= NULL
;
859 struct muxer_notif_iter
*muxer_notif_iter
= NULL
;
860 struct bt_private_component
*priv_comp
= NULL
;
861 enum bt_notification_iterator_status status
=
862 BT_NOTIFICATION_ITERATOR_STATUS_OK
;
865 priv_comp
= bt_private_notification_iterator_get_private_component(
868 muxer_comp
= bt_private_component_get_user_data(priv_comp
);
870 muxer_notif_iter
= g_new0(struct muxer_notif_iter
, 1);
871 if (!muxer_notif_iter
) {
875 ret
= muxer_notif_iter_init_newly_connected_ports(muxer_comp
,
/* Smallest possible time: nothing was returned yet */
881 muxer_notif_iter
->last_returned_ts_ns
= INT64_MIN
;
882 muxer_notif_iter
->muxer_upstream_notif_iters
=
883 g_ptr_array_new_with_free_func(
884 (GDestroyNotify
) destroy_muxer_upstream_notif_iter
);
885 if (!muxer_notif_iter
->muxer_upstream_notif_iters
) {
889 /* Set the initial "next" return value */
890 ret
= muxer_notif_iter_set_next_next_return(muxer_comp
,
896 ret
= bt_private_notification_iterator_set_user_data(priv_notif_iter
,
/* Makes this iterator visible to muxer_port_connected() */
899 g_ptr_array_add(muxer_comp
->muxer_notif_iters
, muxer_notif_iter
);
903 destroy_muxer_notif_iter(muxer_notif_iter
);
904 ret
= bt_private_notification_iterator_set_user_data(priv_notif_iter
,
907 status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
/*
 * Finalization method of the muxer notification iterator: removes an
 * entry from the component's array of muxer notification iterators
 * (the result of the removal is ignored), then destroys the iterator's
 * state.
 * NOTE(review): the removed element and any guard on `muxer_comp` are
 * not visible in this extract.
 */
915 void muxer_notif_iter_finalize(
916 struct bt_private_notification_iterator
*priv_notif_iter
)
918 struct muxer_notif_iter
*muxer_notif_iter
=
919 bt_private_notification_iterator_get_user_data(priv_notif_iter
);
920 struct bt_private_component
*priv_comp
= NULL
;
921 struct muxer_comp
*muxer_comp
= NULL
;
923 priv_comp
= bt_private_notification_iterator_get_private_component(
926 muxer_comp
= bt_private_component_get_user_data(priv_comp
);
929 (void) g_ptr_array_remove_fast(muxer_comp
->muxer_notif_iters
,
931 destroy_muxer_notif_iter(muxer_notif_iter
);
/*
 * "Next" method of the muxer notification iterator.
 *
 * Visible steps:
 *
 * 1. Check the component's error flag.
 * 2. Retry each upstream notification iterator of the retry list: one
 *    which ends is released and removed from the list, one which now
 *    has a notification (OK) is removed from the list; the only other
 *    accepted status is AGAIN (asserted). The next node is saved
 *    before the current one is processed because the current node can
 *    be removed from the list.
 * 3. Take the buffered return value (the buffer's status is reset to
 *    ERROR and its notification to NULL, so the reference moves to
 *    the returned structure).
 * 4. Compute the following buffered return value. On the failure path,
 *    the notification just taken is released and the returned status
 *    is BT_NOTIFICATION_ITERATOR_STATUS_ERROR.
 *
 * NOTE(review): the loop construct, the list-removal calls, the error
 * paths and the `ret` checks are not visible in this extract.
 */
938 struct bt_notification_iterator_next_return
muxer_notif_iter_next(
939 struct bt_private_notification_iterator
*priv_notif_iter
)
941 struct bt_notification_iterator_next_return next_ret
= {
942 .notification
= NULL
,
944 struct muxer_notif_iter
*muxer_notif_iter
=
945 bt_private_notification_iterator_get_user_data(priv_notif_iter
);
946 struct bt_private_component
*priv_comp
= NULL
;
947 struct muxer_comp
*muxer_comp
= NULL
;
951 assert(muxer_notif_iter
);
952 priv_comp
= bt_private_notification_iterator_get_private_component(
955 muxer_comp
= bt_private_component_get_user_data(priv_comp
);
958 /* Are we in an error state set elsewhere? */
959 if (unlikely(muxer_comp
->error
)) {
964 * If we have upstream notification iterators to retry, retry
965 * them now. Each one we find which now has a notification or
966 * is in "end" state, we set it to NULL in this array. Then
967 * we remove all the NULL values from this array.
969 retry_node
= muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
;
971 struct muxer_upstream_notif_iter
*muxer_upstream_notif_iter
=
973 enum bt_notification_iterator_status status
;
/* Saved now: the current node can be removed below */
974 GList
*next_retry_node
= g_list_next(retry_node
);
976 assert(muxer_upstream_notif_iter
->notif_iter
);
977 status
= bt_notification_iterator_next(
978 muxer_upstream_notif_iter
->notif_iter
);
983 if (status
== BT_NOTIFICATION_ITERATOR_STATUS_END
) {
985 * This upstream notification iterator is done.
986 * Put the iterator and remove node from list.
988 BT_PUT(muxer_upstream_notif_iter
->notif_iter
);
989 muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
=
991 muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
,
993 retry_node
= next_retry_node
;
997 assert(status
== BT_NOTIFICATION_ITERATOR_STATUS_OK
||
998 status
== BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
);
1000 if (status
== BT_NOTIFICATION_ITERATOR_STATUS_OK
) {
1002 * This upstream notification iterator now has.
1003 * a notification. Remove it from this list.
1005 muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
=
1007 muxer_notif_iter
->muxer_upstream_notif_iters_to_retry
,
1011 retry_node
= next_retry_node
;
1014 /* Take our next "next" return value */
1015 next_ret
= muxer_notif_iter
->next_next_return
;
1016 muxer_notif_iter
->next_next_return
.status
=
1017 BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
1018 muxer_notif_iter
->next_next_return
.notification
= NULL
;
1020 /* Set the next "next" return value */
1021 ret
= muxer_notif_iter_set_next_next_return(muxer_comp
,
1031 * Technically we already have a next "next" return value which
1032 * is ready for this call, but we're failing within this call,
1033 * so discard this buffer and return the error ASAP.
1035 BT_PUT(next_ret
.notification
);
1036 next_ret
.status
= BT_NOTIFICATION_ITERATOR_STATUS_ERROR
;
/*
 * "Port connected" method of the muxer component.
 *
 * The port's type is compared with BT_PORT_TYPE_INPUT. For a newly
 * connected input port: one less input port is available, so another
 * available input port is ensured, and a new reference on the port is
 * added to the list of newly connected ports of each existing muxer
 * notification iterator. Failures are recorded in the component's
 * error flag, which muxer_notif_iter_next() checks.
 * NOTE(review): `other_port` is not used by any visible line; the
 * list function called, the `ret` check and the release of `self_port`
 * are not visible in this extract.
 */
1044 void muxer_port_connected(
1045 struct bt_private_component
*priv_comp
,
1046 struct bt_private_port
*self_private_port
,
1047 struct bt_port
*other_port
)
1049 struct bt_port
*self_port
=
1050 bt_port_from_private_port(self_private_port
);
1051 struct muxer_comp
*muxer_comp
=
1052 bt_private_component_get_user_data(priv_comp
);
1058 if (bt_port_get_type(self_port
) == BT_PORT_TYPE_INPUT
) {
1061 /* One less available input port */
1062 muxer_comp
->available_input_ports
--;
1063 ret
= ensure_available_input_port(priv_comp
);
1065 muxer_comp
->error
= true;
1070 for (i
= 0; i
< muxer_comp
->muxer_notif_iters
->len
; i
++) {
1071 struct muxer_notif_iter
*muxer_notif_iter
=
1072 g_ptr_array_index(muxer_comp
->muxer_notif_iters
, i
);
1075 * Add this port to the list of newly connected ports
1076 * for this muxer notification iterator. We append at
1077 * the end of this list while
1078 * muxer_notif_iter_handle_newly_connected_ports()
1079 * removes the nodes from the beginning.
1081 * The list node owns the private port.
1083 muxer_notif_iter
->newly_connected_priv_ports
=
1085 muxer_notif_iter
->newly_connected_priv_ports
,
1086 bt_get(self_private_port
));
1087 if (!muxer_notif_iter
->newly_connected_priv_ports
) {
/* Give back the reference taken for the list node */
1088 bt_put(self_private_port
);
1089 muxer_comp
->error
= true;
/*
 * "Port disconnected" method of the muxer component: when the
 * disconnected port is an input port, one more input port is
 * available. Existing upstream notification iterators are left
 * untouched (see the comment in the body).
 * NOTE(review): the end of this function, including the release of
 * `port`, is not visible in this extract.
 */
1099 void muxer_port_disconnected(struct bt_private_component
*priv_comp
,
1100 struct bt_private_port
*priv_port
)
1102 struct bt_port
*port
= bt_port_from_private_port(priv_port
);
1103 struct muxer_comp
*muxer_comp
=
1104 bt_private_component_get_user_data(priv_comp
);
1110 * There's nothing special to do when a port is disconnected
1111 * because this component deals with upstream notification
1112 * iterators which were already created thanks to connected
1113 * ports. The fact that the port is disconnected does not cancel
1114 * the upstream notification iterators created using its
1115 * connection: they still exist. The only way to remove an
1116 * upstream notification iterator is for its "next" operation to
1117 * return BT_NOTIFICATION_ITERATOR_STATUS_END.
1119 if (bt_port_get_type(port
) == BT_PORT_TYPE_INPUT
) {
1120 /* One more available input port */
1121 muxer_comp
->available_input_ports
++;