Remove default port API
[babeltrace.git] / plugins / utils / muxer / muxer.c
1 /*
2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23 #include <babeltrace/babeltrace-internal.h>
24 #include <babeltrace/ctf-ir/clock-class.h>
25 #include <babeltrace/ctf-ir/event.h>
26 #include <babeltrace/graph/clock-class-priority-map.h>
27 #include <babeltrace/graph/component-filter.h>
28 #include <babeltrace/graph/component.h>
29 #include <babeltrace/graph/notification-event.h>
30 #include <babeltrace/graph/notification-inactivity.h>
31 #include <babeltrace/graph/notification-iterator.h>
32 #include <babeltrace/graph/notification.h>
33 #include <babeltrace/graph/port.h>
34 #include <babeltrace/graph/private-component-filter.h>
35 #include <babeltrace/graph/private-component.h>
36 #include <babeltrace/graph/private-component.h>
37 #include <babeltrace/graph/private-connection.h>
38 #include <babeltrace/graph/private-notification-iterator.h>
39 #include <babeltrace/graph/private-port.h>
40 #include <plugins-common.h>
41 #include <glib.h>
42 #include <assert.h>
43
44 struct muxer_comp {
45 /* Array of struct bt_private_notification_iterator * (weak refs) */
46 GPtrArray *muxer_notif_iters;
47
48 /* Weak ref */
49 struct bt_private_component *priv_comp;
50 unsigned int next_port_num;
51 size_t available_input_ports;
52 bool error;
53 bool initializing_muxer_notif_iter;
54 };
55
/* One upstream notification iterator tracked by a muxer iterator. */
struct muxer_upstream_notif_iter {
	/* Upstream iterator (owned by this); NULL once it has ended */
	struct bt_notification_iterator *notif_iter;

	/* Input private port this iterator was created on (owned by this) */
	struct bt_private_port *priv_port;

	/*
	 * True when the upstream iterator's current notification must
	 * take part in the multiplexing. When the upstream iterator
	 * returns BT_NOTIFICATION_ITERATOR_STATUS_AGAIN, this object
	 * is invalid: its current notification is still the previous
	 * one, which was already taken into account.
	 *
	 * This flag is meaningless when notif_iter above is NULL
	 * (the upstream iterator is finished).
	 */
	bool is_valid;
};
76
77 struct muxer_notif_iter {
78 /*
79 * Array of struct muxer_upstream_notif_iter * (owned by this).
80 *
81 * NOTE: This array is searched in linearly to find the youngest
82 * current notification. Keep this until benchmarks confirm that
83 * another data structure is faster than this for our typical
84 * use cases.
85 */
86 GPtrArray *muxer_upstream_notif_iters;
87
88 /*
89 * List of "recently" connected input ports (owned by this) to
90 * handle by this muxer notification iterator.
91 * muxer_port_connected() adds entries to this list, and the
92 * entries are removed when a notification iterator is created
93 * on the port's connection and put into
94 * muxer_upstream_notif_iters above by
95 * muxer_notif_iter_handle_newly_connected_ports().
96 */
97 GList *newly_connected_priv_ports;
98
99 /* Next thing to return by the "next" method */
100 struct bt_notification_iterator_next_return next_next_return;
101
102 /* Last time returned in a notification */
103 int64_t last_returned_ts_ns;
104 };
105
106 static
107 void destroy_muxer_upstream_notif_iter(
108 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
109 {
110 if (!muxer_upstream_notif_iter) {
111 return;
112 }
113
114 bt_put(muxer_upstream_notif_iter->notif_iter);
115 bt_put(muxer_upstream_notif_iter->priv_port);
116 g_free(muxer_upstream_notif_iter);
117 }
118
119 static
120 struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
121 struct muxer_notif_iter *muxer_notif_iter,
122 struct bt_notification_iterator *notif_iter,
123 struct bt_private_port *priv_port)
124 {
125 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
126 g_new0(struct muxer_upstream_notif_iter, 1);
127
128 if (!muxer_upstream_notif_iter) {
129 goto end;
130 }
131
132 muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
133 muxer_upstream_notif_iter->priv_port = bt_get(priv_port);
134 muxer_upstream_notif_iter->is_valid = false;
135 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
136 muxer_upstream_notif_iter);
137
138 end:
139 return muxer_upstream_notif_iter;
140 }
141
142 static
143 int ensure_available_input_port(struct bt_private_component *priv_comp)
144 {
145 struct muxer_comp *muxer_comp =
146 bt_private_component_get_user_data(priv_comp);
147 int ret = 0;
148 GString *port_name = NULL;
149 void *priv_port = NULL;
150
151 assert(muxer_comp);
152
153 if (muxer_comp->available_input_ports >= 1) {
154 goto end;
155 }
156
157 port_name = g_string_new("in");
158 if (!port_name) {
159 ret = -1;
160 goto end;
161 }
162
163 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
164 priv_port = bt_private_component_filter_add_input_private_port(
165 priv_comp, port_name->str, NULL);
166 if (!priv_port) {
167 ret = -1;
168 goto end;
169 }
170
171 muxer_comp->available_input_ports++;
172 muxer_comp->next_port_num++;
173
174 end:
175 if (port_name) {
176 g_string_free(port_name, TRUE);
177 }
178
179 BT_PUT(priv_port);
180 return ret;
181 }
182
/*
 * Adds the single output port, named "out", to the muxer component.
 *
 * Returns 0 on success, or -1 if the port could not be added.
 */
