Remove default port API
[babeltrace.git] / plugins / utils / muxer / muxer.c
CommitLineData
958f7d11
PP
1/*
2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23#include <babeltrace/babeltrace-internal.h>
24#include <babeltrace/ctf-ir/clock-class.h>
25#include <babeltrace/ctf-ir/event.h>
26#include <babeltrace/graph/clock-class-priority-map.h>
27#include <babeltrace/graph/component-filter.h>
28#include <babeltrace/graph/component.h>
29#include <babeltrace/graph/notification-event.h>
30#include <babeltrace/graph/notification-inactivity.h>
31#include <babeltrace/graph/notification-iterator.h>
32#include <babeltrace/graph/notification.h>
33#include <babeltrace/graph/port.h>
34#include <babeltrace/graph/private-component-filter.h>
35#include <babeltrace/graph/private-component.h>
36#include <babeltrace/graph/private-component.h>
37#include <babeltrace/graph/private-connection.h>
38#include <babeltrace/graph/private-notification-iterator.h>
39#include <babeltrace/graph/private-port.h>
40#include <plugins-common.h>
41#include <glib.h>
42#include <assert.h>
43
44struct muxer_comp {
45 /* Array of struct bt_private_notification_iterator * (weak refs) */
46 GPtrArray *muxer_notif_iters;
47
48 /* Weak ref */
49 struct bt_private_component *priv_comp;
50 unsigned int next_port_num;
51 size_t available_input_ports;
52 bool error;
a09c6b95 53 bool initializing_muxer_notif_iter;
958f7d11
PP
54};
55
/*
 * One upstream notification iterator tracked by a muxer notification
 * iterator, along with the input port it was created from.
 */
struct muxer_upstream_notif_iter {
	/* Upstream iterator (owned by this); NULL once it has ended */
	struct bt_notification_iterator *notif_iter;

	/* Input port on which the upstream iterator was created (owned by this) */
	struct bt_private_port *priv_port;

