ISO C: empty file needs at least one declaration
[babeltrace.git] / plugins / utils / muxer / muxer.c
CommitLineData
958f7d11
PP
1/*
2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23#include <babeltrace/babeltrace-internal.h>
24#include <babeltrace/ctf-ir/clock-class.h>
25#include <babeltrace/ctf-ir/event.h>
26#include <babeltrace/graph/clock-class-priority-map.h>
27#include <babeltrace/graph/component-filter.h>
28#include <babeltrace/graph/component.h>
29#include <babeltrace/graph/notification-event.h>
30#include <babeltrace/graph/notification-inactivity.h>
31#include <babeltrace/graph/notification-iterator.h>
32#include <babeltrace/graph/notification.h>
33#include <babeltrace/graph/port.h>
34#include <babeltrace/graph/private-component-filter.h>
35#include <babeltrace/graph/private-component.h>
36#include <babeltrace/graph/private-component.h>
37#include <babeltrace/graph/private-connection.h>
38#include <babeltrace/graph/private-notification-iterator.h>
39#include <babeltrace/graph/private-port.h>
40#include <plugins-common.h>
41#include <glib.h>
42#include <assert.h>
43
44struct muxer_comp {
45 /* Array of struct bt_private_notification_iterator * (weak refs) */
46 GPtrArray *muxer_notif_iters;
47
48 /* Weak ref */
49 struct bt_private_component *priv_comp;
50 unsigned int next_port_num;
51 size_t available_input_ports;
52 bool error;
a09c6b95 53 bool initializing_muxer_notif_iter;
958f7d11
PP
54};
55
/* One upstream notification iterator tracked by a muxer iterator. */
struct muxer_upstream_notif_iter {
	/* Upstream iterator (owned by this); NULL once it has ended */
	struct bt_notification_iterator *notif_iter;