static
int create_output_port(struct bt_private_component *priv_comp)
{
	void *out_port = bt_private_component_filter_add_output_private_port(
		priv_comp, "out", NULL);
	int status = out_port ? 0 : -1;

	/* We do not keep the returned reference on the new port */
	bt_put(out_port);
	return status;
}
198
199 static
200 void destroy_muxer_comp(struct muxer_comp *muxer_comp)
201 {
202 if (!muxer_comp) {
203 return;
204 }
205
206 if (muxer_comp->muxer_notif_iters) {
207 g_ptr_array_free(muxer_comp->muxer_notif_iters, TRUE);
208 }
209
210 g_free(muxer_comp);
211 }
212
213 BT_HIDDEN
214 enum bt_component_status muxer_init(
215 struct bt_private_component *priv_comp,
216 struct bt_value *params, void *init_data)
217 {
218 int ret;
219 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
220 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
221
222 if (!muxer_comp) {
223 goto error;
224 }
225
226 muxer_comp->muxer_notif_iters = g_ptr_array_new();
227 if (!muxer_comp->muxer_notif_iters) {
228 goto error;
229 }
230
231 muxer_comp->priv_comp = priv_comp;
232 ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
233 assert(ret == 0);
234 ret = ensure_available_input_port(priv_comp);
235 if (ret) {
236 goto error;
237 }
238
239 ret = create_output_port(priv_comp);
240 if (ret) {
241 goto error;
242 }
243
244 goto end;
245
246 error:
247 destroy_muxer_comp(muxer_comp);
248 ret = bt_private_component_set_user_data(priv_comp, NULL);
249 assert(ret == 0);
250 status = BT_COMPONENT_STATUS_ERROR;
251
252 end:
253 return status;
254 }
255
256 BT_HIDDEN
257 void muxer_finalize(struct bt_private_component *priv_comp)
258 {
259 struct muxer_comp *muxer_comp =
260 bt_private_component_get_user_data(priv_comp);
261
262 destroy_muxer_comp(muxer_comp);
263 }
264
/*
 * Creates a notification iterator on the connection of the connected
 * input port `priv_port`.
 *
 * Sets *ret to 0 and returns the new iterator (reference owned by the
 * caller) on success; sets *ret to -1 and returns NULL on failure.
 */
static
struct bt_notification_iterator *create_notif_iter_on_input_port(
		struct bt_private_port *priv_port, int *ret)
{
	struct bt_port *port = bt_port_from_private_port(priv_port);
	struct bt_notification_iterator *notif_iter = NULL;
	struct bt_private_connection *priv_conn;

	assert(ret);
	*ret = 0;
	assert(port);
	assert(bt_port_is_connected(port));

	priv_conn = bt_private_port_get_private_connection(priv_port);
	if (priv_conn) {
		// TODO: Advance the iterator to >= the time of the latest
		//       returned notification by the muxer notification
		//       iterator which creates it.
		notif_iter =
			bt_private_connection_create_notification_iterator(
				priv_conn);
	}

	if (!notif_iter) {
		/* No connection, or the iterator could not be created */
		*ret = -1;
	}

