utils.muxer: remove upstream notif. iter. once ended/canceled
[babeltrace.git] / plugins / utils / muxer / muxer.c
1 /*
2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23 #include <babeltrace/babeltrace-internal.h>
24 #include <babeltrace/ctf-ir/clock-class.h>
25 #include <babeltrace/ctf-ir/event.h>
26 #include <babeltrace/graph/clock-class-priority-map.h>
27 #include <babeltrace/graph/component-filter.h>
28 #include <babeltrace/graph/component.h>
29 #include <babeltrace/graph/notification-event.h>
30 #include <babeltrace/graph/notification-inactivity.h>
31 #include <babeltrace/graph/notification-iterator.h>
32 #include <babeltrace/graph/notification.h>
33 #include <babeltrace/graph/port.h>
34 #include <babeltrace/graph/private-component-filter.h>
35 #include <babeltrace/graph/private-component.h>
36 #include <babeltrace/graph/private-component.h>
37 #include <babeltrace/graph/private-connection.h>
38 #include <babeltrace/graph/private-notification-iterator.h>
39 #include <babeltrace/graph/private-port.h>
40 #include <plugins-common.h>
41 #include <glib.h>
42 #include <stdbool.h>
43 #include <assert.h>
44 #include <stdlib.h>
45
/*
 * Name of the boolean component parameter read by
 * configure_muxer_comp(); its default value (false) is installed by
 * get_default_params().
 */
#define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes"
47
48 struct muxer_comp {
49 /* Array of struct bt_private_notification_iterator * (weak refs) */
50 GPtrArray *muxer_notif_iters;
51
52 /* Weak ref */
53 struct bt_private_component *priv_comp;
54 unsigned int next_port_num;
55 size_t available_input_ports;
56 bool error;
57 bool initializing_muxer_notif_iter;
58 bool ignore_absolute;
59 };
60
/* Wrapper of one upstream notification iterator used by the muxer. */
struct muxer_upstream_notif_iter {
	/*
	 * Upstream notification iterator (owned by this wrapper). Set
	 * to NULL once the upstream iterator is ended or canceled.
	 */
	struct bt_notification_iterator *notif_iter;

	/*
	 * True when the current notification of `notif_iter` must take
	 * part in the multiplexing operations.
	 *
	 * This is false when the current notification was already
	 * returned by the muxer, or when the last "next" operation
	 * returned BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: in both cases
	 * the current notification is still the previous one, which
	 * was already taken into account.
	 *
	 * The value is meaningless when `notif_iter` is NULL.
	 */
	bool is_valid;
};
78
79 struct muxer_notif_iter {
80 /*
81 * Array of struct muxer_upstream_notif_iter * (owned by this).
82 *
83 * NOTE: This array is searched in linearly to find the youngest
84 * current notification. Keep this until benchmarks confirm that
85 * another data structure is faster than this for our typical
86 * use cases.
87 */
88 GPtrArray *muxer_upstream_notif_iters;
89
90 /*
91 * List of "recently" connected input ports (weak) to
92 * handle by this muxer notification iterator.
93 * muxer_port_connected() adds entries to this list, and the
94 * entries are removed when a notification iterator is created
95 * on the port's connection and put into
96 * muxer_upstream_notif_iters above by
97 * muxer_notif_iter_handle_newly_connected_ports().
98 */
99 GList *newly_connected_priv_ports;
100
101 /* Next thing to return by the "next" method */
102 struct bt_notification_iterator_next_return next_next_return;
103
104 /* Last time returned in a notification */
105 int64_t last_returned_ts_ns;
106 };
107
108 static
109 void destroy_muxer_upstream_notif_iter(
110 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
111 {
112 if (!muxer_upstream_notif_iter) {
113 return;
114 }
115
116 bt_put(muxer_upstream_notif_iter->notif_iter);
117 g_free(muxer_upstream_notif_iter);
118 }
119
120 static
121 struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
122 struct muxer_notif_iter *muxer_notif_iter,
123 struct bt_notification_iterator *notif_iter,
124 struct bt_private_port *priv_port)
125 {
126 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
127 g_new0(struct muxer_upstream_notif_iter, 1);
128
129 if (!muxer_upstream_notif_iter) {
130 goto end;
131 }
132
133 muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
134 muxer_upstream_notif_iter->is_valid = false;
135 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
136 muxer_upstream_notif_iter);
137
138 end:
139 return muxer_upstream_notif_iter;
140 }
141
142 static
143 int ensure_available_input_port(struct bt_private_component *priv_comp)
144 {
145 struct muxer_comp *muxer_comp =
146 bt_private_component_get_user_data(priv_comp);
147 int ret = 0;
148 GString *port_name = NULL;
149 void *priv_port = NULL;
150
151 assert(muxer_comp);
152
153 if (muxer_comp->available_input_ports >= 1) {
154 goto end;
155 }
156
157 port_name = g_string_new("in");
158 if (!port_name) {
159 ret = -1;
160 goto end;
161 }
162
163 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
164 priv_port = bt_private_component_filter_add_input_private_port(
165 priv_comp, port_name->str, NULL);
166 if (!priv_port) {
167 ret = -1;
168 goto end;
169 }
170
171 muxer_comp->available_input_ports++;
172 muxer_comp->next_port_num++;
173
174 end:
175 if (port_name) {
176 g_string_free(port_name, TRUE);
177 }
178
179 bt_put(priv_port);
180 return ret;
181 }
182
/*
 * Adds the output port named "out" to the muxer component.
 *
 * Returns 0 on success, or -1 if the port cannot be added.
 */
