text.pretty: fix handle_notification()
[babeltrace.git] / plugins / utils / muxer / muxer.c
CommitLineData
958f7d11
PP
1/*
2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23#include <babeltrace/babeltrace-internal.h>
24#include <babeltrace/ctf-ir/clock-class.h>
25#include <babeltrace/ctf-ir/event.h>
26#include <babeltrace/graph/clock-class-priority-map.h>
27#include <babeltrace/graph/component-filter.h>
28#include <babeltrace/graph/component.h>
29#include <babeltrace/graph/notification-event.h>
30#include <babeltrace/graph/notification-inactivity.h>
31#include <babeltrace/graph/notification-iterator.h>
32#include <babeltrace/graph/notification.h>
33#include <babeltrace/graph/port.h>
34#include <babeltrace/graph/private-component-filter.h>
35#include <babeltrace/graph/private-component.h>
36#include <babeltrace/graph/private-component.h>
37#include <babeltrace/graph/private-connection.h>
38#include <babeltrace/graph/private-notification-iterator.h>
39#include <babeltrace/graph/private-port.h>
40#include <plugins-common.h>
41#include <glib.h>
42#include <assert.h>
43
44struct muxer_comp {
45 /* Array of struct bt_private_notification_iterator * (weak refs) */
46 GPtrArray *muxer_notif_iters;
47
48 /* Weak ref */
49 struct bt_private_component *priv_comp;
50 unsigned int next_port_num;
51 size_t available_input_ports;
52 bool error;
53};
54
55struct muxer_upstream_notif_iter {
ab11110e 56 /* Owned by this, NULL if ended */
958f7d11
PP
57 struct bt_notification_iterator *notif_iter;
58
59 /* Owned by this*/
60 struct bt_private_port *priv_port;
61};
62
958f7d11 63struct muxer_notif_iter {
ab11110e
PP
64 /*
65 * Array of struct muxer_upstream_notif_iter * (owned by this).
66 *
67 * NOTE: This array is searched in linearly to find the youngest
68 * current notification. Keep this until benchmarks confirm that
69 * another data structure is faster than this for our typical
70 * use cases.
71 */
958f7d11
PP
72 GPtrArray *muxer_upstream_notif_iters;
73
74 /* Array of struct muxer_upstream_notif_iter * (weak refs) */
ab11110e
PP
75 GList *muxer_upstream_notif_iters_to_retry;
76
77 /*
78 * List of "recently" connected input ports (owned by this) to
79 * handle by this muxer notification iterator.
80 * muxer_port_connected() adds entries to this list, and the
81 * entries are removed when a notification iterator is created
82 * on the port's connection and put into
83 * muxer_upstream_notif_iters above by
84 * muxer_notif_iter_handle_newly_connected_ports().
85 */
86 GList *newly_connected_priv_ports;
958f7d11
PP
87
88 /* Next thing to return by the "next" method */
89 struct bt_notification_iterator_next_return next_next_return;
90 int64_t next_next_return_ts_ns;
91
92 /* Last time returned in a notification */
93 int64_t last_returned_ts_ns;
94};
95
ab11110e
PP
96static
97void destroy_muxer_upstream_notif_iter(
98 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
99{
100 if (!muxer_upstream_notif_iter) {
101 return;
102 }
103
104 bt_put(muxer_upstream_notif_iter->notif_iter);
105 bt_put(muxer_upstream_notif_iter->priv_port);
106 g_free(muxer_upstream_notif_iter);
107}
108
958f7d11
PP
109static
110struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
111 struct muxer_notif_iter *muxer_notif_iter,
112 struct bt_notification_iterator *notif_iter,
113 struct bt_private_port *priv_port)
114{
115 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
116 g_new0(struct muxer_upstream_notif_iter, 1);
117
118 if (!muxer_upstream_notif_iter) {
119 goto end;
120 }
121
122 muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
123 muxer_upstream_notif_iter->priv_port = bt_get(priv_port);
124 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
125 muxer_upstream_notif_iter);
126
127end:
128 return muxer_upstream_notif_iter;
129}
130
131static inline
132bool muxer_notif_iter_has_upstream_notif_iter_to_retry(
133 struct muxer_notif_iter *muxer_notif_iter)
134{
135 assert(muxer_notif_iter);
ab11110e 136 return muxer_notif_iter->muxer_upstream_notif_iters_to_retry != NULL;
958f7d11
PP
137}
138
139static
140void muxer_notif_iter_add_upstream_notif_iter_to_retry(
141 struct muxer_notif_iter *muxer_notif_iter,
142 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
143{
144 assert(muxer_notif_iter);
145 assert(muxer_upstream_notif_iter);
ab11110e
PP
146 muxer_notif_iter->muxer_upstream_notif_iters_to_retry =
147 g_list_append(
148 muxer_notif_iter->muxer_upstream_notif_iters_to_retry,
149 muxer_upstream_notif_iter);
958f7d11
PP
150}
151
152static
153int ensure_available_input_port(struct bt_private_component *priv_comp)
154{
155 struct muxer_comp *muxer_comp =
156 bt_private_component_get_user_data(priv_comp);
157 int ret = 0;
158 GString *port_name = NULL;
159 void *priv_port = NULL;
160
161 assert(muxer_comp);
162
163 if (muxer_comp->available_input_ports >= 1) {
164 goto end;
165 }
166
167 port_name = g_string_new("in");
168 if (!port_name) {
169 ret = -1;
170 goto end;
171 }
172
173 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
174 priv_port = bt_private_component_filter_add_input_private_port(
175 priv_comp, port_name->str);
176 if (!priv_port) {
177 ret = -1;
178 goto end;
179 }
180
181 muxer_comp->available_input_ports++;
182 muxer_comp->next_port_num++;
183
184end:
185 if (port_name) {
186 g_string_free(port_name, TRUE);
187 }
188
189 BT_PUT(priv_port);
190 return ret;
191}
192
193static
194int remove_default_ports(struct bt_private_component *priv_comp)
195{
196 struct bt_private_port *priv_port;
197 int ret = 0;
198
199 priv_port = bt_private_component_filter_get_default_input_private_port(
200 priv_comp);
201 if (priv_port) {
202 ret = bt_private_port_remove_from_component(priv_port);
203 if (ret) {
204 goto end;
205 }
206 }
207
208 bt_put(priv_port);
209 priv_port = bt_private_component_filter_get_default_output_private_port(
210 priv_comp);
211 if (priv_port) {
212 ret = bt_private_port_remove_from_component(priv_port);
213 if (ret) {
214 goto end;
215 }
216 }
217
218end:
219 bt_put(priv_port);
220 return ret;
221}
222
223static
224int create_output_port(struct bt_private_component *priv_comp)
225{
226 void *priv_port;
227 int ret = 0;
228
229 priv_port = bt_private_component_filter_add_output_private_port(
230 priv_comp, "out");
231 if (!priv_port) {
232 ret = -1;
233 }
234
235 bt_put(priv_port);
236 return ret;
237}
238
239static
240void destroy_muxer_comp(struct muxer_comp *muxer_comp)
241{
242 if (!muxer_comp) {
243 return;
244 }
245
246 if (muxer_comp->muxer_notif_iters) {
247 g_ptr_array_free(muxer_comp->muxer_notif_iters, TRUE);
248 }
249
250 g_free(muxer_comp);
251}
252
253BT_HIDDEN
254enum bt_component_status muxer_init(
255 struct bt_private_component *priv_comp,
256 struct bt_value *params, void *init_data)
257{
258 int ret;
259 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
260 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
261
262 if (!muxer_comp) {
263 goto error;
264 }
265
266 muxer_comp->muxer_notif_iters = g_ptr_array_new();
267 if (!muxer_comp->muxer_notif_iters) {
268 goto error;
269 }
270
271 muxer_comp->priv_comp = priv_comp;
272 ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
273 assert(ret == 0);
274 ret = remove_default_ports(priv_comp);
275 if (ret) {
276 goto error;
277 }
278
279 ret = ensure_available_input_port(priv_comp);
280 if (ret) {
281 goto error;
282 }
283
284 ret = create_output_port(priv_comp);
285 if (ret) {
286 goto error;
287 }
288
289 goto end;
290
291error:
292 destroy_muxer_comp(muxer_comp);
293 ret = bt_private_component_set_user_data(priv_comp, NULL);
294 assert(ret == 0);
295 status = BT_COMPONENT_STATUS_ERROR;
296
297end:
298 return status;
299}
300
301BT_HIDDEN
302void muxer_finalize(struct bt_private_component *priv_comp)
303{
304 struct muxer_comp *muxer_comp =
305 bt_private_component_get_user_data(priv_comp);
306
307 destroy_muxer_comp(muxer_comp);
308}
309
310static
311struct bt_notification_iterator *create_notif_iter_on_input_port(
312 struct bt_private_port *priv_port, int *ret)
313{
314 struct bt_port *port = bt_port_from_private_port(priv_port);
315 struct bt_notification_iterator *notif_iter = NULL;
316 struct bt_private_connection *priv_conn = NULL;
317
318 assert(ret);
319 *ret = 0;
320 assert(port);
321
322 assert(bt_port_is_connected(port));
323 priv_conn = bt_private_port_get_private_connection(priv_port);
324 if (!priv_conn) {
325 *ret = -1;
326 goto end;
327 }
328
ab11110e
PP
329 // TODO: Advance the iterator to >= the time of the latest
330 // returned notification by the muxer notification
331 // iterator which creates it.
958f7d11
PP
332 notif_iter = bt_private_connection_create_notification_iterator(
333 priv_conn);
334 if (!notif_iter) {
335 *ret = -1;
336 goto end;
337 }
338
339end:
340 bt_put(port);
341 bt_put(priv_conn);
342 return notif_iter;
343}
344
ab11110e
PP
345static
346int muxer_upstream_notif_iter_next(struct muxer_notif_iter *muxer_notif_iter,
347 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
348{
349 int ret = 0;
350 enum bt_notification_iterator_status next_status;
351
352 next_status = bt_notification_iterator_next(
353 muxer_upstream_notif_iter->notif_iter);
354
355 switch (next_status) {
356 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
357 /* Everything okay */
358 break;
359 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
360 muxer_notif_iter_add_upstream_notif_iter_to_retry(
361 muxer_notif_iter, muxer_upstream_notif_iter);
362 break;
363 case BT_NOTIFICATION_ITERATOR_STATUS_END:
364 /*
365 * Notification iterator reached the end: release it. It
366 * won't be considered again to find the youngest
367 * notification.
368 */
369 BT_PUT(muxer_upstream_notif_iter->notif_iter);
370 goto end;
371 default:
372 /* Error or unsupported status code */
373 ret = next_status;
374 }
375
376end:
377 return ret;
378}
379
380static
381int muxer_notif_iter_handle_newly_connected_ports(struct muxer_comp *muxer_comp,
382 struct muxer_notif_iter *muxer_notif_iter)
383{
384 struct bt_component *comp = NULL;
385 int ret = 0;
386
387 comp = bt_component_from_private_component(muxer_comp->priv_comp);
388 assert(comp);
389
390 /*
391 * Here we create one upstream notification iterator for each
392 * newly connected port. The list of newly connected ports to
393 * handle here is updated by muxer_port_connected().
394 *
395 * An initial "next" operation is performed on each new upstream
396 * notification iterator. The possible return values of this
397 * initial "next" operation are:
398 *
399 * * BT_NOTIFICATION_ITERATOR_STATUS_OK: Perfect, we have a
400 * current notification.
401 *
402 * * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: No notification so
403 * far, but the muxer upstream notification iterator is added
404 * to the list of upstream notification iterators to retry
405 * before finding the next youngest notification.
406 *
407 * * BT_NOTIFICATION_ITERATOR_STATUS_END: No notification, and
408 * we immediately release the upstream notification iterator
409 * because it's useless.
410 *
411 * A possible side effect of this initial "next" operation, on
412 * each notification iterator, is the connection of a new port.
413 * In this case the list of newly connected ports is updated and
414 * this loop continues.
415 *
416 * Once this loop finishes successfully, the set of upstream
417 * notification iterators is considered _stable_, that is, it is
418 * safe, if no notification iterators must be retried, to select
419 * the youngest notification amongst them to be returned by the
420 * next "next" method call.
421 */
422 while (true) {
423 GList *node = muxer_notif_iter->newly_connected_priv_ports;
424 struct bt_private_port *priv_port;
425 struct bt_port *port;
426 struct bt_notification_iterator *upstream_notif_iter = NULL;
427 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
428
429 if (!node) {
430 break;
431 }
432
433 priv_port = node->data;
434 port = bt_port_from_private_port(priv_port);
435 assert(port);
436
437 if (!bt_port_is_connected(port)) {
438 /*
439 * Looks like this port is not connected
440 * anymore: we can't create an upstream
441 * notification iterator on its connection in
442 * this case.
443 */
444 goto remove_node;
445 }
446
447 BT_PUT(port);
448 upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
449 &ret);
450 if (ret) {
451 assert(!upstream_notif_iter);
452 bt_put(priv_port);
453 goto error;
454 }
455
456 muxer_upstream_notif_iter =
457 muxer_notif_iter_add_upstream_notif_iter(
458 muxer_notif_iter, upstream_notif_iter,
459 priv_port);
460 BT_PUT(priv_port);
461 BT_PUT(upstream_notif_iter);
462 if (!muxer_upstream_notif_iter) {
463 goto error;
464 }
465
466 ret = muxer_upstream_notif_iter_next(muxer_notif_iter,
467 muxer_upstream_notif_iter);
468 if (ret) {
469 goto error;
470 }
471
472remove_node:
473 bt_put(upstream_notif_iter);
474 bt_put(port);
475 bt_put(priv_port);
476 muxer_notif_iter->newly_connected_priv_ports =
477 g_list_delete_link(
478 muxer_notif_iter->newly_connected_priv_ports,
479 node);
480 }
481
482 goto end;
483
484error:
485 if (ret == 0) {
486 ret = -1;
487 }
488
489end:
490 bt_put(comp);
491 return ret;
492}
493
958f7d11
PP
494static
495int get_notif_ts_ns(struct muxer_comp *muxer_comp,
496 struct bt_notification *notif, int64_t last_returned_ts_ns,
497 int64_t *ts_ns)
498{
499 struct bt_clock_class_priority_map *cc_prio_map = NULL;
500 struct bt_ctf_clock_class *clock_class = NULL;
501 struct bt_ctf_clock_value *clock_value = NULL;
502 struct bt_ctf_event *event = NULL;
503 int ret = 0;
504
505 assert(notif);
506 assert(ts_ns);
507
508 switch (bt_notification_get_type(notif)) {
509 case BT_NOTIFICATION_TYPE_EVENT:
510 cc_prio_map =
511 bt_notification_event_get_clock_class_priority_map(
512 notif);
513 break;
514
515 case BT_NOTIFICATION_TYPE_INACTIVITY:
516 cc_prio_map =
517 bt_notification_event_get_clock_class_priority_map(
518 notif);
519 break;
520 default:
521 /*
522 * All the other notifications have a higher
523 * priority.
524 */
525 *ts_ns = last_returned_ts_ns;
526 goto end;
527 }
528
529 if (!cc_prio_map) {
530 goto error;
531 }
532
533 /*
534 * If the clock class priority map is empty, then we consider
535 * that this notification has no time. In this case it's always
536 * the youngest.
537 */
538 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map) == 0) {
539 *ts_ns = last_returned_ts_ns;
540 goto end;
541 }
542
543 clock_class =
544 bt_clock_class_priority_map_get_highest_priority_clock_class(
545 cc_prio_map);
546 if (!clock_class) {
547 goto error;
548 }
549
550 if (!bt_ctf_clock_class_get_is_absolute(clock_class)) {
ab11110e 551 // TODO: Allow this with an explicit parameter
958f7d11
PP
552 goto error;
553 }
554
555 switch (bt_notification_get_type(notif)) {
556 case BT_NOTIFICATION_TYPE_EVENT:
557 event = bt_notification_event_get_event(notif);
ab11110e 558 assert(event);
958f7d11
PP
559 clock_value = bt_ctf_event_get_clock_value(event,
560 clock_class);
561 break;
562 case BT_NOTIFICATION_TYPE_INACTIVITY:
563 clock_value = bt_notification_inactivity_get_clock_value(
564 notif, clock_class);
565 break;
566 default:
567 assert(false);
568 }
569
570 if (!clock_value) {
571 goto error;
572 }
573
574 ret = bt_ctf_clock_value_get_value_ns_from_epoch(clock_value, ts_ns);
575 if (ret) {
576 goto error;
577 }
578
579 goto end;
580
581error:
582 ret = -1;
583
584end:
585 bt_put(cc_prio_map);
586 bt_put(event);
587 bt_put(clock_class);
588 bt_put(clock_value);
589 return ret;
590}
591
ab11110e
PP
592/*
593 * This function finds the youngest available notification amongst the
594 * non-ended upstream notification iterators and returns the upstream
595 * notification iterator which has it, or
596 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
597 * notification.
598 *
599 * This function does NOT:
600 *
601 * * Update any upstream notification iterator.
602 * * Check for newly connected ports.
603 * * Check the upstream notification iterators to retry.
604 *
605 * On sucess, this function sets *muxer_upstream_notif_iter to the
606 * upstream notification iterator of which the current notification is
607 * the youngest, and sets *ts_ns to its time.
608 */
958f7d11
PP
609static
610enum bt_notification_iterator_status
611muxer_notif_iter_youngest_upstream_notif_iter(
612 struct muxer_comp *muxer_comp,
613 struct muxer_notif_iter *muxer_notif_iter,
614 struct muxer_upstream_notif_iter **muxer_upstream_notif_iter,
615 int64_t *ts_ns)
616{
617 size_t i;
618 int ret;
619 int64_t youngest_ts_ns = INT64_MAX;
620 enum bt_notification_iterator_status status =
621 BT_NOTIFICATION_ITERATOR_STATUS_OK;
622
623 assert(muxer_comp);
624 assert(muxer_notif_iter);
625 assert(muxer_upstream_notif_iter);
626 *muxer_upstream_notif_iter = NULL;
627
628 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
629 struct bt_notification *notif;
630 struct muxer_upstream_notif_iter *cur_muxer_upstream_notif_iter =
631 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters, i);
632 int64_t notif_ts_ns;
633
634 if (!cur_muxer_upstream_notif_iter->notif_iter) {
ab11110e 635 /* This upstream notification iterator is ended */
958f7d11
PP
636 continue;
637 }
638
639 notif = bt_notification_iterator_get_notification(
640 cur_muxer_upstream_notif_iter->notif_iter);
641 assert(notif);
642 ret = get_notif_ts_ns(muxer_comp, notif,
643 muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
644 bt_put(notif);
645 if (ret) {
646 *muxer_upstream_notif_iter = NULL;
647 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
648 goto end;
649 }
650
651 if (notif_ts_ns <= youngest_ts_ns) {
652 *muxer_upstream_notif_iter =
653 cur_muxer_upstream_notif_iter;
654 youngest_ts_ns = notif_ts_ns;
655 *ts_ns = youngest_ts_ns;
656 }
657 }
658
659 if (!*muxer_upstream_notif_iter) {
660 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
661 *ts_ns = INT64_MIN;
662 }
663
664end:
665 return status;
666}
667
668static
669int muxer_notif_iter_set_next_next_return(struct muxer_comp *muxer_comp,
670 struct muxer_notif_iter *muxer_notif_iter)
671{
672 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
958f7d11
PP
673 enum bt_notification_iterator_status notif_iter_status;
674 int ret = 0;
675
ab11110e
PP
676 /*
677 * Previous operations might have connected ports. They must be
678 * considered when finding the youngest notification because
679 * their upstream notification iterator does not exist yet.
680 */
681 ret = muxer_notif_iter_handle_newly_connected_ports(muxer_comp,
682 muxer_notif_iter);
683 if (ret) {
684 muxer_notif_iter->next_next_return.status =
685 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
686 BT_PUT(muxer_notif_iter->next_next_return.notification);
687 goto end;
688 }
689
690 assert(!muxer_notif_iter->newly_connected_priv_ports);
691
692 if (muxer_notif_iter_has_upstream_notif_iter_to_retry(
693 muxer_notif_iter)) {
958f7d11
PP
694 /*
695 * At least one upstream notification iterator to retry:
ab11110e
PP
696 * try again later, because we cannot find the youngest
697 * notification if we don't have the current
698 * notification of each upstream notification iterator.
958f7d11
PP
699 */
700 muxer_notif_iter->next_next_return.status =
701 BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
702 BT_PUT(muxer_notif_iter->next_next_return.notification);
703 goto end;
704 }
705
706 /*
ab11110e
PP
707 * At this point we know that all our connected ports have an
708 * upstream notification iterator, and that all those iterators
709 * have a current notification (stable state). It is safe to
710 * find the youngest notification. It is possible that calling
711 * "next" on its iterator will connect new ports. This will be
712 * handled by the next call to
713 * muxer_notif_iter_set_next_next_return().
958f7d11
PP
714 */
715 notif_iter_status =
716 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
717 muxer_notif_iter, &muxer_upstream_notif_iter,
718 &muxer_notif_iter->next_next_return_ts_ns);
719 if (notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
720 /* No more active upstream notification iterator */
721 muxer_notif_iter->next_next_return.status =
722 BT_NOTIFICATION_ITERATOR_STATUS_END;
723 BT_PUT(muxer_notif_iter->next_next_return.notification);
724 goto end;
725 }
726
727 if (notif_iter_status < 0) {
728 ret = -1;
729 goto end;
730 }
731
ab11110e
PP
732 assert(notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
733 BT_PUT(muxer_notif_iter->next_next_return.notification);
734 muxer_notif_iter->next_next_return.notification =
735 bt_notification_iterator_get_notification(
736 muxer_upstream_notif_iter->notif_iter);
737 assert(muxer_notif_iter->next_next_return.notification);
958f7d11
PP
738 muxer_notif_iter->next_next_return.status =
739 BT_NOTIFICATION_ITERATOR_STATUS_OK;
ab11110e
PP
740 ret = muxer_upstream_notif_iter_next(muxer_notif_iter,
741 muxer_upstream_notif_iter);
742 if (ret) {
958f7d11
PP
743 goto end;
744 }
745
958f7d11
PP
746 /*
747 * Here we have the next "next" return value. It won't change
748 * until it is returned by the next call to our "next" method.
749 * If its time is less than the time of the last notification
750 * that our "next" method returned, then fail because the
751 * muxer's output wouldn't be monotonic.
752 */
753 if (muxer_notif_iter->next_next_return_ts_ns <
754 muxer_notif_iter->last_returned_ts_ns) {
755 ret = -1;
756 goto end;
757 }
758
759 /*
760 * We are now sure that the next "next" return value will not
761 * change until it is returned by this muxer notification
ab11110e
PP
762 * iterator (unless there's a fatal error). It is now safe to
763 * set the last returned time to this one.
958f7d11
PP
764 */
765 muxer_notif_iter->last_returned_ts_ns =
766 muxer_notif_iter->next_next_return_ts_ns;
767
768end:
769 return ret;
770}
771
772static
ab11110e
PP
773void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
774{
775 GList *node;
776
777 if (!muxer_notif_iter) {
778 return;
779 }
780
781 if (muxer_notif_iter->muxer_upstream_notif_iters) {
782 g_ptr_array_free(
783 muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
784 }
785
786 if (muxer_notif_iter->muxer_upstream_notif_iters_to_retry) {
787 g_list_free(muxer_notif_iter->muxer_upstream_notif_iters_to_retry);
788 }
789
790 for (node = muxer_notif_iter->newly_connected_priv_ports;
791 node; node = g_list_next(node)) {
792 bt_put(node->data);
793 }
794
795 g_list_free(muxer_notif_iter->newly_connected_priv_ports);
796 g_free(muxer_notif_iter);
797}
798
799static
800int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
958f7d11
PP
801 struct muxer_notif_iter *muxer_notif_iter)
802{
ab11110e 803 struct bt_component *comp;
958f7d11 804 uint64_t count;
ab11110e
PP
805 uint64_t i;
806 int ret = 0;
958f7d11 807
ab11110e
PP
808 /*
809 * Add the connected input ports to this muxer notification
810 * iterator's list of newly connected ports. They will be
811 * handled by muxer_notif_iter_handle_newly_connected_ports().
812 */
958f7d11
PP
813 comp = bt_component_from_private_component(muxer_comp->priv_comp);
814 assert(comp);
815 ret = bt_component_filter_get_input_port_count(comp, &count);
ab11110e
PP
816 if (ret) {
817 goto end;
818 }
958f7d11
PP
819
820 for (i = 0; i < count; i++) {
821 struct bt_private_port *priv_port =
822 bt_private_component_filter_get_input_private_port_at_index(
823 muxer_comp->priv_comp, i);
824 struct bt_port *port;
958f7d11
PP
825
826 assert(priv_port);
958f7d11 827 port = bt_port_from_private_port(priv_port);
ab11110e 828 assert(port);
958f7d11
PP
829
830 if (!bt_port_is_connected(port)) {
958f7d11 831 bt_put(priv_port);
ab11110e 832 bt_put(port);
958f7d11
PP
833 continue;
834 }
835
836 bt_put(port);
ab11110e
PP
837 muxer_notif_iter->newly_connected_priv_ports =
838 g_list_append(
839 muxer_notif_iter->newly_connected_priv_ports,
958f7d11 840 priv_port);
ab11110e 841 if (!muxer_notif_iter->newly_connected_priv_ports) {
958f7d11 842 bt_put(priv_port);
ab11110e
PP
843 ret = -1;
844 goto end;
958f7d11 845 }
958f7d11
PP
846 }
847
848end:
849 bt_put(comp);
850 return ret;
851}
852
958f7d11
PP
853BT_HIDDEN
854enum bt_notification_iterator_status muxer_notif_iter_init(
855 struct bt_private_notification_iterator *priv_notif_iter,
856 struct bt_private_port *output_priv_port)
857{
858 struct muxer_comp *muxer_comp = NULL;
859 struct muxer_notif_iter *muxer_notif_iter = NULL;
860 struct bt_private_component *priv_comp = NULL;
861 enum bt_notification_iterator_status status =
862 BT_NOTIFICATION_ITERATOR_STATUS_OK;
863 int ret;
864
865 priv_comp = bt_private_notification_iterator_get_private_component(
866 priv_notif_iter);
867 assert(priv_comp);
868 muxer_comp = bt_private_component_get_user_data(priv_comp);
869 assert(muxer_comp);
870 muxer_notif_iter = g_new0(struct muxer_notif_iter, 1);
871 if (!muxer_notif_iter) {
872 goto error;
873 }
874
ab11110e
PP
875 ret = muxer_notif_iter_init_newly_connected_ports(muxer_comp,
876 muxer_notif_iter);
877 if (ret) {
878 goto error;
879 }
880
958f7d11
PP
881 muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
882 muxer_notif_iter->muxer_upstream_notif_iters =
883 g_ptr_array_new_with_free_func(
884 (GDestroyNotify) destroy_muxer_upstream_notif_iter);
885 if (!muxer_notif_iter->muxer_upstream_notif_iters) {
886 goto error;
887 }
888
ab11110e 889 /* Set the initial "next" return value */
958f7d11
PP
890 ret = muxer_notif_iter_set_next_next_return(muxer_comp,
891 muxer_notif_iter);
892 if (ret) {
893 goto error;
894 }
895
896 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
897 muxer_notif_iter);
898 assert(ret == 0);
899 g_ptr_array_add(muxer_comp->muxer_notif_iters, muxer_notif_iter);
900 goto end;
901
902error:
903 destroy_muxer_notif_iter(muxer_notif_iter);
904 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
905 NULL);
906 assert(ret == 0);
907 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
908
909end:
910 bt_put(priv_comp);
911 return status;
912}
913
914BT_HIDDEN
915void muxer_notif_iter_finalize(
916 struct bt_private_notification_iterator *priv_notif_iter)
917{
918 struct muxer_notif_iter *muxer_notif_iter =
919 bt_private_notification_iterator_get_user_data(priv_notif_iter);
920 struct bt_private_component *priv_comp = NULL;
921 struct muxer_comp *muxer_comp = NULL;
922
923 priv_comp = bt_private_notification_iterator_get_private_component(
924 priv_notif_iter);
925 assert(priv_comp);
926 muxer_comp = bt_private_component_get_user_data(priv_comp);
927
928 if (muxer_comp) {
929 (void) g_ptr_array_remove_fast(muxer_comp->muxer_notif_iters,
930 muxer_notif_iter);
931 destroy_muxer_notif_iter(muxer_notif_iter);
932 }
933
934 bt_put(priv_comp);
935}
936
937BT_HIDDEN
938struct bt_notification_iterator_next_return muxer_notif_iter_next(
939 struct bt_private_notification_iterator *priv_notif_iter)
940{
941 struct bt_notification_iterator_next_return next_ret = {
942 .notification = NULL,
943 };
944 struct muxer_notif_iter *muxer_notif_iter =
945 bt_private_notification_iterator_get_user_data(priv_notif_iter);
946 struct bt_private_component *priv_comp = NULL;
947 struct muxer_comp *muxer_comp = NULL;
ab11110e 948 GList *retry_node;
958f7d11
PP
949 int ret;
950
951 assert(muxer_notif_iter);
952 priv_comp = bt_private_notification_iterator_get_private_component(
953 priv_notif_iter);
954 assert(priv_comp);
955 muxer_comp = bt_private_component_get_user_data(priv_comp);
956 assert(muxer_comp);
957
958 /* Are we in an error state set elsewhere? */
959 if (unlikely(muxer_comp->error)) {
960 goto error;
961 }
962
963 /*
964 * If we have upstream notification iterators to retry, retry
965 * them now. Each one we find which now has a notification or
966 * is in "end" state, we set it to NULL in this array. Then
967 * we remove all the NULL values from this array.
968 */
ab11110e
PP
969 retry_node = muxer_notif_iter->muxer_upstream_notif_iters_to_retry;
970 while (retry_node) {
958f7d11 971 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
ab11110e 972 retry_node->data;
958f7d11 973 enum bt_notification_iterator_status status;
ab11110e 974 GList *next_retry_node = g_list_next(retry_node);
958f7d11
PP
975
976 assert(muxer_upstream_notif_iter->notif_iter);
977 status = bt_notification_iterator_next(
978 muxer_upstream_notif_iter->notif_iter);
979 if (status < 0) {
958f7d11
PP
980 goto error;
981 }
982
983 if (status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
984 /*
985 * This upstream notification iterator is done.
ab11110e 986 * Put the iterator and remove node from list.
958f7d11 987 */
958f7d11 988 BT_PUT(muxer_upstream_notif_iter->notif_iter);
ab11110e
PP
989 muxer_notif_iter->muxer_upstream_notif_iters_to_retry =
990 g_list_delete_link(
991 muxer_notif_iter->muxer_upstream_notif_iters_to_retry,
992 retry_node);
993 retry_node = next_retry_node;
958f7d11
PP
994 continue;
995 }
996
997 assert(status == BT_NOTIFICATION_ITERATOR_STATUS_OK ||
998 status == BT_NOTIFICATION_ITERATOR_STATUS_AGAIN);
999
1000 if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
1001 /*
1002 * This upstream notification iterator now has.
ab11110e 1003 * a notification. Remove it from this list.
958f7d11 1004 */
ab11110e
PP
1005 muxer_notif_iter->muxer_upstream_notif_iters_to_retry =
1006 g_list_delete_link(
1007 muxer_notif_iter->muxer_upstream_notif_iters_to_retry,
1008 retry_node);
958f7d11 1009 }
958f7d11 1010
ab11110e
PP
1011 retry_node = next_retry_node;
1012 }
958f7d11
PP
1013
1014 /* Take our next "next" next return value */
1015 next_ret = muxer_notif_iter->next_next_return;
1016 muxer_notif_iter->next_next_return.status =
1017 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1018 muxer_notif_iter->next_next_return.notification = NULL;
1019
1020 /* Set the next "next" return value */
1021 ret = muxer_notif_iter_set_next_next_return(muxer_comp,
1022 muxer_notif_iter);
1023 if (ret) {
1024 goto error;
1025 }
1026
1027 goto end;
1028
1029error:
ab11110e
PP
1030 /*
1031 * Technically we already have a next "next" return value which
1032 * is ready for this call, but we're failing within this call,
1033 * so discard this buffer and return the error ASAP.
1034 */
958f7d11
PP
1035 BT_PUT(next_ret.notification);
1036 next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1037
1038end:
1039 bt_put(priv_comp);
1040 return next_ret;
1041}
1042
1043BT_HIDDEN
1044void muxer_port_connected(
1045 struct bt_private_component *priv_comp,
1046 struct bt_private_port *self_private_port,
1047 struct bt_port *other_port)
1048{
1049 struct bt_port *self_port =
1050 bt_port_from_private_port(self_private_port);
1051 struct muxer_comp *muxer_comp =
1052 bt_private_component_get_user_data(priv_comp);
1053 size_t i;
958f7d11
PP
1054
1055 assert(self_port);
1056 assert(muxer_comp);
1057
1058 if (bt_port_get_type(self_port) == BT_PORT_TYPE_INPUT) {
1059 int ret;
1060
1061 /* One less available input port */
1062 muxer_comp->available_input_ports--;
1063 ret = ensure_available_input_port(priv_comp);
1064 if (ret) {
1065 muxer_comp->error = true;
1066 goto end;
1067 }
1068 }
1069
1070 for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
1071 struct muxer_notif_iter *muxer_notif_iter =
1072 g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
1073
1074 /*
ab11110e
PP
1075 * Add this port to the list of newly connected ports
1076 * for this muxer notification iterator. We append at
1077 * the end of this list while
1078 * muxer_notif_iter_handle_newly_connected_ports()
1079 * removes the nodes from the beginning.
1080 *
1081 * The list node owns the private port.
958f7d11 1082 */
ab11110e
PP
1083 muxer_notif_iter->newly_connected_priv_ports =
1084 g_list_append(
1085 muxer_notif_iter->newly_connected_priv_ports,
1086 bt_get(self_private_port));
1087 if (!muxer_notif_iter->newly_connected_priv_ports) {
1088 bt_put(self_private_port);
958f7d11
PP
1089 muxer_comp->error = true;
1090 goto end;
1091 }
1092 }
1093
1094end:
1095 bt_put(self_port);
1096}
1097
1098BT_HIDDEN
1099void muxer_port_disconnected(struct bt_private_component *priv_comp,
1100 struct bt_private_port *priv_port)
1101{
1102 struct bt_port *port = bt_port_from_private_port(priv_port);
1103 struct muxer_comp *muxer_comp =
1104 bt_private_component_get_user_data(priv_comp);
1105
1106 assert(port);
1107 assert(muxer_comp);
1108
ab11110e
PP
1109 /*
1110 * There's nothing special to do when a port is disconnected
1111 * because this component deals with upstream notification
1112 * iterators which were already created thanks to connected
1113 * ports. The fact that the port is disconnected does not cancel
1114 * the upstream notification iterators created using its
1115 * connection: they still exist. The only way to remove an
1116 * upstream notification iterator is for its "next" operation to
1117 * return BT_NOTIFICATION_ITERATOR_STATUS_END.
1118 */
958f7d11
PP
1119 if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
1120 /* One more available input port */
1121 muxer_comp->available_input_ports++;
1122 }
1123
1124 bt_put(port);
1125}
This page took 0.066946 seconds and 4 git commands to generate.