	/*
	 * True when the upstream iterator's current notification has
	 * not been consumed yet and must take part in the next
	 * multiplexing decision. It is cleared when the notification
	 * is returned downstream, and also when the upstream iterator
	 * answers BT_NOTIFICATION_ITERATOR_STATUS_AGAIN (its current
	 * notification is then still the previous, already used one).
	 *
	 * Meaningless when notif_iter is NULL (ended upstream
	 * iterator).
	 */
	bool is_valid;
};
76
958f7d11 77struct muxer_notif_iter {
ab11110e
PP
78 /*
79 * Array of struct muxer_upstream_notif_iter * (owned by this).
80 *
81 * NOTE: This array is searched in linearly to find the youngest
82 * current notification. Keep this until benchmarks confirm that
83 * another data structure is faster than this for our typical
84 * use cases.
85 */
958f7d11
PP
86 GPtrArray *muxer_upstream_notif_iters;
87
ab11110e
PP
88 /*
89 * List of "recently" connected input ports (owned by this) to
90 * handle by this muxer notification iterator.
91 * muxer_port_connected() adds entries to this list, and the
92 * entries are removed when a notification iterator is created
93 * on the port's connection and put into
94 * muxer_upstream_notif_iters above by
95 * muxer_notif_iter_handle_newly_connected_ports().
96 */
97 GList *newly_connected_priv_ports;
958f7d11
PP
98
99 /* Next thing to return by the "next" method */
100 struct bt_notification_iterator_next_return next_next_return;
958f7d11
PP
101
102 /* Last time returned in a notification */
103 int64_t last_returned_ts_ns;
104};
105
ab11110e
PP
106static
107void destroy_muxer_upstream_notif_iter(
108 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
109{
110 if (!muxer_upstream_notif_iter) {
111 return;
112 }
113
114 bt_put(muxer_upstream_notif_iter->notif_iter);
115 bt_put(muxer_upstream_notif_iter->priv_port);
116 g_free(muxer_upstream_notif_iter);
117}
118
958f7d11
PP
119static
120struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
121 struct muxer_notif_iter *muxer_notif_iter,
122 struct bt_notification_iterator *notif_iter,
123 struct bt_private_port *priv_port)
124{
125 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
126 g_new0(struct muxer_upstream_notif_iter, 1);
127
128 if (!muxer_upstream_notif_iter) {
129 goto end;
130 }
131
132 muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
133 muxer_upstream_notif_iter->priv_port = bt_get(priv_port);
089717de 134 muxer_upstream_notif_iter->is_valid = false;
958f7d11
PP
135 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
136 muxer_upstream_notif_iter);
137
138end:
139 return muxer_upstream_notif_iter;
140}
141
958f7d11
PP
142static
143int ensure_available_input_port(struct bt_private_component *priv_comp)
144{
145 struct muxer_comp *muxer_comp =
146 bt_private_component_get_user_data(priv_comp);
147 int ret = 0;
148 GString *port_name = NULL;
149 void *priv_port = NULL;
150
151 assert(muxer_comp);
152
153 if (muxer_comp->available_input_ports >= 1) {
154 goto end;
155 }
156
157 port_name = g_string_new("in");
158 if (!port_name) {
159 ret = -1;
160 goto end;
161 }
162
163 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
164 priv_port = bt_private_component_filter_add_input_private_port(
3e9b0023 165 priv_comp, port_name->str, NULL);
958f7d11
PP
166 if (!priv_port) {
167 ret = -1;
168 goto end;
169 }
170
171 muxer_comp->available_input_ports++;
172 muxer_comp->next_port_num++;
173
174end:
175 if (port_name) {
176 g_string_free(port_name, TRUE);
177 }
178
179 BT_PUT(priv_port);
180 return ret;
181}
182
958f7d11
PP
183static
184int create_output_port(struct bt_private_component *priv_comp)
185{
186 void *priv_port;
187 int ret = 0;
188
189 priv_port = bt_private_component_filter_add_output_private_port(
3e9b0023 190 priv_comp, "out", NULL);
958f7d11
PP
191 if (!priv_port) {
192 ret = -1;
193 }
194
195 bt_put(priv_port);
196 return ret;
197}
198
199static
200void destroy_muxer_comp(struct muxer_comp *muxer_comp)
201{
202 if (!muxer_comp) {
203 return;
204 }
205
206 if (muxer_comp->muxer_notif_iters) {
207 g_ptr_array_free(muxer_comp->muxer_notif_iters, TRUE);
208 }
209
210 g_free(muxer_comp);
211}
212
213BT_HIDDEN
214enum bt_component_status muxer_init(
215 struct bt_private_component *priv_comp,
216 struct bt_value *params, void *init_data)
217{
218 int ret;
219 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
220 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
221
222 if (!muxer_comp) {
223 goto error;
224 }
225
226 muxer_comp->muxer_notif_iters = g_ptr_array_new();
227 if (!muxer_comp->muxer_notif_iters) {
228 goto error;
229 }
230
231 muxer_comp->priv_comp = priv_comp;
232 ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
233 assert(ret == 0);
958f7d11
PP
234 ret = ensure_available_input_port(priv_comp);
235 if (ret) {
236 goto error;
237 }
238
239 ret = create_output_port(priv_comp);
240 if (ret) {
241 goto error;
242 }
243
244 goto end;
245
246error:
247 destroy_muxer_comp(muxer_comp);
248 ret = bt_private_component_set_user_data(priv_comp, NULL);
249 assert(ret == 0);
250 status = BT_COMPONENT_STATUS_ERROR;
251
252end:
253 return status;
254}
255
256BT_HIDDEN
257void muxer_finalize(struct bt_private_component *priv_comp)
258{
259 struct muxer_comp *muxer_comp =
260 bt_private_component_get_user_data(priv_comp);
261
262 destroy_muxer_comp(muxer_comp);
263}
264
265static
266struct bt_notification_iterator *create_notif_iter_on_input_port(
267 struct bt_private_port *priv_port, int *ret)
268{
269 struct bt_port *port = bt_port_from_private_port(priv_port);
270 struct bt_notification_iterator *notif_iter = NULL;
271 struct bt_private_connection *priv_conn = NULL;
272
273 assert(ret);
274 *ret = 0;
275 assert(port);
276
277 assert(bt_port_is_connected(port));
278 priv_conn = bt_private_port_get_private_connection(priv_port);
279 if (!priv_conn) {
280 *ret = -1;
281 goto end;
282 }
283
ab11110e
PP
284 // TODO: Advance the iterator to >= the time of the latest
285 // returned notification by the muxer notification
286 // iterator which creates it.
958f7d11
PP
287 notif_iter = bt_private_connection_create_notification_iterator(
288 priv_conn);
289 if (!notif_iter) {
290 *ret = -1;
291 goto end;
292 }
293
294end:
295 bt_put(port);
296 bt_put(priv_conn);
297 return notif_iter;
298}
299
ab11110e 300static
089717de 301enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
ab11110e
PP
302 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
303{
089717de 304 enum bt_notification_iterator_status status;
ab11110e 305
089717de 306 status = bt_notification_iterator_next(
ab11110e
PP
307 muxer_upstream_notif_iter->notif_iter);
308
089717de 309 switch (status) {
ab11110e 310 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
089717de
PP
311 /*
312 * Notification iterator's current notification is valid:
313 * it must be considered for muxing operations.
314 */
315 muxer_upstream_notif_iter->is_valid = true;
ab11110e
PP
316 break;
317 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
089717de
PP
318 /*
319 * Notification iterator's current notification is not
320 * valid anymore. Return
321 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
322 * immediately.
323 */
324 muxer_upstream_notif_iter->is_valid = false;
ab11110e
PP
325 break;
326 case BT_NOTIFICATION_ITERATOR_STATUS_END:
327 /*
328 * Notification iterator reached the end: release it. It
329 * won't be considered again to find the youngest
330 * notification.
331 */
332 BT_PUT(muxer_upstream_notif_iter->notif_iter);
089717de
PP
333 muxer_upstream_notif_iter->is_valid = false;
334 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
335 break;
ab11110e
PP
336 default:
337 /* Error or unsupported status code */
089717de
PP
338 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
339 break;
ab11110e
PP
340 }
341
089717de 342 return status;
ab11110e
PP
343}
344
345static
089717de 346int muxer_notif_iter_handle_newly_connected_ports(
ab11110e
PP
347 struct muxer_notif_iter *muxer_notif_iter)
348{
ab11110e
PP
349 int ret = 0;
350
ab11110e
PP
351 /*
352 * Here we create one upstream notification iterator for each
089717de
PP
353 * newly connected port. We do not perform an initial "next" on
354 * those new upstream notification iterators: they are
355 * invalidated, to be validated later. The list of newly
356 * connected ports to handle here is updated by
357 * muxer_port_connected().
ab11110e
PP
358 */
359 while (true) {
360 GList *node = muxer_notif_iter->newly_connected_priv_ports;
361 struct bt_private_port *priv_port;
362 struct bt_port *port;
363 struct bt_notification_iterator *upstream_notif_iter = NULL;
364 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
365
366 if (!node) {
367 break;
368 }
369
370 priv_port = node->data;
371 port = bt_port_from_private_port(priv_port);
372 assert(port);
373
374 if (!bt_port_is_connected(port)) {
375 /*
376 * Looks like this port is not connected
377 * anymore: we can't create an upstream
089717de
PP
378 * notification iterator on its (non-existing)
379 * connection in this case.
ab11110e
PP
380 */
381 goto remove_node;
382 }
383
384 BT_PUT(port);
385 upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
386 &ret);
387 if (ret) {
388 assert(!upstream_notif_iter);
389 bt_put(priv_port);
390 goto error;
391 }
392
393 muxer_upstream_notif_iter =
394 muxer_notif_iter_add_upstream_notif_iter(
395 muxer_notif_iter, upstream_notif_iter,
396 priv_port);
397 BT_PUT(priv_port);
398 BT_PUT(upstream_notif_iter);
399 if (!muxer_upstream_notif_iter) {
400 goto error;
401 }
402
ab11110e
PP
403remove_node:
404 bt_put(upstream_notif_iter);
405 bt_put(port);
406 bt_put(priv_port);
407 muxer_notif_iter->newly_connected_priv_ports =
408 g_list_delete_link(
409 muxer_notif_iter->newly_connected_priv_ports,
410 node);
411 }
412
413 goto end;
414
415error:
089717de 416 if (ret >= 0) {
ab11110e
PP
417 ret = -1;
418 }
419
420end:
ab11110e
PP
421 return ret;
422}
423
958f7d11
PP
424static
425int get_notif_ts_ns(struct muxer_comp *muxer_comp,
426 struct bt_notification *notif, int64_t last_returned_ts_ns,
427 int64_t *ts_ns)
428{
429 struct bt_clock_class_priority_map *cc_prio_map = NULL;
430 struct bt_ctf_clock_class *clock_class = NULL;
431 struct bt_ctf_clock_value *clock_value = NULL;
432 struct bt_ctf_event *event = NULL;
433 int ret = 0;
434
435 assert(notif);
436 assert(ts_ns);
437
438 switch (bt_notification_get_type(notif)) {
439 case BT_NOTIFICATION_TYPE_EVENT:
440 cc_prio_map =
441 bt_notification_event_get_clock_class_priority_map(
442 notif);
443 break;
444
445 case BT_NOTIFICATION_TYPE_INACTIVITY:
446 cc_prio_map =
a09c6b95 447 bt_notification_inactivity_get_clock_class_priority_map(
958f7d11
PP
448 notif);
449 break;
450 default:
089717de 451 /* All the other notifications have a higher priority */
958f7d11
PP
452 *ts_ns = last_returned_ts_ns;
453 goto end;
454 }
455
456 if (!cc_prio_map) {
457 goto error;
458 }
459
460 /*
461 * If the clock class priority map is empty, then we consider
462 * that this notification has no time. In this case it's always
463 * the youngest.
464 */
465 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map) == 0) {
466 *ts_ns = last_returned_ts_ns;
467 goto end;
468 }
469
470 clock_class =
471 bt_clock_class_priority_map_get_highest_priority_clock_class(
472 cc_prio_map);
473 if (!clock_class) {
474 goto error;
475 }
476
acd6aeb1 477 if (!bt_ctf_clock_class_is_absolute(clock_class)) {
ab11110e 478 // TODO: Allow this with an explicit parameter
958f7d11
PP
479 goto error;
480 }
481
482 switch (bt_notification_get_type(notif)) {
483 case BT_NOTIFICATION_TYPE_EVENT:
484 event = bt_notification_event_get_event(notif);
ab11110e 485 assert(event);
958f7d11
PP
486 clock_value = bt_ctf_event_get_clock_value(event,
487 clock_class);
488 break;
489 case BT_NOTIFICATION_TYPE_INACTIVITY:
490 clock_value = bt_notification_inactivity_get_clock_value(
491 notif, clock_class);
492 break;
493 default:
494 assert(false);
495 }
496
497 if (!clock_value) {
498 goto error;
499 }
500
501 ret = bt_ctf_clock_value_get_value_ns_from_epoch(clock_value, ts_ns);
502 if (ret) {
503 goto error;
504 }
505
506 goto end;
507
508error:
509 ret = -1;
510
511end:
512 bt_put(cc_prio_map);
513 bt_put(event);
514 bt_put(clock_class);
515 bt_put(clock_value);
516 return ret;
517}
518
ab11110e
PP
519/*
520 * This function finds the youngest available notification amongst the
521 * non-ended upstream notification iterators and returns the upstream
522 * notification iterator which has it, or
523 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
524 * notification.
525 *
526 * This function does NOT:
527 *
528 * * Update any upstream notification iterator.
529 * * Check for newly connected ports.
530 * * Check the upstream notification iterators to retry.
531 *
532 * On sucess, this function sets *muxer_upstream_notif_iter to the
533 * upstream notification iterator of which the current notification is
534 * the youngest, and sets *ts_ns to its time.
535 */
958f7d11
PP
536static
537enum bt_notification_iterator_status
538muxer_notif_iter_youngest_upstream_notif_iter(
539 struct muxer_comp *muxer_comp,
540 struct muxer_notif_iter *muxer_notif_iter,
541 struct muxer_upstream_notif_iter **muxer_upstream_notif_iter,
542 int64_t *ts_ns)
543{
544 size_t i;
545 int ret;
546 int64_t youngest_ts_ns = INT64_MAX;
547 enum bt_notification_iterator_status status =
548 BT_NOTIFICATION_ITERATOR_STATUS_OK;
549
550 assert(muxer_comp);
551 assert(muxer_notif_iter);
552 assert(muxer_upstream_notif_iter);
553 *muxer_upstream_notif_iter = NULL;
554
555 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
556 struct bt_notification *notif;
557 struct muxer_upstream_notif_iter *cur_muxer_upstream_notif_iter =
558 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters, i);
559 int64_t notif_ts_ns;
560
561 if (!cur_muxer_upstream_notif_iter->notif_iter) {
ab11110e 562 /* This upstream notification iterator is ended */
958f7d11
PP
563 continue;
564 }
565
089717de 566 assert(cur_muxer_upstream_notif_iter->is_valid);
958f7d11
PP
567 notif = bt_notification_iterator_get_notification(
568 cur_muxer_upstream_notif_iter->notif_iter);
569 assert(notif);
570 ret = get_notif_ts_ns(muxer_comp, notif,
571 muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
572 bt_put(notif);
573 if (ret) {
574 *muxer_upstream_notif_iter = NULL;
575 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
576 goto end;
577 }
578
579 if (notif_ts_ns <= youngest_ts_ns) {
580 *muxer_upstream_notif_iter =
581 cur_muxer_upstream_notif_iter;
582 youngest_ts_ns = notif_ts_ns;
583 *ts_ns = youngest_ts_ns;
584 }
585 }
586
587 if (!*muxer_upstream_notif_iter) {
588 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
589 *ts_ns = INT64_MIN;
590 }
591
592end:
593 return status;
594}
595
596static
089717de
PP
597enum bt_notification_iterator_status validate_muxer_upstream_notif_iter(
598 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
958f7d11 599{
089717de
PP
600 enum bt_notification_iterator_status status =
601 BT_NOTIFICATION_ITERATOR_STATUS_OK;
958f7d11 602
089717de
PP
603 if (muxer_upstream_notif_iter->is_valid ||
604 !muxer_upstream_notif_iter->notif_iter) {
ab11110e
PP
605 goto end;
606 }
607
089717de
PP
608 status = muxer_upstream_notif_iter_next(muxer_upstream_notif_iter);
609
610end:
611 return status;
612}
613
614static
615enum bt_notification_iterator_status validate_muxer_upstream_notif_iters(
616 struct muxer_notif_iter *muxer_notif_iter)
617{
618 enum bt_notification_iterator_status status =
619 BT_NOTIFICATION_ITERATOR_STATUS_OK;
620 size_t i;
621
622 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
623 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
624 g_ptr_array_index(
625 muxer_notif_iter->muxer_upstream_notif_iters,
626 i);
627
628 status = validate_muxer_upstream_notif_iter(
629 muxer_upstream_notif_iter);
630 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
631 goto end;
632 }
633 }
634
635end:
636 return status;
637}
638
639static
640struct bt_notification_iterator_next_return muxer_notif_iter_do_next(
641 struct muxer_comp *muxer_comp,
642 struct muxer_notif_iter *muxer_notif_iter)
643{
644 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter = NULL;
645 struct bt_notification_iterator_next_return next_return = {
646 .notification = NULL,
647 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
648 };
649 int64_t next_return_ts;
650
651 while (true) {
652 int ret = muxer_notif_iter_handle_newly_connected_ports(
653 muxer_notif_iter);
654
655 if (ret) {
656 next_return.status =
657 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
658 goto end;
659 }
660
661 next_return.status =
662 validate_muxer_upstream_notif_iters(muxer_notif_iter);
663 if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
664 goto end;
665 }
ab11110e 666
958f7d11 667 /*
089717de
PP
668 * At this point, we know that all the existing upstream
669 * notification iterators are valid. However the
670 * operations to validate them (during
671 * validate_muxer_upstream_notif_iters()) may have
672 * connected new ports. If no ports were connected
673 * during this operation, exit the loop.
958f7d11 674 */
089717de
PP
675 if (!muxer_notif_iter->newly_connected_priv_ports) {
676 break;
677 }
958f7d11
PP
678 }
679
089717de
PP
680 assert(!muxer_notif_iter->newly_connected_priv_ports);
681
958f7d11 682 /*
089717de
PP
683 * At this point we know that all the existing upstream
684 * notification iterators are valid. We can find the one,
685 * amongst those, of which the current notification is the
686 * youngest.
958f7d11 687 */
089717de 688 next_return.status =
958f7d11
PP
689 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
690 muxer_notif_iter, &muxer_upstream_notif_iter,
089717de
PP
691 &next_return_ts);
692 if (next_return.status < 0 ||
693 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
958f7d11
PP
694 goto end;
695 }
696
089717de
PP
697 if (next_return_ts < muxer_notif_iter->last_returned_ts_ns) {
698 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
958f7d11
PP
699 goto end;
700 }
701
089717de
PP
702 assert(next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
703 assert(muxer_upstream_notif_iter);
704 next_return.notification = bt_notification_iterator_get_notification(
705 muxer_upstream_notif_iter->notif_iter);
706 assert(next_return.notification);
958f7d11
PP
707
708 /*
089717de
PP
709 * We invalidate the upstream notification iterator so that, the
710 * next time this function is called,
711 * validate_muxer_upstream_notif_iters() will make it valid.
958f7d11 712 */
089717de
PP
713 muxer_upstream_notif_iter->is_valid = false;
714 muxer_notif_iter->last_returned_ts_ns = next_return_ts;
958f7d11
PP
715
716end:
089717de 717 return next_return;
958f7d11
PP
718}
719
720static
ab11110e
PP
721void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
722{
723 GList *node;
724
725 if (!muxer_notif_iter) {
726 return;
727 }
728
729 if (muxer_notif_iter->muxer_upstream_notif_iters) {
730 g_ptr_array_free(
731 muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
732 }
733
ab11110e
PP
734 for (node = muxer_notif_iter->newly_connected_priv_ports;
735 node; node = g_list_next(node)) {
736 bt_put(node->data);
737 }
738
739 g_list_free(muxer_notif_iter->newly_connected_priv_ports);
740 g_free(muxer_notif_iter);
741}
742
743static
744int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
958f7d11
PP
745 struct muxer_notif_iter *muxer_notif_iter)
746{
ab11110e 747 struct bt_component *comp;
544d0515
PP
748 int64_t count;
749 int64_t i;
ab11110e 750 int ret = 0;
958f7d11 751
ab11110e
PP
752 /*
753 * Add the connected input ports to this muxer notification
754 * iterator's list of newly connected ports. They will be
755 * handled by muxer_notif_iter_handle_newly_connected_ports().
756 */
958f7d11
PP
757 comp = bt_component_from_private_component(muxer_comp->priv_comp);
758 assert(comp);
544d0515
PP
759 count = bt_component_filter_get_input_port_count(comp);
760 if (count < 0) {
ab11110e
PP
761 goto end;
762 }
958f7d11
PP
763
764 for (i = 0; i < count; i++) {
765 struct bt_private_port *priv_port =
9ac68eb1 766 bt_private_component_filter_get_input_private_port_by_index(
958f7d11
PP
767 muxer_comp->priv_comp, i);
768 struct bt_port *port;
958f7d11
PP
769
770 assert(priv_port);
958f7d11 771 port = bt_port_from_private_port(priv_port);
ab11110e 772 assert(port);
958f7d11
PP
773
774 if (!bt_port_is_connected(port)) {
958f7d11 775 bt_put(priv_port);
ab11110e 776 bt_put(port);
958f7d11
PP
777 continue;
778 }
779
780 bt_put(port);
ab11110e
PP
781 muxer_notif_iter->newly_connected_priv_ports =
782 g_list_append(
783 muxer_notif_iter->newly_connected_priv_ports,
958f7d11 784 priv_port);
ab11110e 785 if (!muxer_notif_iter->newly_connected_priv_ports) {
958f7d11 786 bt_put(priv_port);
ab11110e
PP
787 ret = -1;
788 goto end;
958f7d11 789 }
958f7d11
PP
790 }
791
792end:
793 bt_put(comp);
794 return ret;
795}
796
958f7d11
PP
797BT_HIDDEN
798enum bt_notification_iterator_status muxer_notif_iter_init(
799 struct bt_private_notification_iterator *priv_notif_iter,
800 struct bt_private_port *output_priv_port)
801{
802 struct muxer_comp *muxer_comp = NULL;
803 struct muxer_notif_iter *muxer_notif_iter = NULL;
804 struct bt_private_component *priv_comp = NULL;
805 enum bt_notification_iterator_status status =
806 BT_NOTIFICATION_ITERATOR_STATUS_OK;
807 int ret;
808
809 priv_comp = bt_private_notification_iterator_get_private_component(
810 priv_notif_iter);
811 assert(priv_comp);
812 muxer_comp = bt_private_component_get_user_data(priv_comp);
813 assert(muxer_comp);
a09c6b95
PP
814
815 if (muxer_comp->initializing_muxer_notif_iter) {
816 /*
089717de
PP
817 * Weird, unhandled situation detected: downstream
818 * creates a muxer notification iterator while creating
819 * another muxer notification iterator (same component).
a09c6b95 820 */
958f7d11
PP
821 goto error;
822 }
823
a09c6b95
PP
824 muxer_comp->initializing_muxer_notif_iter = true;
825 muxer_notif_iter = g_new0(struct muxer_notif_iter, 1);
826 if (!muxer_notif_iter) {
ab11110e
PP
827 goto error;
828 }
829
958f7d11
PP
830 muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
831 muxer_notif_iter->muxer_upstream_notif_iters =
832 g_ptr_array_new_with_free_func(
833 (GDestroyNotify) destroy_muxer_upstream_notif_iter);
834 if (!muxer_notif_iter->muxer_upstream_notif_iters) {
835 goto error;
836 }
837
a09c6b95
PP
838 /*
839 * Add the muxer notification iterator to the component's array
840 * of muxer notification iterators here because
841 * muxer_notif_iter_init_newly_connected_ports() can cause
842 * muxer_port_connected() to be called, which adds the newly
843 * connected port to each muxer notification iterator's list of
844 * newly connected ports.
845 */
846 g_ptr_array_add(muxer_comp->muxer_notif_iters, muxer_notif_iter);
847 ret = muxer_notif_iter_init_newly_connected_ports(muxer_comp,
848 muxer_notif_iter);
849 if (ret) {
850 goto error;
851 }
852
958f7d11
PP
853 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
854 muxer_notif_iter);
855 assert(ret == 0);
958f7d11
PP
856 goto end;
857
858error:
a09c6b95
PP
859 if (g_ptr_array_index(muxer_comp->muxer_notif_iters,
860 muxer_comp->muxer_notif_iters->len - 1) == muxer_notif_iter) {
861 g_ptr_array_remove_index(muxer_comp->muxer_notif_iters,
862 muxer_comp->muxer_notif_iters->len - 1);
863 }
864
958f7d11
PP
865 destroy_muxer_notif_iter(muxer_notif_iter);
866 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
867 NULL);
868 assert(ret == 0);
869 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
870
871end:
a09c6b95 872 muxer_comp->initializing_muxer_notif_iter = false;
958f7d11
PP
873 bt_put(priv_comp);
874 return status;
875}
876
877BT_HIDDEN
878void muxer_notif_iter_finalize(
879 struct bt_private_notification_iterator *priv_notif_iter)
880{
881 struct muxer_notif_iter *muxer_notif_iter =
882 bt_private_notification_iterator_get_user_data(priv_notif_iter);
883 struct bt_private_component *priv_comp = NULL;
884 struct muxer_comp *muxer_comp = NULL;
885
886 priv_comp = bt_private_notification_iterator_get_private_component(
887 priv_notif_iter);
888 assert(priv_comp);
889 muxer_comp = bt_private_component_get_user_data(priv_comp);
890
891 if (muxer_comp) {
892 (void) g_ptr_array_remove_fast(muxer_comp->muxer_notif_iters,
893 muxer_notif_iter);
894 destroy_muxer_notif_iter(muxer_notif_iter);
895 }
896
897 bt_put(priv_comp);
898}
899
900BT_HIDDEN
901struct bt_notification_iterator_next_return muxer_notif_iter_next(
902 struct bt_private_notification_iterator *priv_notif_iter)
903{
089717de 904 struct bt_notification_iterator_next_return next_ret;
958f7d11
PP
905 struct muxer_notif_iter *muxer_notif_iter =
906 bt_private_notification_iterator_get_user_data(priv_notif_iter);
907 struct bt_private_component *priv_comp = NULL;
908 struct muxer_comp *muxer_comp = NULL;
958f7d11
PP
909
910 assert(muxer_notif_iter);
911 priv_comp = bt_private_notification_iterator_get_private_component(
912 priv_notif_iter);
913 assert(priv_comp);
914 muxer_comp = bt_private_component_get_user_data(priv_comp);
915 assert(muxer_comp);
916
917 /* Are we in an error state set elsewhere? */
918 if (unlikely(muxer_comp->error)) {
089717de
PP
919 next_ret.notification = NULL;
920 next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
921 goto end;
958f7d11
PP
922 }
923
089717de 924 next_ret = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter);
958f7d11
PP
925
926end:
927 bt_put(priv_comp);
928 return next_ret;
929}
930
931BT_HIDDEN
932void muxer_port_connected(
933 struct bt_private_component *priv_comp,
934 struct bt_private_port *self_private_port,
935 struct bt_port *other_port)
936{
937 struct bt_port *self_port =
938 bt_port_from_private_port(self_private_port);
939 struct muxer_comp *muxer_comp =
940 bt_private_component_get_user_data(priv_comp);
941 size_t i;
958f7d11
PP
942
943 assert(self_port);
944 assert(muxer_comp);
945
946 if (bt_port_get_type(self_port) == BT_PORT_TYPE_INPUT) {
947 int ret;
948
949 /* One less available input port */
950 muxer_comp->available_input_ports--;
951 ret = ensure_available_input_port(priv_comp);
952 if (ret) {
089717de
PP
953 /*
954 * Only way to report an error later since this
955 * method does not return anything.
956 */
958f7d11
PP
957 muxer_comp->error = true;
958 goto end;
959 }
960 }
961
962 for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
963 struct muxer_notif_iter *muxer_notif_iter =
964 g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
965
966 /*
ab11110e
PP
967 * Add this port to the list of newly connected ports
968 * for this muxer notification iterator. We append at
969 * the end of this list while
970 * muxer_notif_iter_handle_newly_connected_ports()
971 * removes the nodes from the beginning.
972 *
973 * The list node owns the private port.
958f7d11 974 */
ab11110e
PP
975 muxer_notif_iter->newly_connected_priv_ports =
976 g_list_append(
977 muxer_notif_iter->newly_connected_priv_ports,
978 bt_get(self_private_port));
979 if (!muxer_notif_iter->newly_connected_priv_ports) {
089717de 980 /* Put reference taken by bt_get() above */
ab11110e 981 bt_put(self_private_port);
958f7d11
PP
982 muxer_comp->error = true;
983 goto end;
984 }
985 }
986
987end:
988 bt_put(self_port);
989}
990
991BT_HIDDEN
992void muxer_port_disconnected(struct bt_private_component *priv_comp,
993 struct bt_private_port *priv_port)
994{
995 struct bt_port *port = bt_port_from_private_port(priv_port);
996 struct muxer_comp *muxer_comp =
997 bt_private_component_get_user_data(priv_comp);
998
999 assert(port);
1000 assert(muxer_comp);
1001
ab11110e
PP
1002 /*
1003 * There's nothing special to do when a port is disconnected
1004 * because this component deals with upstream notification
1005 * iterators which were already created thanks to connected
1006 * ports. The fact that the port is disconnected does not cancel
1007 * the upstream notification iterators created using its
089717de
PP
1008 * connection: they still exist, even if the connection is dead.
1009 * The only way to remove an upstream notification iterator is
1010 * for its "next" operation to return
1011 * BT_NOTIFICATION_ITERATOR_STATUS_END.
ab11110e 1012 */
958f7d11
PP
1013 if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
1014 /* One more available input port */
1015 muxer_comp->available_input_ports++;
1016 }
1017
1018 bt_put(port);
1019}
This page took 0.063785 seconds and 4 git commands to generate.