static
int create_output_port(struct bt_private_component *priv_comp)
{
	void *priv_port =
		bt_private_component_filter_add_output_private_port(
			priv_comp, "out", NULL);
	int ret = priv_port ? 0 : -1;

	/* We do not keep the reference to the added port */
	bt_put(priv_port);
	return ret;
}
198
199 static
200 void destroy_muxer_comp(struct muxer_comp *muxer_comp)
201 {
202 if (!muxer_comp) {
203 return;
204 }
205
206 if (muxer_comp->muxer_notif_iters) {
207 g_ptr_array_free(muxer_comp->muxer_notif_iters, TRUE);
208 }
209
210 g_free(muxer_comp);
211 }
212
213 static
214 struct bt_value *get_default_params(void)
215 {
216 struct bt_value *params;
217 int ret;
218
219 params = bt_value_map_create();
220 if (!params) {
221 goto error;
222 }
223
224 ret = bt_value_map_insert_bool(params,
225 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, false);
226 if (ret) {
227 goto error;
228 }
229
230 goto end;
231
232 error:
233 BT_PUT(params);
234
235 end:
236 return params;
237 }
238
239 static
240 int configure_muxer_comp(struct muxer_comp *muxer_comp, struct bt_value *params)
241 {
242 struct bt_value *default_params = NULL;
243 struct bt_value *real_params = NULL;
244 struct bt_value *ignore_absolute = NULL;
245 int ret = 0;
246 bt_bool bool_val;
247
248 default_params = get_default_params();
249 if (!default_params) {
250 goto error;
251 }
252
253 real_params = bt_value_map_extend(default_params, params);
254 if (!real_params) {
255 goto error;
256 }
257
258 ignore_absolute = bt_value_map_get(real_params,
259 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME);
260 if (!bt_value_is_bool(ignore_absolute)) {
261 goto error;
262 }
263
264 if (bt_value_bool_get(ignore_absolute, &bool_val)) {
265 goto error;
266 }
267
268 muxer_comp->ignore_absolute = (bool) bool_val;
269
270 goto end;
271
272 error:
273 ret = -1;
274
275 end:
276 bt_put(default_params);
277 bt_put(real_params);
278 bt_put(ignore_absolute);
279 return ret;
280 }
281
282 BT_HIDDEN
283 enum bt_component_status muxer_init(
284 struct bt_private_component *priv_comp,
285 struct bt_value *params, void *init_data)
286 {
287 int ret;
288 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
289 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
290
291 if (!muxer_comp) {
292 goto error;
293 }
294
295 ret = configure_muxer_comp(muxer_comp, params);
296 if (ret) {
297 goto error;
298 }
299
300 muxer_comp->muxer_notif_iters = g_ptr_array_new();
301 if (!muxer_comp->muxer_notif_iters) {
302 goto error;
303 }
304
305 muxer_comp->priv_comp = priv_comp;
306 ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
307 assert(ret == 0);
308 ret = ensure_available_input_port(priv_comp);
309 if (ret) {
310 goto error;
311 }
312
313 ret = create_output_port(priv_comp);
314 if (ret) {
315 goto error;
316 }
317
318 goto end;
319
320 error:
321 destroy_muxer_comp(muxer_comp);
322 ret = bt_private_component_set_user_data(priv_comp, NULL);
323 assert(ret == 0);
324 status = BT_COMPONENT_STATUS_ERROR;
325
326 end:
327 return status;
328 }
329
330 BT_HIDDEN
331 void muxer_finalize(struct bt_private_component *priv_comp)
332 {
333 struct muxer_comp *muxer_comp =
334 bt_private_component_get_user_data(priv_comp);
335
336 destroy_muxer_comp(muxer_comp);
337 }
338
/*
 * Creates a notification iterator on the connection of the connected
 * input port `priv_port`.
 *
 * Sets `*ret` to 0 and returns the new notification iterator (owned
 * by the caller) on success; sets `*ret` to -1 and returns NULL if
 * the connection cannot be obtained or if the iterator cannot be
 * created.
 */