	/* Input port on which the iterator was created (owned by this) */
	struct bt_private_port *priv_port;
};
63
958f7d11 64struct muxer_notif_iter {
ab11110e
PP
65 /*
66 * Array of struct muxer_upstream_notif_iter * (owned by this).
67 *
68 * NOTE: This array is searched in linearly to find the youngest
69 * current notification. Keep this until benchmarks confirm that
70 * another data structure is faster than this for our typical
71 * use cases.
72 */
958f7d11
PP
73 GPtrArray *muxer_upstream_notif_iters;
74
75 /* Array of struct muxer_upstream_notif_iter * (weak refs) */
ab11110e
PP
76 GList *muxer_upstream_notif_iters_to_retry;
77
78 /*
79 * List of "recently" connected input ports (owned by this) to
80 * handle by this muxer notification iterator.
81 * muxer_port_connected() adds entries to this list, and the
82 * entries are removed when a notification iterator is created
83 * on the port's connection and put into
84 * muxer_upstream_notif_iters above by
85 * muxer_notif_iter_handle_newly_connected_ports().
86 */
87 GList *newly_connected_priv_ports;
958f7d11
PP
88
89 /* Next thing to return by the "next" method */
90 struct bt_notification_iterator_next_return next_next_return;
91 int64_t next_next_return_ts_ns;
92
93 /* Last time returned in a notification */
94 int64_t last_returned_ts_ns;
95};
96
ab11110e
PP
97static
98void destroy_muxer_upstream_notif_iter(
99 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
100{
101 if (!muxer_upstream_notif_iter) {
102 return;
103 }
104
105 bt_put(muxer_upstream_notif_iter->notif_iter);
106 bt_put(muxer_upstream_notif_iter->priv_port);
107 g_free(muxer_upstream_notif_iter);
108}
109
958f7d11
PP
110static
111struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
112 struct muxer_notif_iter *muxer_notif_iter,
113 struct bt_notification_iterator *notif_iter,
114 struct bt_private_port *priv_port)
115{
116 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
117 g_new0(struct muxer_upstream_notif_iter, 1);
118
119 if (!muxer_upstream_notif_iter) {
120 goto end;
121 }
122
123 muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
124 muxer_upstream_notif_iter->priv_port = bt_get(priv_port);
125 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
126 muxer_upstream_notif_iter);
127
128end:
129 return muxer_upstream_notif_iter;
130}
131
132static inline
133bool muxer_notif_iter_has_upstream_notif_iter_to_retry(
134 struct muxer_notif_iter *muxer_notif_iter)
135{
136 assert(muxer_notif_iter);
ab11110e 137 return muxer_notif_iter->muxer_upstream_notif_iters_to_retry != NULL;
958f7d11
PP
138}
139
140static
141void muxer_notif_iter_add_upstream_notif_iter_to_retry(
142 struct muxer_notif_iter *muxer_notif_iter,
143 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
144{
145 assert(muxer_notif_iter);
146 assert(muxer_upstream_notif_iter);
ab11110e
PP
147 muxer_notif_iter->muxer_upstream_notif_iters_to_retry =
148 g_list_append(
149 muxer_notif_iter->muxer_upstream_notif_iters_to_retry,
150 muxer_upstream_notif_iter);
958f7d11
PP
151}
152
153static
154int ensure_available_input_port(struct bt_private_component *priv_comp)
155{
156 struct muxer_comp *muxer_comp =
157 bt_private_component_get_user_data(priv_comp);
158 int ret = 0;
159 GString *port_name = NULL;
160 void *priv_port = NULL;
161
162 assert(muxer_comp);
163
164 if (muxer_comp->available_input_ports >= 1) {
165 goto end;
166 }
167
168 port_name = g_string_new("in");
169 if (!port_name) {
170 ret = -1;
171 goto end;
172 }
173
174 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
175 priv_port = bt_private_component_filter_add_input_private_port(
176 priv_comp, port_name->str);
177 if (!priv_port) {
178 ret = -1;
179 goto end;
180 }
181
182 muxer_comp->available_input_ports++;
183 muxer_comp->next_port_num++;
184
185end:
186 if (port_name) {
187 g_string_free(port_name, TRUE);
188 }
189
190 BT_PUT(priv_port);
191 return ret;
192}
193
/*
 * Removes the default input and output ports of the filter component,
 * if they exist.
 *
 * Returns 0 on success, or the first non-zero removal status.
 */
static
int remove_default_ports(struct bt_private_component *priv_comp)
{
	struct bt_private_port *default_port;
	int ret = 0;

	/* Default input port */
	default_port =
		bt_private_component_filter_get_default_input_private_port(
			priv_comp);
	if (default_port) {
		ret = bt_private_port_remove_from_component(default_port);
	}

	bt_put(default_port);
	if (ret) {
		return ret;
	}

	/* Default output port */
	default_port =
		bt_private_component_filter_get_default_output_private_port(
			priv_comp);
	if (default_port) {
		ret = bt_private_port_remove_from_component(default_port);
	}

	bt_put(default_port);
	return ret;
}
223
/*
 * Adds the output port named "out" to the filter component.
 *
 * Returns 0 on success, or -1 if the port could not be added.
 */
static
int create_output_port(struct bt_private_component *priv_comp)
{
	void *out_port =
		bt_private_component_filter_add_output_private_port(
			priv_comp, "out");
	int ret = out_port ? 0 : -1;

	/* The returned port reference is not kept here */
	bt_put(out_port);
	return ret;
}
239
240static
241void destroy_muxer_comp(struct muxer_comp *muxer_comp)
242{
243 if (!muxer_comp) {
244 return;
245 }
246
247 if (muxer_comp->muxer_notif_iters) {
248 g_ptr_array_free(muxer_comp->muxer_notif_iters, TRUE);
249 }
250
251 g_free(muxer_comp);
252}
253
254BT_HIDDEN
255enum bt_component_status muxer_init(
256 struct bt_private_component *priv_comp,
257 struct bt_value *params, void *init_data)
258{
259 int ret;
260 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
261 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
262
263 if (!muxer_comp) {
264 goto error;
265 }
266
267 muxer_comp->muxer_notif_iters = g_ptr_array_new();
268 if (!muxer_comp->muxer_notif_iters) {
269 goto error;
270 }
271
272 muxer_comp->priv_comp = priv_comp;
273 ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
274 assert(ret == 0);
275 ret = remove_default_ports(priv_comp);
276 if (ret) {
277 goto error;
278 }
279
280 ret = ensure_available_input_port(priv_comp);
281 if (ret) {
282 goto error;
283 }
284
285 ret = create_output_port(priv_comp);
286 if (ret) {
287 goto error;
288 }
289
290 goto end;
291
292error:
293 destroy_muxer_comp(muxer_comp);
294 ret = bt_private_component_set_user_data(priv_comp, NULL);
295 assert(ret == 0);
296 status = BT_COMPONENT_STATUS_ERROR;
297
298end:
299 return status;
300}
301
302BT_HIDDEN
303void muxer_finalize(struct bt_private_component *priv_comp)
304{
305 struct muxer_comp *muxer_comp =
306 bt_private_component_get_user_data(priv_comp);
307
308 destroy_muxer_comp(muxer_comp);
309}
310
311static
312struct bt_notification_iterator *create_notif_iter_on_input_port(
313 struct bt_private_port *priv_port, int *ret)
314{
315 struct bt_port *port = bt_port_from_private_port(priv_port);
316 struct bt_notification_iterator *notif_iter = NULL;
317 struct bt_private_connection *priv_conn = NULL;
318
319 assert(ret);
320 *ret = 0;
321 assert(port);
322
323 assert(bt_port_is_connected(port));
324 priv_conn = bt_private_port_get_private_connection(priv_port);
325 if (!priv_conn) {
326 *ret = -1;
327 goto end;
328 }
329
ab11110e
PP
330 // TODO: Advance the iterator to >= the time of the latest
331 // returned notification by the muxer notification
332 // iterator which creates it.
958f7d11
PP
333 notif_iter = bt_private_connection_create_notification_iterator(
334 priv_conn);
335 if (!notif_iter) {
336 *ret = -1;
337 goto end;
338 }
339
340end:
341 bt_put(port);
342 bt_put(priv_conn);
343 return notif_iter;
344}
345
ab11110e
PP
346static
347int muxer_upstream_notif_iter_next(struct muxer_notif_iter *muxer_notif_iter,
348 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
349{
350 int ret = 0;
351 enum bt_notification_iterator_status next_status;
352
353 next_status = bt_notification_iterator_next(
354 muxer_upstream_notif_iter->notif_iter);
355
356 switch (next_status) {
357 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
358 /* Everything okay */
359 break;
360 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
361 muxer_notif_iter_add_upstream_notif_iter_to_retry(
362 muxer_notif_iter, muxer_upstream_notif_iter);
363 break;
364 case BT_NOTIFICATION_ITERATOR_STATUS_END:
365 /*
366 * Notification iterator reached the end: release it. It
367 * won't be considered again to find the youngest
368 * notification.
369 */
370 BT_PUT(muxer_upstream_notif_iter->notif_iter);
371 goto end;
372 default:
373 /* Error or unsupported status code */
374 ret = next_status;
375 }
376
377end:
378 return ret;
379}
380
381static
382int muxer_notif_iter_handle_newly_connected_ports(struct muxer_comp *muxer_comp,
383 struct muxer_notif_iter *muxer_notif_iter)
384{
385 struct bt_component *comp = NULL;
386 int ret = 0;
387
388 comp = bt_component_from_private_component(muxer_comp->priv_comp);
389 assert(comp);
390
391 /*
392 * Here we create one upstream notification iterator for each
393 * newly connected port. The list of newly connected ports to
394 * handle here is updated by muxer_port_connected().
395 *
396 * An initial "next" operation is performed on each new upstream
397 * notification iterator. The possible return values of this
398 * initial "next" operation are:
399 *
400 * * BT_NOTIFICATION_ITERATOR_STATUS_OK: Perfect, we have a
401 * current notification.
402 *
403 * * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: No notification so
404 * far, but the muxer upstream notification iterator is added
405 * to the list of upstream notification iterators to retry
406 * before finding the next youngest notification.
407 *
408 * * BT_NOTIFICATION_ITERATOR_STATUS_END: No notification, and
409 * we immediately release the upstream notification iterator
410 * because it's useless.
411 *
412 * A possible side effect of this initial "next" operation, on
413 * each notification iterator, is the connection of a new port.
414 * In this case the list of newly connected ports is updated and
415 * this loop continues.
416 *
417 * Once this loop finishes successfully, the set of upstream
418 * notification iterators is considered _stable_, that is, it is
419 * safe, if no notification iterators must be retried, to select
420 * the youngest notification amongst them to be returned by the
421 * next "next" method call.
422 */
423 while (true) {
424 GList *node = muxer_notif_iter->newly_connected_priv_ports;
425 struct bt_private_port *priv_port;
426 struct bt_port *port;
427 struct bt_notification_iterator *upstream_notif_iter = NULL;
428 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
429
430 if (!node) {
431 break;
432 }
433
434 priv_port = node->data;
435 port = bt_port_from_private_port(priv_port);
436 assert(port);
437
438 if (!bt_port_is_connected(port)) {
439 /*
440 * Looks like this port is not connected
441 * anymore: we can't create an upstream
442 * notification iterator on its connection in
443 * this case.
444 */
445 goto remove_node;
446 }
447
448 BT_PUT(port);
449 upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
450 &ret);
451 if (ret) {
452 assert(!upstream_notif_iter);
453 bt_put(priv_port);
454 goto error;
455 }
456
457 muxer_upstream_notif_iter =
458 muxer_notif_iter_add_upstream_notif_iter(
459 muxer_notif_iter, upstream_notif_iter,
460 priv_port);
461 BT_PUT(priv_port);
462 BT_PUT(upstream_notif_iter);
463 if (!muxer_upstream_notif_iter) {
464 goto error;
465 }
466
467 ret = muxer_upstream_notif_iter_next(muxer_notif_iter,
468 muxer_upstream_notif_iter);
469 if (ret) {
470 goto error;
471 }
472
473remove_node:
474 bt_put(upstream_notif_iter);
475 bt_put(port);
476 bt_put(priv_port);
477 muxer_notif_iter->newly_connected_priv_ports =
478 g_list_delete_link(
479 muxer_notif_iter->newly_connected_priv_ports,
480 node);
481 }
482
483 goto end;
484
485error:
486 if (ret == 0) {
487 ret = -1;
488 }
489
490end:
491 bt_put(comp);
492 return ret;
493}
494
958f7d11
PP
495static
496int get_notif_ts_ns(struct muxer_comp *muxer_comp,
497 struct bt_notification *notif, int64_t last_returned_ts_ns,
498 int64_t *ts_ns)
499{
500 struct bt_clock_class_priority_map *cc_prio_map = NULL;
501 struct bt_ctf_clock_class *clock_class = NULL;
502 struct bt_ctf_clock_value *clock_value = NULL;
503 struct bt_ctf_event *event = NULL;
504 int ret = 0;
505
506 assert(notif);
507 assert(ts_ns);
508
509 switch (bt_notification_get_type(notif)) {
510 case BT_NOTIFICATION_TYPE_EVENT:
511 cc_prio_map =
512 bt_notification_event_get_clock_class_priority_map(
513 notif);
514 break;
515
516 case BT_NOTIFICATION_TYPE_INACTIVITY:
517 cc_prio_map =
a09c6b95 518 bt_notification_inactivity_get_clock_class_priority_map(
958f7d11
PP
519 notif);
520 break;
521 default:
522 /*
523 * All the other notifications have a higher
524 * priority.
525 */
526 *ts_ns = last_returned_ts_ns;
527 goto end;
528 }
529
530 if (!cc_prio_map) {
531 goto error;
532 }
533
534 /*
535 * If the clock class priority map is empty, then we consider
536 * that this notification has no time. In this case it's always
537 * the youngest.
538 */
539 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map) == 0) {
540 *ts_ns = last_returned_ts_ns;
541 goto end;
542 }
543
544 clock_class =
545 bt_clock_class_priority_map_get_highest_priority_clock_class(
546 cc_prio_map);
547 if (!clock_class) {
548 goto error;
549 }
550
acd6aeb1 551 if (!bt_ctf_clock_class_is_absolute(clock_class)) {
ab11110e 552 // TODO: Allow this with an explicit parameter
958f7d11
PP
553 goto error;
554 }
555
556 switch (bt_notification_get_type(notif)) {
557 case BT_NOTIFICATION_TYPE_EVENT:
558 event = bt_notification_event_get_event(notif);
ab11110e 559 assert(event);
958f7d11
PP
560 clock_value = bt_ctf_event_get_clock_value(event,
561 clock_class);
562 break;
563 case BT_NOTIFICATION_TYPE_INACTIVITY:
564 clock_value = bt_notification_inactivity_get_clock_value(
565 notif, clock_class);
566 break;
567 default:
568 assert(false);
569 }
570
571 if (!clock_value) {
572 goto error;
573 }
574
575 ret = bt_ctf_clock_value_get_value_ns_from_epoch(clock_value, ts_ns);
576 if (ret) {
577 goto error;
578 }
579
580 goto end;
581
582error:
583 ret = -1;
584
585end:
586 bt_put(cc_prio_map);
587 bt_put(event);
588 bt_put(clock_class);
589 bt_put(clock_value);
590 return ret;
591}
592
ab11110e
PP
593/*
594 * This function finds the youngest available notification amongst the
595 * non-ended upstream notification iterators and returns the upstream
596 * notification iterator which has it, or
597 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
598 * notification.
599 *
600 * This function does NOT:
601 *
602 * * Update any upstream notification iterator.
603 * * Check for newly connected ports.
604 * * Check the upstream notification iterators to retry.
605 *
606 * On sucess, this function sets *muxer_upstream_notif_iter to the
607 * upstream notification iterator of which the current notification is
608 * the youngest, and sets *ts_ns to its time.
609 */
958f7d11
PP
610static
611enum bt_notification_iterator_status
612muxer_notif_iter_youngest_upstream_notif_iter(
613 struct muxer_comp *muxer_comp,
614 struct muxer_notif_iter *muxer_notif_iter,
615 struct muxer_upstream_notif_iter **muxer_upstream_notif_iter,
616 int64_t *ts_ns)
617{
618 size_t i;
619 int ret;
620 int64_t youngest_ts_ns = INT64_MAX;
621 enum bt_notification_iterator_status status =
622 BT_NOTIFICATION_ITERATOR_STATUS_OK;
623
624 assert(muxer_comp);
625 assert(muxer_notif_iter);
626 assert(muxer_upstream_notif_iter);
627 *muxer_upstream_notif_iter = NULL;
628
629 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
630 struct bt_notification *notif;
631 struct muxer_upstream_notif_iter *cur_muxer_upstream_notif_iter =
632 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters, i);
633 int64_t notif_ts_ns;
634
635 if (!cur_muxer_upstream_notif_iter->notif_iter) {
ab11110e 636 /* This upstream notification iterator is ended */
958f7d11
PP
637 continue;
638 }
639
640 notif = bt_notification_iterator_get_notification(
641 cur_muxer_upstream_notif_iter->notif_iter);
642 assert(notif);
643 ret = get_notif_ts_ns(muxer_comp, notif,
644 muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
645 bt_put(notif);
646 if (ret) {
647 *muxer_upstream_notif_iter = NULL;
648 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
649 goto end;
650 }
651
652 if (notif_ts_ns <= youngest_ts_ns) {
653 *muxer_upstream_notif_iter =
654 cur_muxer_upstream_notif_iter;
655 youngest_ts_ns = notif_ts_ns;
656 *ts_ns = youngest_ts_ns;
657 }
658 }
659
660 if (!*muxer_upstream_notif_iter) {
661 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
662 *ts_ns = INT64_MIN;
663 }
664
665end:
666 return status;
667}
668
669static
670int muxer_notif_iter_set_next_next_return(struct muxer_comp *muxer_comp,
671 struct muxer_notif_iter *muxer_notif_iter)
672{
673 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
958f7d11
PP
674 enum bt_notification_iterator_status notif_iter_status;
675 int ret = 0;
676
ab11110e
PP
677 /*
678 * Previous operations might have connected ports. They must be
679 * considered when finding the youngest notification because
680 * their upstream notification iterator does not exist yet.
681 */
682 ret = muxer_notif_iter_handle_newly_connected_ports(muxer_comp,
683 muxer_notif_iter);
684 if (ret) {
685 muxer_notif_iter->next_next_return.status =
686 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
687 BT_PUT(muxer_notif_iter->next_next_return.notification);
688 goto end;
689 }
690
691 assert(!muxer_notif_iter->newly_connected_priv_ports);
692
693 if (muxer_notif_iter_has_upstream_notif_iter_to_retry(
694 muxer_notif_iter)) {
958f7d11
PP
695 /*
696 * At least one upstream notification iterator to retry:
ab11110e
PP
697 * try again later, because we cannot find the youngest
698 * notification if we don't have the current
699 * notification of each upstream notification iterator.
958f7d11
PP
700 */
701 muxer_notif_iter->next_next_return.status =
702 BT_NOTIFICATION_ITERATOR_STATUS_AGAIN;
703 BT_PUT(muxer_notif_iter->next_next_return.notification);
704 goto end;
705 }
706
707 /*
ab11110e
PP
708 * At this point we know that all our connected ports have an
709 * upstream notification iterator, and that all those iterators
710 * have a current notification (stable state). It is safe to
711 * find the youngest notification. It is possible that calling
712 * "next" on its iterator will connect new ports. This will be
713 * handled by the next call to
714 * muxer_notif_iter_set_next_next_return().
958f7d11
PP
715 */
716 notif_iter_status =
717 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
718 muxer_notif_iter, &muxer_upstream_notif_iter,
719 &muxer_notif_iter->next_next_return_ts_ns);
720 if (notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
721 /* No more active upstream notification iterator */
722 muxer_notif_iter->next_next_return.status =
723 BT_NOTIFICATION_ITERATOR_STATUS_END;
724 BT_PUT(muxer_notif_iter->next_next_return.notification);
725 goto end;
726 }
727
728 if (notif_iter_status < 0) {
729 ret = -1;
730 goto end;
731 }
732
ab11110e
PP
733 assert(notif_iter_status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
734 BT_PUT(muxer_notif_iter->next_next_return.notification);
735 muxer_notif_iter->next_next_return.notification =
736 bt_notification_iterator_get_notification(
737 muxer_upstream_notif_iter->notif_iter);
738 assert(muxer_notif_iter->next_next_return.notification);
958f7d11
PP
739 muxer_notif_iter->next_next_return.status =
740 BT_NOTIFICATION_ITERATOR_STATUS_OK;
ab11110e
PP
741 ret = muxer_upstream_notif_iter_next(muxer_notif_iter,
742 muxer_upstream_notif_iter);
743 if (ret) {
958f7d11
PP
744 goto end;
745 }
746
958f7d11
PP
747 /*
748 * Here we have the next "next" return value. It won't change
749 * until it is returned by the next call to our "next" method.
750 * If its time is less than the time of the last notification
751 * that our "next" method returned, then fail because the
752 * muxer's output wouldn't be monotonic.
753 */
754 if (muxer_notif_iter->next_next_return_ts_ns <
755 muxer_notif_iter->last_returned_ts_ns) {
756 ret = -1;
757 goto end;
758 }
759
760 /*
761 * We are now sure that the next "next" return value will not
762 * change until it is returned by this muxer notification
ab11110e
PP
763 * iterator (unless there's a fatal error). It is now safe to
764 * set the last returned time to this one.
958f7d11
PP
765 */
766 muxer_notif_iter->last_returned_ts_ns =
767 muxer_notif_iter->next_next_return_ts_ns;
768
769end:
770 return ret;
771}
772
773static
ab11110e
PP
774void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
775{
776 GList *node;
777
778 if (!muxer_notif_iter) {
779 return;
780 }
781
782 if (muxer_notif_iter->muxer_upstream_notif_iters) {
783 g_ptr_array_free(
784 muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
785 }
786
787 if (muxer_notif_iter->muxer_upstream_notif_iters_to_retry) {
788 g_list_free(muxer_notif_iter->muxer_upstream_notif_iters_to_retry);
789 }
790
791 for (node = muxer_notif_iter->newly_connected_priv_ports;
792 node; node = g_list_next(node)) {
793 bt_put(node->data);
794 }
795
796 g_list_free(muxer_notif_iter->newly_connected_priv_ports);
797 g_free(muxer_notif_iter);
798}
799
800static
801int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
958f7d11
PP
802 struct muxer_notif_iter *muxer_notif_iter)
803{
ab11110e 804 struct bt_component *comp;
958f7d11 805 uint64_t count;
ab11110e
PP
806 uint64_t i;
807 int ret = 0;
958f7d11 808
ab11110e
PP
809 /*
810 * Add the connected input ports to this muxer notification
811 * iterator's list of newly connected ports. They will be
812 * handled by muxer_notif_iter_handle_newly_connected_ports().
813 */
958f7d11
PP
814 comp = bt_component_from_private_component(muxer_comp->priv_comp);
815 assert(comp);
816 ret = bt_component_filter_get_input_port_count(comp, &count);
ab11110e
PP
817 if (ret) {
818 goto end;
819 }
958f7d11
PP
820
821 for (i = 0; i < count; i++) {
822 struct bt_private_port *priv_port =
823 bt_private_component_filter_get_input_private_port_at_index(
824 muxer_comp->priv_comp, i);
825 struct bt_port *port;
958f7d11
PP
826
827 assert(priv_port);
958f7d11 828 port = bt_port_from_private_port(priv_port);
ab11110e 829 assert(port);
958f7d11
PP
830
831 if (!bt_port_is_connected(port)) {
958f7d11 832 bt_put(priv_port);
ab11110e 833 bt_put(port);
958f7d11
PP
834 continue;
835 }
836
837 bt_put(port);
ab11110e
PP
838 muxer_notif_iter->newly_connected_priv_ports =
839 g_list_append(
840 muxer_notif_iter->newly_connected_priv_ports,
958f7d11 841 priv_port);
ab11110e 842 if (!muxer_notif_iter->newly_connected_priv_ports) {
958f7d11 843 bt_put(priv_port);
ab11110e
PP
844 ret = -1;
845 goto end;
958f7d11 846 }
958f7d11
PP
847 }
848
849end:
850 bt_put(comp);
851 return ret;
852}
853
958f7d11
PP
854BT_HIDDEN
855enum bt_notification_iterator_status muxer_notif_iter_init(
856 struct bt_private_notification_iterator *priv_notif_iter,
857 struct bt_private_port *output_priv_port)
858{
859 struct muxer_comp *muxer_comp = NULL;
860 struct muxer_notif_iter *muxer_notif_iter = NULL;
861 struct bt_private_component *priv_comp = NULL;
862 enum bt_notification_iterator_status status =
863 BT_NOTIFICATION_ITERATOR_STATUS_OK;
864 int ret;
865
866 priv_comp = bt_private_notification_iterator_get_private_component(
867 priv_notif_iter);
868 assert(priv_comp);
869 muxer_comp = bt_private_component_get_user_data(priv_comp);
870 assert(muxer_comp);
a09c6b95
PP
871
872 if (muxer_comp->initializing_muxer_notif_iter) {
873 /*
874 * Weird, unhandled situation: downstream creates a
875 * muxer notification iterator while creating another
876 * muxer notification iterator (same component).
877 */
958f7d11
PP
878 goto error;
879 }
880
a09c6b95
PP
881 muxer_comp->initializing_muxer_notif_iter = true;
882 muxer_notif_iter = g_new0(struct muxer_notif_iter, 1);
883 if (!muxer_notif_iter) {
ab11110e
PP
884 goto error;
885 }
886
958f7d11
PP
887 muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
888 muxer_notif_iter->muxer_upstream_notif_iters =
889 g_ptr_array_new_with_free_func(
890 (GDestroyNotify) destroy_muxer_upstream_notif_iter);
891 if (!muxer_notif_iter->muxer_upstream_notif_iters) {
892 goto error;
893 }
894
a09c6b95
PP
895 /*
896 * Add the muxer notification iterator to the component's array
897 * of muxer notification iterators here because
898 * muxer_notif_iter_init_newly_connected_ports() can cause
899 * muxer_port_connected() to be called, which adds the newly
900 * connected port to each muxer notification iterator's list of
901 * newly connected ports.
902 */
903 g_ptr_array_add(muxer_comp->muxer_notif_iters, muxer_notif_iter);
904 ret = muxer_notif_iter_init_newly_connected_ports(muxer_comp,
905 muxer_notif_iter);
906 if (ret) {
907 goto error;
908 }
909
ab11110e 910 /* Set the initial "next" return value */
958f7d11
PP
911 ret = muxer_notif_iter_set_next_next_return(muxer_comp,
912 muxer_notif_iter);
913 if (ret) {
914 goto error;
915 }
916
917 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
918 muxer_notif_iter);
919 assert(ret == 0);
958f7d11
PP
920 goto end;
921
922error:
a09c6b95
PP
923 if (g_ptr_array_index(muxer_comp->muxer_notif_iters,
924 muxer_comp->muxer_notif_iters->len - 1) == muxer_notif_iter) {
925 g_ptr_array_remove_index(muxer_comp->muxer_notif_iters,
926 muxer_comp->muxer_notif_iters->len - 1);
927 }
928
958f7d11
PP
929 destroy_muxer_notif_iter(muxer_notif_iter);
930 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
931 NULL);
932 assert(ret == 0);
933 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
934
935end:
a09c6b95 936 muxer_comp->initializing_muxer_notif_iter = false;
958f7d11
PP
937 bt_put(priv_comp);
938 return status;
939}
940
941BT_HIDDEN
942void muxer_notif_iter_finalize(
943 struct bt_private_notification_iterator *priv_notif_iter)
944{
945 struct muxer_notif_iter *muxer_notif_iter =
946 bt_private_notification_iterator_get_user_data(priv_notif_iter);
947 struct bt_private_component *priv_comp = NULL;
948 struct muxer_comp *muxer_comp = NULL;
949
950 priv_comp = bt_private_notification_iterator_get_private_component(
951 priv_notif_iter);
952 assert(priv_comp);
953 muxer_comp = bt_private_component_get_user_data(priv_comp);
954
955 if (muxer_comp) {
956 (void) g_ptr_array_remove_fast(muxer_comp->muxer_notif_iters,
957 muxer_notif_iter);
958 destroy_muxer_notif_iter(muxer_notif_iter);
959 }
960
961 bt_put(priv_comp);
962}
963
964BT_HIDDEN
965struct bt_notification_iterator_next_return muxer_notif_iter_next(
966 struct bt_private_notification_iterator *priv_notif_iter)
967{
968 struct bt_notification_iterator_next_return next_ret = {
969 .notification = NULL,
970 };
971 struct muxer_notif_iter *muxer_notif_iter =
972 bt_private_notification_iterator_get_user_data(priv_notif_iter);
973 struct bt_private_component *priv_comp = NULL;
974 struct muxer_comp *muxer_comp = NULL;
ab11110e 975 GList *retry_node;
958f7d11
PP
976 int ret;
977
978 assert(muxer_notif_iter);
979 priv_comp = bt_private_notification_iterator_get_private_component(
980 priv_notif_iter);
981 assert(priv_comp);
982 muxer_comp = bt_private_component_get_user_data(priv_comp);
983 assert(muxer_comp);
984
985 /* Are we in an error state set elsewhere? */
986 if (unlikely(muxer_comp->error)) {
987 goto error;
988 }
989
990 /*
991 * If we have upstream notification iterators to retry, retry
992 * them now. Each one we find which now has a notification or
993 * is in "end" state, we set it to NULL in this array. Then
994 * we remove all the NULL values from this array.
995 */
ab11110e
PP
996 retry_node = muxer_notif_iter->muxer_upstream_notif_iters_to_retry;
997 while (retry_node) {
958f7d11 998 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
ab11110e 999 retry_node->data;
958f7d11 1000 enum bt_notification_iterator_status status;
ab11110e 1001 GList *next_retry_node = g_list_next(retry_node);
958f7d11
PP
1002
1003 assert(muxer_upstream_notif_iter->notif_iter);
1004 status = bt_notification_iterator_next(
1005 muxer_upstream_notif_iter->notif_iter);
1006 if (status < 0) {
958f7d11
PP
1007 goto error;
1008 }
1009
1010 if (status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
1011 /*
1012 * This upstream notification iterator is done.
ab11110e 1013 * Put the iterator and remove node from list.
958f7d11 1014 */
958f7d11 1015 BT_PUT(muxer_upstream_notif_iter->notif_iter);
ab11110e
PP
1016 muxer_notif_iter->muxer_upstream_notif_iters_to_retry =
1017 g_list_delete_link(
1018 muxer_notif_iter->muxer_upstream_notif_iters_to_retry,
1019 retry_node);
1020 retry_node = next_retry_node;
958f7d11
PP
1021 continue;
1022 }
1023
1024 assert(status == BT_NOTIFICATION_ITERATOR_STATUS_OK ||
1025 status == BT_NOTIFICATION_ITERATOR_STATUS_AGAIN);
1026
1027 if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
1028 /*
1029 * This upstream notification iterator now has.
ab11110e 1030 * a notification. Remove it from this list.
958f7d11 1031 */
ab11110e
PP
1032 muxer_notif_iter->muxer_upstream_notif_iters_to_retry =
1033 g_list_delete_link(
1034 muxer_notif_iter->muxer_upstream_notif_iters_to_retry,
1035 retry_node);
958f7d11 1036 }
958f7d11 1037
ab11110e
PP
1038 retry_node = next_retry_node;
1039 }
958f7d11
PP
1040
1041 /* Take our next "next" next return value */
1042 next_ret = muxer_notif_iter->next_next_return;
1043 muxer_notif_iter->next_next_return.status =
1044 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1045 muxer_notif_iter->next_next_return.notification = NULL;
1046
1047 /* Set the next "next" return value */
1048 ret = muxer_notif_iter_set_next_next_return(muxer_comp,
1049 muxer_notif_iter);
1050 if (ret) {
1051 goto error;
1052 }
1053
1054 goto end;
1055
1056error:
ab11110e
PP
1057 /*
1058 * Technically we already have a next "next" return value which
1059 * is ready for this call, but we're failing within this call,
1060 * so discard this buffer and return the error ASAP.
1061 */
958f7d11
PP
1062 BT_PUT(next_ret.notification);
1063 next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1064
1065end:
1066 bt_put(priv_comp);
1067 return next_ret;
1068}
1069
1070BT_HIDDEN
1071void muxer_port_connected(
1072 struct bt_private_component *priv_comp,
1073 struct bt_private_port *self_private_port,
1074 struct bt_port *other_port)
1075{
1076 struct bt_port *self_port =
1077 bt_port_from_private_port(self_private_port);
1078 struct muxer_comp *muxer_comp =
1079 bt_private_component_get_user_data(priv_comp);
1080 size_t i;
958f7d11
PP
1081
1082 assert(self_port);
1083 assert(muxer_comp);
1084
1085 if (bt_port_get_type(self_port) == BT_PORT_TYPE_INPUT) {
1086 int ret;
1087
1088 /* One less available input port */
1089 muxer_comp->available_input_ports--;
1090 ret = ensure_available_input_port(priv_comp);
1091 if (ret) {
1092 muxer_comp->error = true;
1093 goto end;
1094 }
1095 }
1096
1097 for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
1098 struct muxer_notif_iter *muxer_notif_iter =
1099 g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
1100
1101 /*
ab11110e
PP
1102 * Add this port to the list of newly connected ports
1103 * for this muxer notification iterator. We append at
1104 * the end of this list while
1105 * muxer_notif_iter_handle_newly_connected_ports()
1106 * removes the nodes from the beginning.
1107 *
1108 * The list node owns the private port.
958f7d11 1109 */
ab11110e
PP
1110 muxer_notif_iter->newly_connected_priv_ports =
1111 g_list_append(
1112 muxer_notif_iter->newly_connected_priv_ports,
1113 bt_get(self_private_port));
1114 if (!muxer_notif_iter->newly_connected_priv_ports) {
1115 bt_put(self_private_port);
958f7d11
PP
1116 muxer_comp->error = true;
1117 goto end;
1118 }
1119 }
1120
1121end:
1122 bt_put(self_port);
1123}
1124
1125BT_HIDDEN
1126void muxer_port_disconnected(struct bt_private_component *priv_comp,
1127 struct bt_private_port *priv_port)
1128{
1129 struct bt_port *port = bt_port_from_private_port(priv_port);
1130 struct muxer_comp *muxer_comp =
1131 bt_private_component_get_user_data(priv_comp);
1132
1133 assert(port);
1134 assert(muxer_comp);
1135
ab11110e
PP
1136 /*
1137 * There's nothing special to do when a port is disconnected
1138 * because this component deals with upstream notification
1139 * iterators which were already created thanks to connected
1140 * ports. The fact that the port is disconnected does not cancel
1141 * the upstream notification iterators created using its
1142 * connection: they still exist. The only way to remove an
1143 * upstream notification iterator is for its "next" operation to
1144 * return BT_NOTIFICATION_ITERATOR_STATUS_END.
1145 */
958f7d11
PP
1146 if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
1147 /* One more available input port */
1148 muxer_comp->available_input_ports++;
1149 }
1150
1151 bt_put(port);
1152}
This page took 0.069411 seconds and 4 git commands to generate.