utils.muxer: fix bad behaviour caused by notification buffering
[babeltrace.git] / plugins / utils / muxer / muxer.c
1 /*
2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23 #include <babeltrace/babeltrace-internal.h>
24 #include <babeltrace/ctf-ir/clock-class.h>
25 #include <babeltrace/ctf-ir/event.h>
26 #include <babeltrace/graph/clock-class-priority-map.h>
27 #include <babeltrace/graph/component-filter.h>
28 #include <babeltrace/graph/component.h>
29 #include <babeltrace/graph/notification-event.h>
30 #include <babeltrace/graph/notification-inactivity.h>
31 #include <babeltrace/graph/notification-iterator.h>
32 #include <babeltrace/graph/notification.h>
33 #include <babeltrace/graph/port.h>
34 #include <babeltrace/graph/private-component-filter.h>
35 #include <babeltrace/graph/private-component.h>
36 #include <babeltrace/graph/private-component.h>
37 #include <babeltrace/graph/private-connection.h>
38 #include <babeltrace/graph/private-notification-iterator.h>
39 #include <babeltrace/graph/private-port.h>
40 #include <plugins-common.h>
41 #include <glib.h>
42 #include <assert.h>
43
44 struct muxer_comp {
45 /* Array of struct bt_private_notification_iterator * (weak refs) */
46 GPtrArray *muxer_notif_iters;
47
48 /* Weak ref */
49 struct bt_private_component *priv_comp;
50 unsigned int next_port_num;
51 size_t available_input_ports;
52 bool error;
53 bool initializing_muxer_notif_iter;
54 };
55
/*
 * Wrapper around one upstream notification iterator, as tracked by a
 * muxer notification iterator.
 */
struct muxer_upstream_notif_iter {
	/*
	 * Upstream notification iterator (owned by this). Set to NULL
	 * once the upstream iterator has reached its end.
	 */
	struct bt_notification_iterator *notif_iter;

	/* Input private port this iterator was created on (owned by this) */
	struct bt_private_port *priv_port;

	/*
	 * True when the upstream iterator's current notification has
	 * not been consumed yet and must therefore take part in the
	 * multiplexing. When the upstream iterator answers
	 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN, its current
	 * notification is still the previous one, which was already
	 * taken into account, so this flag is false.
	 *
	 * This flag is meaningless when notif_iter above is NULL (the
	 * upstream iterator is finished).
	 */
	bool is_valid;
};
76
77 struct muxer_notif_iter {
78 /*
79 * Array of struct muxer_upstream_notif_iter * (owned by this).
80 *
81 * NOTE: This array is searched in linearly to find the youngest
82 * current notification. Keep this until benchmarks confirm that
83 * another data structure is faster than this for our typical
84 * use cases.
85 */
86 GPtrArray *muxer_upstream_notif_iters;
87
88 /*
89 * List of "recently" connected input ports (owned by this) to
90 * handle by this muxer notification iterator.
91 * muxer_port_connected() adds entries to this list, and the
92 * entries are removed when a notification iterator is created
93 * on the port's connection and put into
94 * muxer_upstream_notif_iters above by
95 * muxer_notif_iter_handle_newly_connected_ports().
96 */
97 GList *newly_connected_priv_ports;
98
99 /* Next thing to return by the "next" method */
100 struct bt_notification_iterator_next_return next_next_return;
101
102 /* Last time returned in a notification */
103 int64_t last_returned_ts_ns;
104 };
105
106 static
107 void destroy_muxer_upstream_notif_iter(
108 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
109 {
110 if (!muxer_upstream_notif_iter) {
111 return;
112 }
113
114 bt_put(muxer_upstream_notif_iter->notif_iter);
115 bt_put(muxer_upstream_notif_iter->priv_port);
116 g_free(muxer_upstream_notif_iter);
117 }
118
119 static
120 struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
121 struct muxer_notif_iter *muxer_notif_iter,
122 struct bt_notification_iterator *notif_iter,
123 struct bt_private_port *priv_port)
124 {
125 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
126 g_new0(struct muxer_upstream_notif_iter, 1);
127
128 if (!muxer_upstream_notif_iter) {
129 goto end;
130 }
131
132 muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
133 muxer_upstream_notif_iter->priv_port = bt_get(priv_port);
134 muxer_upstream_notif_iter->is_valid = false;
135 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
136 muxer_upstream_notif_iter);
137
138 end:
139 return muxer_upstream_notif_iter;
140 }
141
142 static
143 int ensure_available_input_port(struct bt_private_component *priv_comp)
144 {
145 struct muxer_comp *muxer_comp =
146 bt_private_component_get_user_data(priv_comp);
147 int ret = 0;
148 GString *port_name = NULL;
149 void *priv_port = NULL;
150
151 assert(muxer_comp);
152
153 if (muxer_comp->available_input_ports >= 1) {
154 goto end;
155 }
156
157 port_name = g_string_new("in");
158 if (!port_name) {
159 ret = -1;
160 goto end;
161 }
162
163 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
164 priv_port = bt_private_component_filter_add_input_private_port(
165 priv_comp, port_name->str, NULL);
166 if (!priv_port) {
167 ret = -1;
168 goto end;
169 }
170
171 muxer_comp->available_input_ports++;
172 muxer_comp->next_port_num++;
173
174 end:
175 if (port_name) {
176 g_string_free(port_name, TRUE);
177 }
178
179 BT_PUT(priv_port);
180 return ret;
181 }
182
/*
 * Removes the filter component's default input port, then its default
 * output port, when they exist.
 *
 * Returns 0 on success, or the first non-zero status reported by
 * bt_private_port_remove_from_component() (in which case the default
 * output port is not looked up).
 */