static
struct bt_notification_iterator *create_notif_iter_on_input_port(
		struct bt_private_port *priv_port, int *ret)
{
	struct bt_notification_iterator *notif_iter = NULL;
	struct bt_private_connection *priv_conn = NULL;
	struct bt_port *port = bt_port_from_private_port(priv_port);

	assert(ret);
	*ret = 0;
	assert(port);
	assert(bt_port_is_connected(port));

	priv_conn = bt_private_port_get_private_connection(priv_port);
	if (priv_conn) {
		// TODO: Advance the iterator to >= the time of the latest
		//       returned notification by the muxer notification
		//       iterator which creates it.
		notif_iter =
			bt_private_connection_create_notification_iterator(
				priv_conn, NULL);
	}

	if (!notif_iter) {
		/* No connection or no iterator: both are errors */
		*ret = -1;
	}

	bt_put(port);
	bt_put(priv_conn);
	return notif_iter;
}
373
374 static
375 enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
376 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
377 {
378 enum bt_notification_iterator_status status;
379
380 status = bt_notification_iterator_next(
381 muxer_upstream_notif_iter->notif_iter);
382
383 switch (status) {
384 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
385 /*
386 * Notification iterator's current notification is valid:
387 * it must be considered for muxing operations.
388 */
389 muxer_upstream_notif_iter->is_valid = true;
390 break;
391 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
392 /*
393 * Notification iterator's current notification is not
394 * valid anymore. Return
395 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
396 * immediately.
397 */
398 muxer_upstream_notif_iter->is_valid = false;
399 break;
400 case BT_NOTIFICATION_ITERATOR_STATUS_END: /* Fall-through. */
401 case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED:
402 /*
403 * Notification iterator reached the end: release it. It
404 * won't be considered again to find the youngest
405 * notification.
406 */
407 BT_PUT(muxer_upstream_notif_iter->notif_iter);
408 muxer_upstream_notif_iter->is_valid = false;
409 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
410 break;
411 default:
412 /* Error or unsupported status code */
413 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
414 break;
415 }
416
417 return status;
418 }
419
420 static
421 int muxer_notif_iter_handle_newly_connected_ports(
422 struct muxer_notif_iter *muxer_notif_iter)
423 {
424 int ret = 0;
425
426 /*
427 * Here we create one upstream notification iterator for each
428 * newly connected port. We do not perform an initial "next" on
429 * those new upstream notification iterators: they are
430 * invalidated, to be validated later. The list of newly
431 * connected ports to handle here is updated by
432 * muxer_port_connected().
433 */
434 while (true) {
435 GList *node = muxer_notif_iter->newly_connected_priv_ports;
436 struct bt_private_port *priv_port;
437 struct bt_port *port;
438 struct bt_notification_iterator *upstream_notif_iter = NULL;
439 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
440
441 if (!node) {
442 break;
443 }
444
445 priv_port = node->data;
446 port = bt_port_from_private_port(priv_port);
447 assert(port);
448
449 if (!bt_port_is_connected(port)) {
450 /*
451 * Looks like this port is not connected
452 * anymore: we can't create an upstream
453 * notification iterator on its (non-existing)
454 * connection in this case.
455 */
456 goto remove_node;
457 }
458
459 BT_PUT(port);
460 upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
461 &ret);
462 if (ret) {
463 assert(!upstream_notif_iter);
464 goto error;
465 }
466
467 muxer_upstream_notif_iter =
468 muxer_notif_iter_add_upstream_notif_iter(
469 muxer_notif_iter, upstream_notif_iter,
470 priv_port);
471 BT_PUT(upstream_notif_iter);
472 if (!muxer_upstream_notif_iter) {
473 goto error;
474 }
475
476 remove_node:
477 bt_put(upstream_notif_iter);
478 bt_put(port);
479 muxer_notif_iter->newly_connected_priv_ports =
480 g_list_delete_link(
481 muxer_notif_iter->newly_connected_priv_ports,
482 node);
483 }
484
485 goto end;
486
487 error:
488 if (ret >= 0) {
489 ret = -1;
490 }
491
492 end:
493 return ret;
494 }
495
496 static
497 int get_notif_ts_ns(struct muxer_comp *muxer_comp,
498 struct bt_notification *notif, int64_t last_returned_ts_ns,
499 int64_t *ts_ns)
500 {
501 struct bt_clock_class_priority_map *cc_prio_map = NULL;
502 struct bt_ctf_clock_class *clock_class = NULL;
503 struct bt_ctf_clock_value *clock_value = NULL;
504 struct bt_ctf_event *event = NULL;
505 int ret = 0;
506
507 assert(notif);
508 assert(ts_ns);
509
510 switch (bt_notification_get_type(notif)) {
511 case BT_NOTIFICATION_TYPE_EVENT:
512 cc_prio_map =
513 bt_notification_event_get_clock_class_priority_map(
514 notif);
515 break;
516
517 case BT_NOTIFICATION_TYPE_INACTIVITY:
518 cc_prio_map =
519 bt_notification_inactivity_get_clock_class_priority_map(
520 notif);
521 break;
522 default:
523 /* All the other notifications have a higher priority */
524 *ts_ns = last_returned_ts_ns;
525 goto end;
526 }
527
528 if (!cc_prio_map) {
529 goto error;
530 }
531
532 /*
533 * If the clock class priority map is empty, then we consider
534 * that this notification has no time. In this case it's always
535 * the youngest.
536 */
537 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map) == 0) {
538 *ts_ns = last_returned_ts_ns;
539 goto end;
540 }
541
542 clock_class =
543 bt_clock_class_priority_map_get_highest_priority_clock_class(
544 cc_prio_map);
545 if (!clock_class) {
546 goto error;
547 }
548
549 if (!muxer_comp->ignore_absolute &&
550 !bt_ctf_clock_class_is_absolute(clock_class)) {
551 goto error;
552 }
553
554 switch (bt_notification_get_type(notif)) {
555 case BT_NOTIFICATION_TYPE_EVENT:
556 event = bt_notification_event_get_event(notif);
557 assert(event);
558 clock_value = bt_ctf_event_get_clock_value(event,
559 clock_class);
560 break;
561 case BT_NOTIFICATION_TYPE_INACTIVITY:
562 clock_value = bt_notification_inactivity_get_clock_value(
563 notif, clock_class);
564 break;
565 default:
566 abort();
567 }
568
569 if (!clock_value) {
570 goto error;
571 }
572
573 ret = bt_ctf_clock_value_get_value_ns_from_epoch(clock_value, ts_ns);
574 if (ret) {
575 goto error;
576 }
577
578 goto end;
579
580 error:
581 ret = -1;
582
583 end:
584 bt_put(cc_prio_map);
585 bt_put(event);
586 bt_put(clock_class);
587 bt_put(clock_value);
588 return ret;
589 }
590
591 /*
592 * This function finds the youngest available notification amongst the
593 * non-ended upstream notification iterators and returns the upstream
594 * notification iterator which has it, or
595 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
596 * notification.
597 *
598 * This function does NOT:
599 *
600 * * Update any upstream notification iterator.
601 * * Check for newly connected ports.
602 * * Check the upstream notification iterators to retry.
603 *
604 * On sucess, this function sets *muxer_upstream_notif_iter to the
605 * upstream notification iterator of which the current notification is
606 * the youngest, and sets *ts_ns to its time.
607 */
608 static
609 enum bt_notification_iterator_status
610 muxer_notif_iter_youngest_upstream_notif_iter(
611 struct muxer_comp *muxer_comp,
612 struct muxer_notif_iter *muxer_notif_iter,
613 struct muxer_upstream_notif_iter **muxer_upstream_notif_iter,
614 int64_t *ts_ns)
615 {
616 size_t i;
617 int ret;
618 int64_t youngest_ts_ns = INT64_MAX;
619 enum bt_notification_iterator_status status =
620 BT_NOTIFICATION_ITERATOR_STATUS_OK;
621
622 assert(muxer_comp);
623 assert(muxer_notif_iter);
624 assert(muxer_upstream_notif_iter);
625 *muxer_upstream_notif_iter = NULL;
626
627 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
628 struct bt_notification *notif;
629 struct muxer_upstream_notif_iter *cur_muxer_upstream_notif_iter =
630 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters, i);
631 int64_t notif_ts_ns;
632
633 if (!cur_muxer_upstream_notif_iter->notif_iter) {
634 /* This upstream notification iterator is ended */
635 continue;
636 }
637
638 assert(cur_muxer_upstream_notif_iter->is_valid);
639 notif = bt_notification_iterator_get_notification(
640 cur_muxer_upstream_notif_iter->notif_iter);
641 assert(notif);
642 ret = get_notif_ts_ns(muxer_comp, notif,
643 muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
644 bt_put(notif);
645 if (ret) {
646 *muxer_upstream_notif_iter = NULL;
647 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
648 goto end;
649 }
650
651 if (notif_ts_ns <= youngest_ts_ns) {
652 *muxer_upstream_notif_iter =
653 cur_muxer_upstream_notif_iter;
654 youngest_ts_ns = notif_ts_ns;
655 *ts_ns = youngest_ts_ns;
656 }
657 }
658
659 if (!*muxer_upstream_notif_iter) {
660 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
661 *ts_ns = INT64_MIN;
662 }
663
664 end:
665 return status;
666 }
667
668 static
669 enum bt_notification_iterator_status validate_muxer_upstream_notif_iter(
670 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
671 {
672 enum bt_notification_iterator_status status =
673 BT_NOTIFICATION_ITERATOR_STATUS_OK;
674
675 if (muxer_upstream_notif_iter->is_valid ||
676 !muxer_upstream_notif_iter->notif_iter) {
677 goto end;
678 }
679
680 status = muxer_upstream_notif_iter_next(muxer_upstream_notif_iter);
681
682 end:
683 return status;
684 }
685
686 static
687 enum bt_notification_iterator_status validate_muxer_upstream_notif_iters(
688 struct muxer_notif_iter *muxer_notif_iter)
689 {
690 enum bt_notification_iterator_status status =
691 BT_NOTIFICATION_ITERATOR_STATUS_OK;
692 size_t i;
693
694 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
695 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
696 g_ptr_array_index(
697 muxer_notif_iter->muxer_upstream_notif_iters,
698 i);
699
700 status = validate_muxer_upstream_notif_iter(
701 muxer_upstream_notif_iter);
702 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
703 goto end;
704 }
705
706 /*
707 * Remove this muxer upstream notification iterator
708 * if it's ended or canceled.
709 */
710 if (!muxer_upstream_notif_iter->notif_iter) {
711 /*
712 * Use g_ptr_array_remove_fast() because the
713 * order of those elements is not important.
714 */
715 g_ptr_array_remove_index_fast(
716 muxer_notif_iter->muxer_upstream_notif_iters,
717 i);
718 i--;
719 }
720 }
721
722 end:
723 return status;
724 }
725
726 static
727 struct bt_notification_iterator_next_return muxer_notif_iter_do_next(
728 struct muxer_comp *muxer_comp,
729 struct muxer_notif_iter *muxer_notif_iter)
730 {
731 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter = NULL;
732 struct bt_notification_iterator_next_return next_return = {
733 .notification = NULL,
734 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
735 };
736 int64_t next_return_ts;
737
738 while (true) {
739 int ret = muxer_notif_iter_handle_newly_connected_ports(
740 muxer_notif_iter);
741
742 if (ret) {
743 next_return.status =
744 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
745 goto end;
746 }
747
748 next_return.status =
749 validate_muxer_upstream_notif_iters(muxer_notif_iter);
750 if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
751 goto end;
752 }
753
754 /*
755 * At this point, we know that all the existing upstream
756 * notification iterators are valid. However the
757 * operations to validate them (during
758 * validate_muxer_upstream_notif_iters()) may have
759 * connected new ports. If no ports were connected
760 * during this operation, exit the loop.
761 */
762 if (!muxer_notif_iter->newly_connected_priv_ports) {
763 break;
764 }
765 }
766
767 assert(!muxer_notif_iter->newly_connected_priv_ports);
768
769 /*
770 * At this point we know that all the existing upstream
771 * notification iterators are valid. We can find the one,
772 * amongst those, of which the current notification is the
773 * youngest.
774 */
775 next_return.status =
776 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
777 muxer_notif_iter, &muxer_upstream_notif_iter,
778 &next_return_ts);
779 if (next_return.status < 0 ||
780 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END ||
781 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) {
782 goto end;
783 }
784
785 if (next_return_ts < muxer_notif_iter->last_returned_ts_ns) {
786 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
787 goto end;
788 }
789
790 assert(next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
791 assert(muxer_upstream_notif_iter);
792 next_return.notification = bt_notification_iterator_get_notification(
793 muxer_upstream_notif_iter->notif_iter);
794 assert(next_return.notification);
795
796 /*
797 * We invalidate the upstream notification iterator so that, the
798 * next time this function is called,
799 * validate_muxer_upstream_notif_iters() will make it valid.
800 */
801 muxer_upstream_notif_iter->is_valid = false;
802 muxer_notif_iter->last_returned_ts_ns = next_return_ts;
803
804 end:
805 return next_return;
806 }
807
808 static
809 void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
810 {
811 if (!muxer_notif_iter) {
812 return;
813 }
814
815 if (muxer_notif_iter->muxer_upstream_notif_iters) {
816 g_ptr_array_free(
817 muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
818 }
819
820 g_list_free(muxer_notif_iter->newly_connected_priv_ports);
821 g_free(muxer_notif_iter);
822 }
823
824 static
825 int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
826 struct muxer_notif_iter *muxer_notif_iter)
827 {
828 struct bt_component *comp;
829 int64_t count;
830 int64_t i;
831 int ret = 0;
832
833 /*
834 * Add the connected input ports to this muxer notification
835 * iterator's list of newly connected ports. They will be
836 * handled by muxer_notif_iter_handle_newly_connected_ports().
837 */
838 comp = bt_component_from_private_component(muxer_comp->priv_comp);
839 assert(comp);
840 count = bt_component_filter_get_input_port_count(comp);
841 if (count < 0) {
842 goto end;
843 }
844
845 for (i = 0; i < count; i++) {
846 struct bt_private_port *priv_port =
847 bt_private_component_filter_get_input_private_port_by_index(
848 muxer_comp->priv_comp, i);
849 struct bt_port *port;
850
851 assert(priv_port);
852 port = bt_port_from_private_port(priv_port);
853 assert(port);
854
855 if (!bt_port_is_connected(port)) {
856 bt_put(priv_port);
857 bt_put(port);
858 continue;
859 }
860
861 bt_put(port);
862 bt_put(priv_port);
863 muxer_notif_iter->newly_connected_priv_ports =
864 g_list_append(
865 muxer_notif_iter->newly_connected_priv_ports,
866 priv_port);
867 if (!muxer_notif_iter->newly_connected_priv_ports) {
868 ret = -1;
869 goto end;
870 }
871 }
872
873 end:
874 bt_put(comp);
875 return ret;
876 }
877
878 BT_HIDDEN
879 enum bt_notification_iterator_status muxer_notif_iter_init(
880 struct bt_private_notification_iterator *priv_notif_iter,
881 struct bt_private_port *output_priv_port)
882 {
883 struct muxer_comp *muxer_comp = NULL;
884 struct muxer_notif_iter *muxer_notif_iter = NULL;
885 struct bt_private_component *priv_comp = NULL;
886 enum bt_notification_iterator_status status =
887 BT_NOTIFICATION_ITERATOR_STATUS_OK;
888 int ret;
889
890 priv_comp = bt_private_notification_iterator_get_private_component(
891 priv_notif_iter);
892 assert(priv_comp);
893 muxer_comp = bt_private_component_get_user_data(priv_comp);
894 assert(muxer_comp);
895
896 if (muxer_comp->initializing_muxer_notif_iter) {
897 /*
898 * Weird, unhandled situation detected: downstream
899 * creates a muxer notification iterator while creating
900 * another muxer notification iterator (same component).
901 */
902 goto error;
903 }
904
905 muxer_comp->initializing_muxer_notif_iter = true;
906 muxer_notif_iter = g_new0(struct muxer_notif_iter, 1);
907 if (!muxer_notif_iter) {
908 goto error;
909 }
910
911 muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
912 muxer_notif_iter->muxer_upstream_notif_iters =
913 g_ptr_array_new_with_free_func(
914 (GDestroyNotify) destroy_muxer_upstream_notif_iter);
915 if (!muxer_notif_iter->muxer_upstream_notif_iters) {
916 goto error;
917 }
918
919 /*
920 * Add the muxer notification iterator to the component's array
921 * of muxer notification iterators here because
922 * muxer_notif_iter_init_newly_connected_ports() can cause
923 * muxer_port_connected() to be called, which adds the newly
924 * connected port to each muxer notification iterator's list of
925 * newly connected ports.
926 */
927 g_ptr_array_add(muxer_comp->muxer_notif_iters, muxer_notif_iter);
928 ret = muxer_notif_iter_init_newly_connected_ports(muxer_comp,
929 muxer_notif_iter);
930 if (ret) {
931 goto error;
932 }
933
934 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
935 muxer_notif_iter);
936 assert(ret == 0);
937 goto end;
938
939 error:
940 if (g_ptr_array_index(muxer_comp->muxer_notif_iters,
941 muxer_comp->muxer_notif_iters->len - 1) == muxer_notif_iter) {
942 g_ptr_array_remove_index(muxer_comp->muxer_notif_iters,
943 muxer_comp->muxer_notif_iters->len - 1);
944 }
945
946 destroy_muxer_notif_iter(muxer_notif_iter);
947 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
948 NULL);
949 assert(ret == 0);
950 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
951
952 end:
953 muxer_comp->initializing_muxer_notif_iter = false;
954 bt_put(priv_comp);
955 return status;
956 }
957
958 BT_HIDDEN
959 void muxer_notif_iter_finalize(
960 struct bt_private_notification_iterator *priv_notif_iter)
961 {
962 struct muxer_notif_iter *muxer_notif_iter =
963 bt_private_notification_iterator_get_user_data(priv_notif_iter);
964 struct bt_private_component *priv_comp = NULL;
965 struct muxer_comp *muxer_comp = NULL;
966
967 priv_comp = bt_private_notification_iterator_get_private_component(
968 priv_notif_iter);
969 assert(priv_comp);
970 muxer_comp = bt_private_component_get_user_data(priv_comp);
971
972 if (muxer_comp) {
973 (void) g_ptr_array_remove_fast(muxer_comp->muxer_notif_iters,
974 muxer_notif_iter);
975 destroy_muxer_notif_iter(muxer_notif_iter);
976 }
977
978 bt_put(priv_comp);
979 }
980
981 BT_HIDDEN
982 struct bt_notification_iterator_next_return muxer_notif_iter_next(
983 struct bt_private_notification_iterator *priv_notif_iter)
984 {
985 struct bt_notification_iterator_next_return next_ret;
986 struct muxer_notif_iter *muxer_notif_iter =
987 bt_private_notification_iterator_get_user_data(priv_notif_iter);
988 struct bt_private_component *priv_comp = NULL;
989 struct muxer_comp *muxer_comp = NULL;
990
991 assert(muxer_notif_iter);
992 priv_comp = bt_private_notification_iterator_get_private_component(
993 priv_notif_iter);
994 assert(priv_comp);
995 muxer_comp = bt_private_component_get_user_data(priv_comp);
996 assert(muxer_comp);
997
998 /* Are we in an error state set elsewhere? */
999 if (unlikely(muxer_comp->error)) {
1000 next_ret.notification = NULL;
1001 next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1002 goto end;
1003 }
1004
1005 next_ret = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter);
1006
1007 end:
1008 bt_put(priv_comp);
1009 return next_ret;
1010 }
1011
1012 BT_HIDDEN
1013 void muxer_port_connected(
1014 struct bt_private_component *priv_comp,
1015 struct bt_private_port *self_private_port,
1016 struct bt_port *other_port)
1017 {
1018 struct bt_port *self_port =
1019 bt_port_from_private_port(self_private_port);
1020 struct muxer_comp *muxer_comp =
1021 bt_private_component_get_user_data(priv_comp);
1022 size_t i;
1023 int ret;
1024
1025 assert(self_port);
1026 assert(muxer_comp);
1027
1028 if (bt_port_get_type(self_port) == BT_PORT_TYPE_OUTPUT) {
1029 goto end;
1030 }
1031
1032 for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
1033 struct muxer_notif_iter *muxer_notif_iter =
1034 g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
1035
1036 /*
1037 * Add this port to the list of newly connected ports
1038 * for this muxer notification iterator. We append at
1039 * the end of this list while
1040 * muxer_notif_iter_handle_newly_connected_ports()
1041 * removes the nodes from the beginning.
1042 *
1043 * The list node owns the private port.
1044 */
1045 muxer_notif_iter->newly_connected_priv_ports =
1046 g_list_append(
1047 muxer_notif_iter->newly_connected_priv_ports,
1048 self_private_port);
1049 if (!muxer_notif_iter->newly_connected_priv_ports) {
1050 /* Put reference taken by bt_get() above */
1051 muxer_comp->error = true;
1052 goto end;
1053 }
1054 }
1055
1056 /* One less available input port */
1057 muxer_comp->available_input_ports--;
1058 ret = ensure_available_input_port(priv_comp);
1059 if (ret) {
1060 /*
1061 * Only way to report an error later since this
1062 * method does not return anything.
1063 */
1064 muxer_comp->error = true;
1065 goto end;
1066 }
1067
1068 end:
1069 bt_put(self_port);
1070 }
1071
1072 BT_HIDDEN
1073 void muxer_port_disconnected(struct bt_private_component *priv_comp,
1074 struct bt_private_port *priv_port)
1075 {
1076 struct bt_port *port = bt_port_from_private_port(priv_port);
1077 struct muxer_comp *muxer_comp =
1078 bt_private_component_get_user_data(priv_comp);
1079
1080 assert(port);
1081 assert(muxer_comp);
1082
1083 /*
1084 * There's nothing special to do when a port is disconnected
1085 * because this component deals with upstream notification
1086 * iterators which were already created thanks to connected
1087 * ports. The fact that the port is disconnected does not cancel
1088 * the upstream notification iterators created using its
1089 * connection: they still exist, even if the connection is dead.
1090 * The only way to remove an upstream notification iterator is
1091 * for its "next" operation to return
1092 * BT_NOTIFICATION_ITERATOR_STATUS_END.
1093 */
1094 if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
1095 /* One more available input port */
1096 muxer_comp->available_input_ports++;
1097 }
1098
1099 bt_put(port);
1100 }
This page took 0.05162 seconds and 5 git commands to generate.