	bt_put(port);
	bt_put(priv_conn);
	return notif_iter;
}
299
300 static
301 enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
302 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
303 {
304 enum bt_notification_iterator_status status;
305
306 status = bt_notification_iterator_next(
307 muxer_upstream_notif_iter->notif_iter);
308
309 switch (status) {
310 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
311 /*
312 * Notification iterator's current notification is valid:
313 * it must be considered for muxing operations.
314 */
315 muxer_upstream_notif_iter->is_valid = true;
316 break;
317 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
318 /*
319 * Notification iterator's current notification is not
320 * valid anymore. Return
321 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
322 * immediately.
323 */
324 muxer_upstream_notif_iter->is_valid = false;
325 break;
326 case BT_NOTIFICATION_ITERATOR_STATUS_END:
327 /*
328 * Notification iterator reached the end: release it. It
329 * won't be considered again to find the youngest
330 * notification.
331 */
332 BT_PUT(muxer_upstream_notif_iter->notif_iter);
333 muxer_upstream_notif_iter->is_valid = false;
334 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
335 break;
336 default:
337 /* Error or unsupported status code */
338 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
339 break;
340 }
341
342 return status;
343 }
344
345 static
346 int muxer_notif_iter_handle_newly_connected_ports(
347 struct muxer_notif_iter *muxer_notif_iter)
348 {
349 int ret = 0;
350
351 /*
352 * Here we create one upstream notification iterator for each
353 * newly connected port. We do not perform an initial "next" on
354 * those new upstream notification iterators: they are
355 * invalidated, to be validated later. The list of newly
356 * connected ports to handle here is updated by
357 * muxer_port_connected().
358 */
359 while (true) {
360 GList *node = muxer_notif_iter->newly_connected_priv_ports;
361 struct bt_private_port *priv_port;
362 struct bt_port *port;
363 struct bt_notification_iterator *upstream_notif_iter = NULL;
364 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
365
366 if (!node) {
367 break;
368 }
369
370 priv_port = node->data;
371 port = bt_port_from_private_port(priv_port);
372 assert(port);
373
374 if (!bt_port_is_connected(port)) {
375 /*
376 * Looks like this port is not connected
377 * anymore: we can't create an upstream
378 * notification iterator on its (non-existing)
379 * connection in this case.
380 */
381 goto remove_node;
382 }
383
384 BT_PUT(port);
385 upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
386 &ret);
387 if (ret) {
388 assert(!upstream_notif_iter);
389 bt_put(priv_port);
390 goto error;
391 }
392
393 muxer_upstream_notif_iter =
394 muxer_notif_iter_add_upstream_notif_iter(
395 muxer_notif_iter, upstream_notif_iter,
396 priv_port);
397 BT_PUT(priv_port);
398 BT_PUT(upstream_notif_iter);
399 if (!muxer_upstream_notif_iter) {
400 goto error;
401 }
402
403 remove_node:
404 bt_put(upstream_notif_iter);
405 bt_put(port);
406 bt_put(priv_port);
407 muxer_notif_iter->newly_connected_priv_ports =
408 g_list_delete_link(
409 muxer_notif_iter->newly_connected_priv_ports,
410 node);
411 }
412
413 goto end;
414
415 error:
416 if (ret >= 0) {
417 ret = -1;
418 }
419
420 end:
421 return ret;
422 }
423
424 static
425 int get_notif_ts_ns(struct muxer_comp *muxer_comp,
426 struct bt_notification *notif, int64_t last_returned_ts_ns,
427 int64_t *ts_ns)
428 {
429 struct bt_clock_class_priority_map *cc_prio_map = NULL;
430 struct bt_ctf_clock_class *clock_class = NULL;
431 struct bt_ctf_clock_value *clock_value = NULL;
432 struct bt_ctf_event *event = NULL;
433 int ret = 0;
434
435 assert(notif);
436 assert(ts_ns);
437
438 switch (bt_notification_get_type(notif)) {
439 case BT_NOTIFICATION_TYPE_EVENT:
440 cc_prio_map =
441 bt_notification_event_get_clock_class_priority_map(
442 notif);
443 break;
444
445 case BT_NOTIFICATION_TYPE_INACTIVITY:
446 cc_prio_map =
447 bt_notification_inactivity_get_clock_class_priority_map(
448 notif);
449 break;
450 default:
451 /* All the other notifications have a higher priority */
452 *ts_ns = last_returned_ts_ns;
453 goto end;
454 }
455
456 if (!cc_prio_map) {
457 goto error;
458 }
459
460 /*
461 * If the clock class priority map is empty, then we consider
462 * that this notification has no time. In this case it's always
463 * the youngest.
464 */
465 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map) == 0) {
466 *ts_ns = last_returned_ts_ns;
467 goto end;
468 }
469
470 clock_class =
471 bt_clock_class_priority_map_get_highest_priority_clock_class(
472 cc_prio_map);
473 if (!clock_class) {
474 goto error;
475 }
476
477 if (!bt_ctf_clock_class_is_absolute(clock_class)) {
478 // TODO: Allow this with an explicit parameter
479 goto error;
480 }
481
482 switch (bt_notification_get_type(notif)) {
483 case BT_NOTIFICATION_TYPE_EVENT:
484 event = bt_notification_event_get_event(notif);
485 assert(event);
486 clock_value = bt_ctf_event_get_clock_value(event,
487 clock_class);
488 break;
489 case BT_NOTIFICATION_TYPE_INACTIVITY:
490 clock_value = bt_notification_inactivity_get_clock_value(
491 notif, clock_class);
492 break;
493 default:
494 assert(false);
495 }
496
497 if (!clock_value) {
498 goto error;
499 }
500
501 ret = bt_ctf_clock_value_get_value_ns_from_epoch(clock_value, ts_ns);
502 if (ret) {
503 goto error;
504 }
505
506 goto end;
507
508 error:
509 ret = -1;
510
511 end:
512 bt_put(cc_prio_map);
513 bt_put(event);
514 bt_put(clock_class);
515 bt_put(clock_value);
516 return ret;
517 }
518
519 /*
520 * This function finds the youngest available notification amongst the
521 * non-ended upstream notification iterators and returns the upstream
522 * notification iterator which has it, or
523 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
524 * notification.
525 *
526 * This function does NOT:
527 *
528 * * Update any upstream notification iterator.
529 * * Check for newly connected ports.
530 * * Check the upstream notification iterators to retry.
531 *
532 * On sucess, this function sets *muxer_upstream_notif_iter to the
533 * upstream notification iterator of which the current notification is
534 * the youngest, and sets *ts_ns to its time.
535 */
536 static
537 enum bt_notification_iterator_status
538 muxer_notif_iter_youngest_upstream_notif_iter(
539 struct muxer_comp *muxer_comp,
540 struct muxer_notif_iter *muxer_notif_iter,
541 struct muxer_upstream_notif_iter **muxer_upstream_notif_iter,
542 int64_t *ts_ns)
543 {
544 size_t i;
545 int ret;
546 int64_t youngest_ts_ns = INT64_MAX;
547 enum bt_notification_iterator_status status =
548 BT_NOTIFICATION_ITERATOR_STATUS_OK;
549
550 assert(muxer_comp);
551 assert(muxer_notif_iter);
552 assert(muxer_upstream_notif_iter);
553 *muxer_upstream_notif_iter = NULL;
554
555 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
556 struct bt_notification *notif;
557 struct muxer_upstream_notif_iter *cur_muxer_upstream_notif_iter =
558 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters, i);
559 int64_t notif_ts_ns;
560
561 if (!cur_muxer_upstream_notif_iter->notif_iter) {
562 /* This upstream notification iterator is ended */
563 continue;
564 }
565
566 assert(cur_muxer_upstream_notif_iter->is_valid);
567 notif = bt_notification_iterator_get_notification(
568 cur_muxer_upstream_notif_iter->notif_iter);
569 assert(notif);
570 ret = get_notif_ts_ns(muxer_comp, notif,
571 muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
572 bt_put(notif);
573 if (ret) {
574 *muxer_upstream_notif_iter = NULL;
575 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
576 goto end;
577 }
578
579 if (notif_ts_ns <= youngest_ts_ns) {
580 *muxer_upstream_notif_iter =
581 cur_muxer_upstream_notif_iter;
582 youngest_ts_ns = notif_ts_ns;
583 *ts_ns = youngest_ts_ns;
584 }
585 }
586
587 if (!*muxer_upstream_notif_iter) {
588 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
589 *ts_ns = INT64_MIN;
590 }
591
592 end:
593 return status;
594 }
595
596 static
597 enum bt_notification_iterator_status validate_muxer_upstream_notif_iter(
598 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
599 {
600 enum bt_notification_iterator_status status =
601 BT_NOTIFICATION_ITERATOR_STATUS_OK;
602
603 if (muxer_upstream_notif_iter->is_valid ||
604 !muxer_upstream_notif_iter->notif_iter) {
605 goto end;
606 }
607
608 status = muxer_upstream_notif_iter_next(muxer_upstream_notif_iter);
609
610 end:
611 return status;
612 }
613
614 static
615 enum bt_notification_iterator_status validate_muxer_upstream_notif_iters(
616 struct muxer_notif_iter *muxer_notif_iter)
617 {
618 enum bt_notification_iterator_status status =
619 BT_NOTIFICATION_ITERATOR_STATUS_OK;
620 size_t i;
621
622 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
623 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
624 g_ptr_array_index(
625 muxer_notif_iter->muxer_upstream_notif_iters,
626 i);
627
628 status = validate_muxer_upstream_notif_iter(
629 muxer_upstream_notif_iter);
630 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
631 goto end;
632 }
633 }
634
635 end:
636 return status;
637 }
638
639 static
640 struct bt_notification_iterator_next_return muxer_notif_iter_do_next(
641 struct muxer_comp *muxer_comp,
642 struct muxer_notif_iter *muxer_notif_iter)
643 {
644 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter = NULL;
645 struct bt_notification_iterator_next_return next_return = {
646 .notification = NULL,
647 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
648 };
649 int64_t next_return_ts;
650
651 while (true) {
652 int ret = muxer_notif_iter_handle_newly_connected_ports(
653 muxer_notif_iter);
654
655 if (ret) {
656 next_return.status =
657 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
658 goto end;
659 }
660
661 next_return.status =
662 validate_muxer_upstream_notif_iters(muxer_notif_iter);
663 if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
664 goto end;
665 }
666
667 /*
668 * At this point, we know that all the existing upstream
669 * notification iterators are valid. However the
670 * operations to validate them (during
671 * validate_muxer_upstream_notif_iters()) may have
672 * connected new ports. If no ports were connected
673 * during this operation, exit the loop.
674 */
675 if (!muxer_notif_iter->newly_connected_priv_ports) {
676 break;
677 }
678 }
679
680 assert(!muxer_notif_iter->newly_connected_priv_ports);
681
682 /*
683 * At this point we know that all the existing upstream
684 * notification iterators are valid. We can find the one,
685 * amongst those, of which the current notification is the
686 * youngest.
687 */
688 next_return.status =
689 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
690 muxer_notif_iter, &muxer_upstream_notif_iter,
691 &next_return_ts);
692 if (next_return.status < 0 ||
693 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
694 goto end;
695 }
696
697 if (next_return_ts < muxer_notif_iter->last_returned_ts_ns) {
698 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
699 goto end;
700 }
701
702 assert(next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
703 assert(muxer_upstream_notif_iter);
704 next_return.notification = bt_notification_iterator_get_notification(
705 muxer_upstream_notif_iter->notif_iter);
706 assert(next_return.notification);
707
708 /*
709 * We invalidate the upstream notification iterator so that, the
710 * next time this function is called,
711 * validate_muxer_upstream_notif_iters() will make it valid.
712 */
713 muxer_upstream_notif_iter->is_valid = false;
714 muxer_notif_iter->last_returned_ts_ns = next_return_ts;
715
716 end:
717 return next_return;
718 }
719
720 static
721 void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
722 {
723 GList *node;
724
725 if (!muxer_notif_iter) {
726 return;
727 }
728
729 if (muxer_notif_iter->muxer_upstream_notif_iters) {
730 g_ptr_array_free(
731 muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
732 }
733
734 for (node = muxer_notif_iter->newly_connected_priv_ports;
735 node; node = g_list_next(node)) {
736 bt_put(node->data);
737 }
738
739 g_list_free(muxer_notif_iter->newly_connected_priv_ports);
740 g_free(muxer_notif_iter);
741 }
742
743 static
744 int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
745 struct muxer_notif_iter *muxer_notif_iter)
746 {
747 struct bt_component *comp;
748 int64_t count;
749 int64_t i;
750 int ret = 0;
751
752 /*
753 * Add the connected input ports to this muxer notification
754 * iterator's list of newly connected ports. They will be
755 * handled by muxer_notif_iter_handle_newly_connected_ports().
756 */
757 comp = bt_component_from_private_component(muxer_comp->priv_comp);
758 assert(comp);
759 count = bt_component_filter_get_input_port_count(comp);
760 if (count < 0) {
761 goto end;
762 }
763
764 for (i = 0; i < count; i++) {
765 struct bt_private_port *priv_port =
766 bt_private_component_filter_get_input_private_port_by_index(
767 muxer_comp->priv_comp, i);
768 struct bt_port *port;
769
770 assert(priv_port);
771 port = bt_port_from_private_port(priv_port);
772 assert(port);
773
774 if (!bt_port_is_connected(port)) {
775 bt_put(priv_port);
776 bt_put(port);
777 continue;
778 }
779
780 bt_put(port);
781 muxer_notif_iter->newly_connected_priv_ports =
782 g_list_append(
783 muxer_notif_iter->newly_connected_priv_ports,
784 priv_port);
785 if (!muxer_notif_iter->newly_connected_priv_ports) {
786 bt_put(priv_port);
787 ret = -1;
788 goto end;
789 }
790 }
791
792 end:
793 bt_put(comp);
794 return ret;
795 }
796
797 BT_HIDDEN
798 enum bt_notification_iterator_status muxer_notif_iter_init(
799 struct bt_private_notification_iterator *priv_notif_iter,
800 struct bt_private_port *output_priv_port)
801 {
802 struct muxer_comp *muxer_comp = NULL;
803 struct muxer_notif_iter *muxer_notif_iter = NULL;
804 struct bt_private_component *priv_comp = NULL;
805 enum bt_notification_iterator_status status =
806 BT_NOTIFICATION_ITERATOR_STATUS_OK;
807 int ret;
808
809 priv_comp = bt_private_notification_iterator_get_private_component(
810 priv_notif_iter);
811 assert(priv_comp);
812 muxer_comp = bt_private_component_get_user_data(priv_comp);
813 assert(muxer_comp);
814
815 if (muxer_comp->initializing_muxer_notif_iter) {
816 /*
817 * Weird, unhandled situation detected: downstream
818 * creates a muxer notification iterator while creating
819 * another muxer notification iterator (same component).
820 */
821 goto error;
822 }
823
824 muxer_comp->initializing_muxer_notif_iter = true;
825 muxer_notif_iter = g_new0(struct muxer_notif_iter, 1);
826 if (!muxer_notif_iter) {
827 goto error;
828 }
829
830 muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
831 muxer_notif_iter->muxer_upstream_notif_iters =
832 g_ptr_array_new_with_free_func(
833 (GDestroyNotify) destroy_muxer_upstream_notif_iter);
834 if (!muxer_notif_iter->muxer_upstream_notif_iters) {
835 goto error;
836 }
837
838 /*
839 * Add the muxer notification iterator to the component's array
840 * of muxer notification iterators here because
841 * muxer_notif_iter_init_newly_connected_ports() can cause
842 * muxer_port_connected() to be called, which adds the newly
843 * connected port to each muxer notification iterator's list of
844 * newly connected ports.
845 */
846 g_ptr_array_add(muxer_comp->muxer_notif_iters, muxer_notif_iter);
847 ret = muxer_notif_iter_init_newly_connected_ports(muxer_comp,
848 muxer_notif_iter);
849 if (ret) {
850 goto error;
851 }
852
853 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
854 muxer_notif_iter);
855 assert(ret == 0);
856 goto end;
857
858 error:
859 if (g_ptr_array_index(muxer_comp->muxer_notif_iters,
860 muxer_comp->muxer_notif_iters->len - 1) == muxer_notif_iter) {
861 g_ptr_array_remove_index(muxer_comp->muxer_notif_iters,
862 muxer_comp->muxer_notif_iters->len - 1);
863 }
864
865 destroy_muxer_notif_iter(muxer_notif_iter);
866 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
867 NULL);
868 assert(ret == 0);
869 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
870
871 end:
872 muxer_comp->initializing_muxer_notif_iter = false;
873 bt_put(priv_comp);
874 return status;
875 }
876
877 BT_HIDDEN
878 void muxer_notif_iter_finalize(
879 struct bt_private_notification_iterator *priv_notif_iter)
880 {
881 struct muxer_notif_iter *muxer_notif_iter =
882 bt_private_notification_iterator_get_user_data(priv_notif_iter);
883 struct bt_private_component *priv_comp = NULL;
884 struct muxer_comp *muxer_comp = NULL;
885
886 priv_comp = bt_private_notification_iterator_get_private_component(
887 priv_notif_iter);
888 assert(priv_comp);
889 muxer_comp = bt_private_component_get_user_data(priv_comp);
890
891 if (muxer_comp) {
892 (void) g_ptr_array_remove_fast(muxer_comp->muxer_notif_iters,
893 muxer_notif_iter);
894 destroy_muxer_notif_iter(muxer_notif_iter);
895 }
896
897 bt_put(priv_comp);
898 }
899
900 BT_HIDDEN
901 struct bt_notification_iterator_next_return muxer_notif_iter_next(
902 struct bt_private_notification_iterator *priv_notif_iter)
903 {
904 struct bt_notification_iterator_next_return next_ret;
905 struct muxer_notif_iter *muxer_notif_iter =
906 bt_private_notification_iterator_get_user_data(priv_notif_iter);
907 struct bt_private_component *priv_comp = NULL;
908 struct muxer_comp *muxer_comp = NULL;
909
910 assert(muxer_notif_iter);
911 priv_comp = bt_private_notification_iterator_get_private_component(
912 priv_notif_iter);
913 assert(priv_comp);
914 muxer_comp = bt_private_component_get_user_data(priv_comp);
915 assert(muxer_comp);
916
917 /* Are we in an error state set elsewhere? */
918 if (unlikely(muxer_comp->error)) {
919 next_ret.notification = NULL;
920 next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
921 goto end;
922 }
923
924 next_ret = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter);
925
926 end:
927 bt_put(priv_comp);
928 return next_ret;
929 }
930
931 BT_HIDDEN
932 void muxer_port_connected(
933 struct bt_private_component *priv_comp,
934 struct bt_private_port *self_private_port,
935 struct bt_port *other_port)
936 {
937 struct bt_port *self_port =
938 bt_port_from_private_port(self_private_port);
939 struct muxer_comp *muxer_comp =
940 bt_private_component_get_user_data(priv_comp);
941 size_t i;
942
943 assert(self_port);
944 assert(muxer_comp);
945
946 if (bt_port_get_type(self_port) == BT_PORT_TYPE_INPUT) {
947 int ret;
948
949 /* One less available input port */
950 muxer_comp->available_input_ports--;
951 ret = ensure_available_input_port(priv_comp);
952 if (ret) {
953 /*
954 * Only way to report an error later since this
955 * method does not return anything.
956 */
957 muxer_comp->error = true;
958 goto end;
959 }
960 }
961
962 for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
963 struct muxer_notif_iter *muxer_notif_iter =
964 g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
965
966 /*
967 * Add this port to the list of newly connected ports
968 * for this muxer notification iterator. We append at
969 * the end of this list while
970 * muxer_notif_iter_handle_newly_connected_ports()
971 * removes the nodes from the beginning.
972 *
973 * The list node owns the private port.
974 */
975 muxer_notif_iter->newly_connected_priv_ports =
976 g_list_append(
977 muxer_notif_iter->newly_connected_priv_ports,
978 bt_get(self_private_port));
979 if (!muxer_notif_iter->newly_connected_priv_ports) {
980 /* Put reference taken by bt_get() above */
981 bt_put(self_private_port);
982 muxer_comp->error = true;
983 goto end;
984 }
985 }
986
987 end:
988 bt_put(self_port);
989 }
990
991 BT_HIDDEN
992 void muxer_port_disconnected(struct bt_private_component *priv_comp,
993 struct bt_private_port *priv_port)
994 {
995 struct bt_port *port = bt_port_from_private_port(priv_port);
996 struct muxer_comp *muxer_comp =
997 bt_private_component_get_user_data(priv_comp);
998
999 assert(port);
1000 assert(muxer_comp);
1001
1002 /*
1003 * There's nothing special to do when a port is disconnected
1004 * because this component deals with upstream notification
1005 * iterators which were already created thanks to connected
1006 * ports. The fact that the port is disconnected does not cancel
1007 * the upstream notification iterators created using its
1008 * connection: they still exist, even if the connection is dead.
1009 * The only way to remove an upstream notification iterator is
1010 * for its "next" operation to return
1011 * BT_NOTIFICATION_ITERATOR_STATUS_END.
1012 */
1013 if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
1014 /* One more available input port */
1015 muxer_comp->available_input_ports++;
1016 }
1017
1018 bt_put(port);
1019 }
This page took 0.056413 seconds and 5 git commands to generate.