static
int remove_default_ports(struct bt_private_component *priv_comp)
{
	struct bt_private_port *default_port;
	int status = 0;

	default_port =
		bt_private_component_filter_get_default_input_private_port(
			priv_comp);
	if (default_port) {
		status = bt_private_port_remove_from_component(default_port);
	}

	bt_put(default_port);
	if (status) {
		return status;
	}

	default_port =
		bt_private_component_filter_get_default_output_private_port(
			priv_comp);
	if (default_port) {
		status = bt_private_port_remove_from_component(default_port);
	}

	bt_put(default_port);
	return status;
}
212
/*
 * Adds the output port named "out" to the filter component.
 *
 * Returns 0 on success, -1 if the port could not be added.
 */
static
int create_output_port(struct bt_private_component *priv_comp)
{
	void *out_port = bt_private_component_filter_add_output_private_port(
		priv_comp, "out", NULL);
	int status = out_port ? 0 : -1;

	/* The reference returned when adding the port is not kept here */
	bt_put(out_port);
	return status;
}
228
229 static
230 void destroy_muxer_comp(struct muxer_comp *muxer_comp)
231 {
232 if (!muxer_comp) {
233 return;
234 }
235
236 if (muxer_comp->muxer_notif_iters) {
237 g_ptr_array_free(muxer_comp->muxer_notif_iters, TRUE);
238 }
239
240 g_free(muxer_comp);
241 }
242
243 BT_HIDDEN
244 enum bt_component_status muxer_init(
245 struct bt_private_component *priv_comp,
246 struct bt_value *params, void *init_data)
247 {
248 int ret;
249 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
250 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
251
252 if (!muxer_comp) {
253 goto error;
254 }
255
256 muxer_comp->muxer_notif_iters = g_ptr_array_new();
257 if (!muxer_comp->muxer_notif_iters) {
258 goto error;
259 }
260
261 muxer_comp->priv_comp = priv_comp;
262 ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
263 assert(ret == 0);
264 ret = remove_default_ports(priv_comp);
265 if (ret) {
266 goto error;
267 }
268
269 ret = ensure_available_input_port(priv_comp);
270 if (ret) {
271 goto error;
272 }
273
274 ret = create_output_port(priv_comp);
275 if (ret) {
276 goto error;
277 }
278
279 goto end;
280
281 error:
282 destroy_muxer_comp(muxer_comp);
283 ret = bt_private_component_set_user_data(priv_comp, NULL);
284 assert(ret == 0);
285 status = BT_COMPONENT_STATUS_ERROR;
286
287 end:
288 return status;
289 }
290
291 BT_HIDDEN
292 void muxer_finalize(struct bt_private_component *priv_comp)
293 {
294 struct muxer_comp *muxer_comp =
295 bt_private_component_get_user_data(priv_comp);
296
297 destroy_muxer_comp(muxer_comp);
298 }
299
/*
 * Creates a notification iterator on the connection of the input
 * private port `priv_port`, which must be connected.
 *
 * On success, returns the new notification iterator (the caller owns
 * the reference) and sets *ret to 0. On failure, returns NULL and sets
 * *ret to -1.
 */
static
struct bt_notification_iterator *create_notif_iter_on_input_port(
		struct bt_private_port *priv_port, int *ret)
{
	struct bt_port *port = bt_port_from_private_port(priv_port);
	struct bt_private_connection *priv_conn;
	struct bt_notification_iterator *notif_iter = NULL;

	assert(ret);
	*ret = 0;
	assert(port);
	assert(bt_port_is_connected(port));

	priv_conn = bt_private_port_get_private_connection(priv_port);
	if (priv_conn) {
		// TODO: Advance the iterator to >= the time of the latest
		//       returned notification by the muxer notification
		//       iterator which creates it.
		notif_iter =
			bt_private_connection_create_notification_iterator(
				priv_conn);
	}

	if (!notif_iter) {
		/* Either the connection or the iterator is missing */
		*ret = -1;
	}

	bt_put(port);
	bt_put(priv_conn);
	return notif_iter;
}
334
335 static
336 enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
337 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
338 {
339 enum bt_notification_iterator_status status;
340
341 status = bt_notification_iterator_next(
342 muxer_upstream_notif_iter->notif_iter);
343
344 switch (status) {
345 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
346 /*
347 * Notification iterator's current notification is valid:
348 * it must be considered for muxing operations.
349 */
350 muxer_upstream_notif_iter->is_valid = true;
351 break;
352 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
353 /*
354 * Notification iterator's current notification is not
355 * valid anymore. Return
356 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
357 * immediately.
358 */
359 muxer_upstream_notif_iter->is_valid = false;
360 break;
361 case BT_NOTIFICATION_ITERATOR_STATUS_END:
362 /*
363 * Notification iterator reached the end: release it. It
364 * won't be considered again to find the youngest
365 * notification.
366 */
367 BT_PUT(muxer_upstream_notif_iter->notif_iter);
368 muxer_upstream_notif_iter->is_valid = false;
369 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
370 break;
371 default:
372 /* Error or unsupported status code */
373 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
374 break;
375 }
376
377 return status;
378 }
379
380 static
381 int muxer_notif_iter_handle_newly_connected_ports(
382 struct muxer_notif_iter *muxer_notif_iter)
383 {
384 int ret = 0;
385
386 /*
387 * Here we create one upstream notification iterator for each
388 * newly connected port. We do not perform an initial "next" on
389 * those new upstream notification iterators: they are
390 * invalidated, to be validated later. The list of newly
391 * connected ports to handle here is updated by
392 * muxer_port_connected().
393 */
394 while (true) {
395 GList *node = muxer_notif_iter->newly_connected_priv_ports;
396 struct bt_private_port *priv_port;
397 struct bt_port *port;
398 struct bt_notification_iterator *upstream_notif_iter = NULL;
399 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
400
401 if (!node) {
402 break;
403 }
404
405 priv_port = node->data;
406 port = bt_port_from_private_port(priv_port);
407 assert(port);
408
409 if (!bt_port_is_connected(port)) {
410 /*
411 * Looks like this port is not connected
412 * anymore: we can't create an upstream
413 * notification iterator on its (non-existing)
414 * connection in this case.
415 */
416 goto remove_node;
417 }
418
419 BT_PUT(port);
420 upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
421 &ret);
422 if (ret) {
423 assert(!upstream_notif_iter);
424 bt_put(priv_port);
425 goto error;
426 }
427
428 muxer_upstream_notif_iter =
429 muxer_notif_iter_add_upstream_notif_iter(
430 muxer_notif_iter, upstream_notif_iter,
431 priv_port);
432 BT_PUT(priv_port);
433 BT_PUT(upstream_notif_iter);
434 if (!muxer_upstream_notif_iter) {
435 goto error;
436 }
437
438 remove_node:
439 bt_put(upstream_notif_iter);
440 bt_put(port);
441 bt_put(priv_port);
442 muxer_notif_iter->newly_connected_priv_ports =
443 g_list_delete_link(
444 muxer_notif_iter->newly_connected_priv_ports,
445 node);
446 }
447
448 goto end;
449
450 error:
451 if (ret >= 0) {
452 ret = -1;
453 }
454
455 end:
456 return ret;
457 }
458
459 static
460 int get_notif_ts_ns(struct muxer_comp *muxer_comp,
461 struct bt_notification *notif, int64_t last_returned_ts_ns,
462 int64_t *ts_ns)
463 {
464 struct bt_clock_class_priority_map *cc_prio_map = NULL;
465 struct bt_ctf_clock_class *clock_class = NULL;
466 struct bt_ctf_clock_value *clock_value = NULL;
467 struct bt_ctf_event *event = NULL;
468 int ret = 0;
469
470 assert(notif);
471 assert(ts_ns);
472
473 switch (bt_notification_get_type(notif)) {
474 case BT_NOTIFICATION_TYPE_EVENT:
475 cc_prio_map =
476 bt_notification_event_get_clock_class_priority_map(
477 notif);
478 break;
479
480 case BT_NOTIFICATION_TYPE_INACTIVITY:
481 cc_prio_map =
482 bt_notification_inactivity_get_clock_class_priority_map(
483 notif);
484 break;
485 default:
486 /* All the other notifications have a higher priority */
487 *ts_ns = last_returned_ts_ns;
488 goto end;
489 }
490
491 if (!cc_prio_map) {
492 goto error;
493 }
494
495 /*
496 * If the clock class priority map is empty, then we consider
497 * that this notification has no time. In this case it's always
498 * the youngest.
499 */
500 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map) == 0) {
501 *ts_ns = last_returned_ts_ns;
502 goto end;
503 }
504
505 clock_class =
506 bt_clock_class_priority_map_get_highest_priority_clock_class(
507 cc_prio_map);
508 if (!clock_class) {
509 goto error;
510 }
511
512 if (!bt_ctf_clock_class_is_absolute(clock_class)) {
513 // TODO: Allow this with an explicit parameter
514 goto error;
515 }
516
517 switch (bt_notification_get_type(notif)) {
518 case BT_NOTIFICATION_TYPE_EVENT:
519 event = bt_notification_event_get_event(notif);
520 assert(event);
521 clock_value = bt_ctf_event_get_clock_value(event,
522 clock_class);
523 break;
524 case BT_NOTIFICATION_TYPE_INACTIVITY:
525 clock_value = bt_notification_inactivity_get_clock_value(
526 notif, clock_class);
527 break;
528 default:
529 assert(false);
530 }
531
532 if (!clock_value) {
533 goto error;
534 }
535
536 ret = bt_ctf_clock_value_get_value_ns_from_epoch(clock_value, ts_ns);
537 if (ret) {
538 goto error;
539 }
540
541 goto end;
542
543 error:
544 ret = -1;
545
546 end:
547 bt_put(cc_prio_map);
548 bt_put(event);
549 bt_put(clock_class);
550 bt_put(clock_value);
551 return ret;
552 }
553
554 /*
555 * This function finds the youngest available notification amongst the
556 * non-ended upstream notification iterators and returns the upstream
557 * notification iterator which has it, or
558 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
559 * notification.
560 *
561 * This function does NOT:
562 *
563 * * Update any upstream notification iterator.
564 * * Check for newly connected ports.
565 * * Check the upstream notification iterators to retry.
566 *
567 * On sucess, this function sets *muxer_upstream_notif_iter to the
568 * upstream notification iterator of which the current notification is
569 * the youngest, and sets *ts_ns to its time.
570 */
571 static
572 enum bt_notification_iterator_status
573 muxer_notif_iter_youngest_upstream_notif_iter(
574 struct muxer_comp *muxer_comp,
575 struct muxer_notif_iter *muxer_notif_iter,
576 struct muxer_upstream_notif_iter **muxer_upstream_notif_iter,
577 int64_t *ts_ns)
578 {
579 size_t i;
580 int ret;
581 int64_t youngest_ts_ns = INT64_MAX;
582 enum bt_notification_iterator_status status =
583 BT_NOTIFICATION_ITERATOR_STATUS_OK;
584
585 assert(muxer_comp);
586 assert(muxer_notif_iter);
587 assert(muxer_upstream_notif_iter);
588 *muxer_upstream_notif_iter = NULL;
589
590 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
591 struct bt_notification *notif;
592 struct muxer_upstream_notif_iter *cur_muxer_upstream_notif_iter =
593 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters, i);
594 int64_t notif_ts_ns;
595
596 if (!cur_muxer_upstream_notif_iter->notif_iter) {
597 /* This upstream notification iterator is ended */
598 continue;
599 }
600
601 assert(cur_muxer_upstream_notif_iter->is_valid);
602 notif = bt_notification_iterator_get_notification(
603 cur_muxer_upstream_notif_iter->notif_iter);
604 assert(notif);
605 ret = get_notif_ts_ns(muxer_comp, notif,
606 muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
607 bt_put(notif);
608 if (ret) {
609 *muxer_upstream_notif_iter = NULL;
610 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
611 goto end;
612 }
613
614 if (notif_ts_ns <= youngest_ts_ns) {
615 *muxer_upstream_notif_iter =
616 cur_muxer_upstream_notif_iter;
617 youngest_ts_ns = notif_ts_ns;
618 *ts_ns = youngest_ts_ns;
619 }
620 }
621
622 if (!*muxer_upstream_notif_iter) {
623 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
624 *ts_ns = INT64_MIN;
625 }
626
627 end:
628 return status;
629 }
630
631 static
632 enum bt_notification_iterator_status validate_muxer_upstream_notif_iter(
633 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
634 {
635 enum bt_notification_iterator_status status =
636 BT_NOTIFICATION_ITERATOR_STATUS_OK;
637
638 if (muxer_upstream_notif_iter->is_valid ||
639 !muxer_upstream_notif_iter->notif_iter) {
640 goto end;
641 }
642
643 status = muxer_upstream_notif_iter_next(muxer_upstream_notif_iter);
644
645 end:
646 return status;
647 }
648
649 static
650 enum bt_notification_iterator_status validate_muxer_upstream_notif_iters(
651 struct muxer_notif_iter *muxer_notif_iter)
652 {
653 enum bt_notification_iterator_status status =
654 BT_NOTIFICATION_ITERATOR_STATUS_OK;
655 size_t i;
656
657 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
658 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
659 g_ptr_array_index(
660 muxer_notif_iter->muxer_upstream_notif_iters,
661 i);
662
663 status = validate_muxer_upstream_notif_iter(
664 muxer_upstream_notif_iter);
665 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
666 goto end;
667 }
668 }
669
670 end:
671 return status;
672 }
673
674 static
675 struct bt_notification_iterator_next_return muxer_notif_iter_do_next(
676 struct muxer_comp *muxer_comp,
677 struct muxer_notif_iter *muxer_notif_iter)
678 {
679 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter = NULL;
680 struct bt_notification_iterator_next_return next_return = {
681 .notification = NULL,
682 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
683 };
684 int64_t next_return_ts;
685
686 while (true) {
687 int ret = muxer_notif_iter_handle_newly_connected_ports(
688 muxer_notif_iter);
689
690 if (ret) {
691 next_return.status =
692 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
693 goto end;
694 }
695
696 next_return.status =
697 validate_muxer_upstream_notif_iters(muxer_notif_iter);
698 if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
699 goto end;
700 }
701
702 /*
703 * At this point, we know that all the existing upstream
704 * notification iterators are valid. However the
705 * operations to validate them (during
706 * validate_muxer_upstream_notif_iters()) may have
707 * connected new ports. If no ports were connected
708 * during this operation, exit the loop.
709 */
710 if (!muxer_notif_iter->newly_connected_priv_ports) {
711 break;
712 }
713 }
714
715 assert(!muxer_notif_iter->newly_connected_priv_ports);
716
717 /*
718 * At this point we know that all the existing upstream
719 * notification iterators are valid. We can find the one,
720 * amongst those, of which the current notification is the
721 * youngest.
722 */
723 next_return.status =
724 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
725 muxer_notif_iter, &muxer_upstream_notif_iter,
726 &next_return_ts);
727 if (next_return.status < 0 ||
728 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
729 goto end;
730 }
731
732 if (next_return_ts < muxer_notif_iter->last_returned_ts_ns) {
733 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
734 goto end;
735 }
736
737 assert(next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
738 assert(muxer_upstream_notif_iter);
739 next_return.notification = bt_notification_iterator_get_notification(
740 muxer_upstream_notif_iter->notif_iter);
741 assert(next_return.notification);
742
743 /*
744 * We invalidate the upstream notification iterator so that, the
745 * next time this function is called,
746 * validate_muxer_upstream_notif_iters() will make it valid.
747 */
748 muxer_upstream_notif_iter->is_valid = false;
749 muxer_notif_iter->last_returned_ts_ns = next_return_ts;
750
751 end:
752 return next_return;
753 }
754
755 static
756 void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
757 {
758 GList *node;
759
760 if (!muxer_notif_iter) {
761 return;
762 }
763
764 if (muxer_notif_iter->muxer_upstream_notif_iters) {
765 g_ptr_array_free(
766 muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
767 }
768
769 for (node = muxer_notif_iter->newly_connected_priv_ports;
770 node; node = g_list_next(node)) {
771 bt_put(node->data);
772 }
773
774 g_list_free(muxer_notif_iter->newly_connected_priv_ports);
775 g_free(muxer_notif_iter);
776 }
777
778 static
779 int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
780 struct muxer_notif_iter *muxer_notif_iter)
781 {
782 struct bt_component *comp;
783 int64_t count;
784 int64_t i;
785 int ret = 0;
786
787 /*
788 * Add the connected input ports to this muxer notification
789 * iterator's list of newly connected ports. They will be
790 * handled by muxer_notif_iter_handle_newly_connected_ports().
791 */
792 comp = bt_component_from_private_component(muxer_comp->priv_comp);
793 assert(comp);
794 count = bt_component_filter_get_input_port_count(comp);
795 if (count < 0) {
796 goto end;
797 }
798
799 for (i = 0; i < count; i++) {
800 struct bt_private_port *priv_port =
801 bt_private_component_filter_get_input_private_port_by_index(
802 muxer_comp->priv_comp, i);
803 struct bt_port *port;
804
805 assert(priv_port);
806 port = bt_port_from_private_port(priv_port);
807 assert(port);
808
809 if (!bt_port_is_connected(port)) {
810 bt_put(priv_port);
811 bt_put(port);
812 continue;
813 }
814
815 bt_put(port);
816 muxer_notif_iter->newly_connected_priv_ports =
817 g_list_append(
818 muxer_notif_iter->newly_connected_priv_ports,
819 priv_port);
820 if (!muxer_notif_iter->newly_connected_priv_ports) {
821 bt_put(priv_port);
822 ret = -1;
823 goto end;
824 }
825 }
826
827 end:
828 bt_put(comp);
829 return ret;
830 }
831
832 BT_HIDDEN
833 enum bt_notification_iterator_status muxer_notif_iter_init(
834 struct bt_private_notification_iterator *priv_notif_iter,
835 struct bt_private_port *output_priv_port)
836 {
837 struct muxer_comp *muxer_comp = NULL;
838 struct muxer_notif_iter *muxer_notif_iter = NULL;
839 struct bt_private_component *priv_comp = NULL;
840 enum bt_notification_iterator_status status =
841 BT_NOTIFICATION_ITERATOR_STATUS_OK;
842 int ret;
843
844 priv_comp = bt_private_notification_iterator_get_private_component(
845 priv_notif_iter);
846 assert(priv_comp);
847 muxer_comp = bt_private_component_get_user_data(priv_comp);
848 assert(muxer_comp);
849
850 if (muxer_comp->initializing_muxer_notif_iter) {
851 /*
852 * Weird, unhandled situation detected: downstream
853 * creates a muxer notification iterator while creating
854 * another muxer notification iterator (same component).
855 */
856 goto error;
857 }
858
859 muxer_comp->initializing_muxer_notif_iter = true;
860 muxer_notif_iter = g_new0(struct muxer_notif_iter, 1);
861 if (!muxer_notif_iter) {
862 goto error;
863 }
864
865 muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
866 muxer_notif_iter->muxer_upstream_notif_iters =
867 g_ptr_array_new_with_free_func(
868 (GDestroyNotify) destroy_muxer_upstream_notif_iter);
869 if (!muxer_notif_iter->muxer_upstream_notif_iters) {
870 goto error;
871 }
872
873 /*
874 * Add the muxer notification iterator to the component's array
875 * of muxer notification iterators here because
876 * muxer_notif_iter_init_newly_connected_ports() can cause
877 * muxer_port_connected() to be called, which adds the newly
878 * connected port to each muxer notification iterator's list of
879 * newly connected ports.
880 */
881 g_ptr_array_add(muxer_comp->muxer_notif_iters, muxer_notif_iter);
882 ret = muxer_notif_iter_init_newly_connected_ports(muxer_comp,
883 muxer_notif_iter);
884 if (ret) {
885 goto error;
886 }
887
888 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
889 muxer_notif_iter);
890 assert(ret == 0);
891 goto end;
892
893 error:
894 if (g_ptr_array_index(muxer_comp->muxer_notif_iters,
895 muxer_comp->muxer_notif_iters->len - 1) == muxer_notif_iter) {
896 g_ptr_array_remove_index(muxer_comp->muxer_notif_iters,
897 muxer_comp->muxer_notif_iters->len - 1);
898 }
899
900 destroy_muxer_notif_iter(muxer_notif_iter);
901 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
902 NULL);
903 assert(ret == 0);
904 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
905
906 end:
907 muxer_comp->initializing_muxer_notif_iter = false;
908 bt_put(priv_comp);
909 return status;
910 }
911
912 BT_HIDDEN
913 void muxer_notif_iter_finalize(
914 struct bt_private_notification_iterator *priv_notif_iter)
915 {
916 struct muxer_notif_iter *muxer_notif_iter =
917 bt_private_notification_iterator_get_user_data(priv_notif_iter);
918 struct bt_private_component *priv_comp = NULL;
919 struct muxer_comp *muxer_comp = NULL;
920
921 priv_comp = bt_private_notification_iterator_get_private_component(
922 priv_notif_iter);
923 assert(priv_comp);
924 muxer_comp = bt_private_component_get_user_data(priv_comp);
925
926 if (muxer_comp) {
927 (void) g_ptr_array_remove_fast(muxer_comp->muxer_notif_iters,
928 muxer_notif_iter);
929 destroy_muxer_notif_iter(muxer_notif_iter);
930 }
931
932 bt_put(priv_comp);
933 }
934
935 BT_HIDDEN
936 struct bt_notification_iterator_next_return muxer_notif_iter_next(
937 struct bt_private_notification_iterator *priv_notif_iter)
938 {
939 struct bt_notification_iterator_next_return next_ret;
940 struct muxer_notif_iter *muxer_notif_iter =
941 bt_private_notification_iterator_get_user_data(priv_notif_iter);
942 struct bt_private_component *priv_comp = NULL;
943 struct muxer_comp *muxer_comp = NULL;
944
945 assert(muxer_notif_iter);
946 priv_comp = bt_private_notification_iterator_get_private_component(
947 priv_notif_iter);
948 assert(priv_comp);
949 muxer_comp = bt_private_component_get_user_data(priv_comp);
950 assert(muxer_comp);
951
952 /* Are we in an error state set elsewhere? */
953 if (unlikely(muxer_comp->error)) {
954 next_ret.notification = NULL;
955 next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
956 goto end;
957 }
958
959 next_ret = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter);
960
961 end:
962 bt_put(priv_comp);
963 return next_ret;
964 }
965
966 BT_HIDDEN
967 void muxer_port_connected(
968 struct bt_private_component *priv_comp,
969 struct bt_private_port *self_private_port,
970 struct bt_port *other_port)
971 {
972 struct bt_port *self_port =
973 bt_port_from_private_port(self_private_port);
974 struct muxer_comp *muxer_comp =
975 bt_private_component_get_user_data(priv_comp);
976 size_t i;
977
978 assert(self_port);
979 assert(muxer_comp);
980
981 if (bt_port_get_type(self_port) == BT_PORT_TYPE_INPUT) {
982 int ret;
983
984 /* One less available input port */
985 muxer_comp->available_input_ports--;
986 ret = ensure_available_input_port(priv_comp);
987 if (ret) {
988 /*
989 * Only way to report an error later since this
990 * method does not return anything.
991 */
992 muxer_comp->error = true;
993 goto end;
994 }
995 }
996
997 for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
998 struct muxer_notif_iter *muxer_notif_iter =
999 g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
1000
1001 /*
1002 * Add this port to the list of newly connected ports
1003 * for this muxer notification iterator. We append at
1004 * the end of this list while
1005 * muxer_notif_iter_handle_newly_connected_ports()
1006 * removes the nodes from the beginning.
1007 *
1008 * The list node owns the private port.
1009 */
1010 muxer_notif_iter->newly_connected_priv_ports =
1011 g_list_append(
1012 muxer_notif_iter->newly_connected_priv_ports,
1013 bt_get(self_private_port));
1014 if (!muxer_notif_iter->newly_connected_priv_ports) {
1015 /* Put reference taken by bt_get() above */
1016 bt_put(self_private_port);
1017 muxer_comp->error = true;
1018 goto end;
1019 }
1020 }
1021
1022 end:
1023 bt_put(self_port);
1024 }
1025
1026 BT_HIDDEN
1027 void muxer_port_disconnected(struct bt_private_component *priv_comp,
1028 struct bt_private_port *priv_port)
1029 {
1030 struct bt_port *port = bt_port_from_private_port(priv_port);
1031 struct muxer_comp *muxer_comp =
1032 bt_private_component_get_user_data(priv_comp);
1033
1034 assert(port);
1035 assert(muxer_comp);
1036
1037 /*
1038 * There's nothing special to do when a port is disconnected
1039 * because this component deals with upstream notification
1040 * iterators which were already created thanks to connected
1041 * ports. The fact that the port is disconnected does not cancel
1042 * the upstream notification iterators created using its
1043 * connection: they still exist, even if the connection is dead.
1044 * The only way to remove an upstream notification iterator is
1045 * for its "next" operation to return
1046 * BT_NOTIFICATION_ITERATOR_STATUS_END.
1047 */
1048 if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
1049 /* One more available input port */
1050 muxer_comp->available_input_ports++;
1051 }
1052
1053 bt_put(port);
1054 }
This page took 0.052578 seconds and 5 git commands to generate.