lib: simplify the public notification iterator interfaces
[babeltrace.git] / plugins / utils / muxer / muxer.c
1 /*
2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23 #define BT_LOG_TAG "PLUGIN-UTILS-MUXER-FLT"
24 #include "logging.h"
25
26 #include <babeltrace/babeltrace-internal.h>
27 #include <babeltrace/compat/uuid-internal.h>
28 #include <babeltrace/babeltrace.h>
29 #include <babeltrace/values-internal.h>
30 #include <babeltrace/graph/component-internal.h>
31 #include <babeltrace/graph/notification-iterator-internal.h>
32 #include <babeltrace/graph/connection-internal.h>
33 #include <plugins-common.h>
34 #include <glib.h>
35 #include <stdbool.h>
36 #include <inttypes.h>
37 #include <babeltrace/assert-internal.h>
38 #include <stdlib.h>
39 #include <string.h>
40
41 #define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes"
42
43 struct muxer_comp {
44 /*
45 * Array of struct
46 * bt_private_connection_private_notification_iterator *
47 * (weak refs)
48 */
49 GPtrArray *muxer_notif_iters;
50
51 /* Weak ref */
52 struct bt_private_component *priv_comp;
53 unsigned int next_port_num;
54 size_t available_input_ports;
55 bool error;
56 bool initializing_muxer_notif_iter;
57 bool assume_absolute_clock_classes;
58 };
59
/* Wrapper of one upstream notification iterator, as seen by the muxer. */
struct muxer_upstream_notif_iter {
	/* Upstream iterator (owned by this); NULL once it is ended */
	struct bt_notification_iterator *notif_iter;

	/* Current notification of the upstream iterator (owned by this) */
	struct bt_notification *notif;

	/*
	 * True when the current notification of the upstream
	 * notification iterator must take part in the multiplexing
	 * operations.
	 *
	 * When the upstream iterator returns
	 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN, this wrapper becomes
	 * invalid: its current notification is still the previous one,
	 * which was already taken into account.
	 *
	 * This flag is meaningless when notif_iter above is NULL (the
	 * upstream iterator is finished).
	 */
	bool is_valid;
};
80
/*
 * What a muxer notification iterator expects from the clock classes of
 * the notifications it receives.
 */
enum muxer_notif_iter_clock_class_expectation {
	/* No clock class encountered yet: nothing is expected */
	MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,

	/* Absolute clock classes only */
	MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE = 1,

	/* Non-absolute clock classes sharing one specific UUID */
	MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID = 2,

	/* Non-absolute clock classes without any UUID */
	MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID = 3,
};
87
88 struct muxer_notif_iter {
89 /*
90 * Array of struct muxer_upstream_notif_iter * (owned by this).
91 *
92 * NOTE: This array is searched in linearly to find the youngest
93 * current notification. Keep this until benchmarks confirm that
94 * another data structure is faster than this for our typical
95 * use cases.
96 */
97 GPtrArray *muxer_upstream_notif_iters;
98
99 /*
100 * List of "recently" connected input ports (weak) to
101 * handle by this muxer notification iterator.
102 * muxer_port_connected() adds entries to this list, and the
103 * entries are removed when a notification iterator is created
104 * on the port's connection and put into
105 * muxer_upstream_notif_iters above by
106 * muxer_notif_iter_handle_newly_connected_ports().
107 */
108 GList *newly_connected_priv_ports;
109
110 /* Next thing to return by the "next" method */
111 struct bt_notification_iterator_next_method_return next_next_return;
112
113 /* Last time returned in a notification */
114 int64_t last_returned_ts_ns;
115
116 /* Clock class expectation state */
117 enum muxer_notif_iter_clock_class_expectation clock_class_expectation;
118
119 /*
120 * Expected clock class UUID, only valid when
121 * clock_class_expectation is
122 * MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID.
123 */
124 unsigned char expected_clock_class_uuid[BABELTRACE_UUID_LEN];
125 };
126
127 static
128 void destroy_muxer_upstream_notif_iter(
129 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
130 {
131 if (!muxer_upstream_notif_iter) {
132 return;
133 }
134
135 BT_LOGD("Destroying muxer's upstream notification iterator wrapper: "
136 "addr=%p, notif-iter-addr=%p, is-valid=%d",
137 muxer_upstream_notif_iter,
138 muxer_upstream_notif_iter->notif_iter,
139 muxer_upstream_notif_iter->is_valid);
140 bt_put(muxer_upstream_notif_iter->notif_iter);
141 bt_put(muxer_upstream_notif_iter->notif);
142 g_free(muxer_upstream_notif_iter);
143 }
144
145 static
146 struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
147 struct muxer_notif_iter *muxer_notif_iter,
148 struct bt_notification_iterator *notif_iter,
149 struct bt_private_port *priv_port)
150 {
151 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
152 g_new0(struct muxer_upstream_notif_iter, 1);
153
154 if (!muxer_upstream_notif_iter) {
155 BT_LOGE_STR("Failed to allocate one muxer's upstream notification iterator wrapper.");
156 goto end;
157 }
158
159 muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
160 muxer_upstream_notif_iter->is_valid = false;
161 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
162 muxer_upstream_notif_iter);
163 BT_LOGD("Added muxer's upstream notification iterator wrapper: "
164 "addr=%p, muxer-notif-iter-addr=%p, notif-iter-addr=%p",
165 muxer_upstream_notif_iter, muxer_notif_iter,
166 notif_iter);
167
168 end:
169 return muxer_upstream_notif_iter;
170 }
171
172 static
173 enum bt_component_status ensure_available_input_port(
174 struct bt_private_component *priv_comp)
175 {
176 struct muxer_comp *muxer_comp =
177 bt_private_component_get_user_data(priv_comp);
178 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
179 GString *port_name = NULL;
180
181 BT_ASSERT(muxer_comp);
182
183 if (muxer_comp->available_input_ports >= 1) {
184 goto end;
185 }
186
187 port_name = g_string_new("in");
188 if (!port_name) {
189 BT_LOGE_STR("Failed to allocate a GString.");
190 status = BT_COMPONENT_STATUS_NOMEM;
191 goto end;
192 }
193
194 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
195 status = bt_private_component_filter_add_input_private_port(
196 priv_comp, port_name->str, NULL, NULL);
197 if (status != BT_COMPONENT_STATUS_OK) {
198 BT_LOGE("Cannot add input port to muxer component: "
199 "port-name=\"%s\", comp-addr=%p, status=%s",
200 port_name->str, priv_comp,
201 bt_component_status_string(status));
202 goto end;
203 }
204
205 muxer_comp->available_input_ports++;
206 muxer_comp->next_port_num++;
207 BT_LOGD("Added one input port to muxer component: "
208 "port-name=\"%s\", comp-addr=%p",
209 port_name->str, priv_comp);
210 end:
211 if (port_name) {
212 g_string_free(port_name, TRUE);
213 }
214
215 return status;
216 }
217
218 static
219 enum bt_component_status create_output_port(
220 struct bt_private_component *priv_comp)
221 {
222 return bt_private_component_filter_add_output_private_port(
223 priv_comp, "out", NULL, NULL);
224 }
225
226 static
227 void destroy_muxer_comp(struct muxer_comp *muxer_comp)
228 {
229 if (!muxer_comp) {
230 return;
231 }
232
233 BT_LOGD("Destroying muxer component: muxer-comp-addr=%p, "
234 "muxer-notif-iter-count=%u", muxer_comp,
235 muxer_comp->muxer_notif_iters ?
236 muxer_comp->muxer_notif_iters->len : 0);
237
238 if (muxer_comp->muxer_notif_iters) {
239 g_ptr_array_free(muxer_comp->muxer_notif_iters, TRUE);
240 }
241
242 g_free(muxer_comp);
243 }
244
245 static
246 struct bt_value *get_default_params(void)
247 {
248 struct bt_value *params;
249 int ret;
250
251 params = bt_value_map_create();
252 if (!params) {
253 BT_LOGE_STR("Cannot create a map value object.");
254 goto error;
255 }
256
257 ret = bt_value_map_insert_bool(params,
258 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, false);
259 if (ret) {
260 BT_LOGE_STR("Cannot add boolean value to map value object.");
261 goto error;
262 }
263
264 goto end;
265
266 error:
267 BT_PUT(params);
268
269 end:
270 return params;
271 }
272
273 static
274 int configure_muxer_comp(struct muxer_comp *muxer_comp, struct bt_value *params)
275 {
276 struct bt_value *default_params = NULL;
277 struct bt_value *real_params = NULL;
278 struct bt_value *assume_absolute_clock_classes = NULL;
279 int ret = 0;
280 bt_bool bool_val;
281
282 default_params = get_default_params();
283 if (!default_params) {
284 BT_LOGE("Cannot get default parameters: "
285 "muxer-comp-addr=%p", muxer_comp);
286 goto error;
287 }
288
289 real_params = bt_value_map_extend(default_params, params);
290 if (!real_params) {
291 BT_LOGE("Cannot extend default parameters map value: "
292 "muxer-comp-addr=%p, def-params-addr=%p, "
293 "params-addr=%p", muxer_comp, default_params,
294 params);
295 goto error;
296 }
297
298 assume_absolute_clock_classes = bt_value_map_borrow(real_params,
299 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME);
300 if (assume_absolute_clock_classes &&
301 !bt_value_is_bool(assume_absolute_clock_classes)) {
302 BT_LOGE("Expecting a boolean value for the `%s` parameter: "
303 "muxer-comp-addr=%p, value-type=%s",
304 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, muxer_comp,
305 bt_value_type_string(
306 bt_value_get_type(assume_absolute_clock_classes)));
307 goto error;
308 }
309
310 ret = bt_value_bool_get(assume_absolute_clock_classes, &bool_val);
311 BT_ASSERT(ret == 0);
312 muxer_comp->assume_absolute_clock_classes = (bool) bool_val;
313 BT_LOGD("Configured muxer component: muxer-comp-addr=%p, "
314 "assume-absolute-clock-classes=%d",
315 muxer_comp, muxer_comp->assume_absolute_clock_classes);
316 goto end;
317
318 error:
319 ret = -1;
320
321 end:
322 bt_put(default_params);
323 bt_put(real_params);
324 return ret;
325 }
326
327 BT_HIDDEN
328 enum bt_component_status muxer_init(
329 struct bt_private_component *priv_comp,
330 struct bt_value *params, void *init_data)
331 {
332 int ret;
333 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
334 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
335
336 BT_LOGD("Initializing muxer component: "
337 "comp-addr=%p, params-addr=%p", priv_comp, params);
338
339 if (!muxer_comp) {
340 BT_LOGE_STR("Failed to allocate one muxer component.");
341 goto error;
342 }
343
344 ret = configure_muxer_comp(muxer_comp, params);
345 if (ret) {
346 BT_LOGE("Cannot configure muxer component: "
347 "muxer-comp-addr=%p, params-addr=%p",
348 muxer_comp, params);
349 goto error;
350 }
351
352 muxer_comp->muxer_notif_iters = g_ptr_array_new();
353 if (!muxer_comp->muxer_notif_iters) {
354 BT_LOGE_STR("Failed to allocate a GPtrArray.");
355 goto error;
356 }
357
358 muxer_comp->priv_comp = priv_comp;
359 ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
360 BT_ASSERT(ret == 0);
361 status = ensure_available_input_port(priv_comp);
362 if (status != BT_COMPONENT_STATUS_OK) {
363 BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
364 "muxer-comp-addr=%p, status=%s",
365 muxer_comp,
366 bt_component_status_string(status));
367 goto error;
368 }
369
370 status = create_output_port(priv_comp);
371 if (status) {
372 BT_LOGE("Cannot create muxer component's output port: "
373 "muxer-comp-addr=%p, status=%s",
374 muxer_comp,
375 bt_component_status_string(status));
376 goto error;
377 }
378
379 BT_LOGD("Initialized muxer component: "
380 "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p",
381 priv_comp, params, muxer_comp);
382
383 goto end;
384
385 error:
386 destroy_muxer_comp(muxer_comp);
387 ret = bt_private_component_set_user_data(priv_comp, NULL);
388 BT_ASSERT(ret == 0);
389
390 if (status == BT_COMPONENT_STATUS_OK) {
391 status = BT_COMPONENT_STATUS_ERROR;
392 }
393
394 end:
395 return status;
396 }
397
398 BT_HIDDEN
399 void muxer_finalize(struct bt_private_component *priv_comp)
400 {
401 struct muxer_comp *muxer_comp =
402 bt_private_component_get_user_data(priv_comp);
403
404 BT_LOGD("Finalizing muxer component: comp-addr=%p",
405 priv_comp);
406 destroy_muxer_comp(muxer_comp);
407 }
408
409 static
410 struct bt_notification_iterator *create_notif_iter_on_input_port(
411 struct bt_private_port *priv_port, int *ret)
412 {
413 struct bt_port *port = bt_port_borrow_from_private(priv_port);
414 struct bt_notification_iterator *notif_iter = NULL;
415 struct bt_private_connection *priv_conn = NULL;
416 enum bt_connection_status conn_status;
417
418 BT_ASSERT(ret);
419 *ret = 0;
420 BT_ASSERT(port);
421 BT_ASSERT(bt_port_is_connected(port));
422 priv_conn = bt_private_port_get_private_connection(priv_port);
423 BT_ASSERT(priv_conn);
424
425 // TODO: Advance the iterator to >= the time of the latest
426 // returned notification by the muxer notification
427 // iterator which creates it.
428 conn_status = bt_private_connection_create_notification_iterator(
429 priv_conn, &notif_iter);
430 if (conn_status != BT_CONNECTION_STATUS_OK) {
431 BT_LOGE("Cannot create upstream notification iterator on input port's connection: "
432 "port-addr=%p, port-name=\"%s\", conn-addr=%p, "
433 "status=%s",
434 port, bt_port_get_name(port), priv_conn,
435 bt_connection_status_string(conn_status));
436 *ret = -1;
437 goto end;
438 }
439
440 BT_LOGD("Created upstream notification iterator on input port's connection: "
441 "port-addr=%p, port-name=\"%s\", conn-addr=%p, "
442 "notif-iter-addr=%p",
443 port, bt_port_get_name(port), priv_conn, notif_iter);
444
445 end:
446 bt_put(priv_conn);
447 return notif_iter;
448 }
449
450 static
451 enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
452 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
453 {
454 enum bt_notification_iterator_status status;
455 struct bt_notification *notif = NULL;
456
457 BT_LOGV("Calling upstream notification iterator's \"next\" method: "
458 "muxer-upstream-notif-iter-wrap-addr=%p, notif-iter-addr=%p",
459 muxer_upstream_notif_iter,
460 muxer_upstream_notif_iter->notif_iter);
461 status = bt_private_connection_notification_iterator_next(
462 muxer_upstream_notif_iter->notif_iter, &notif);
463 BT_LOGV("Upstream notification iterator's \"next\" method returned: "
464 "status=%s", bt_notification_iterator_status_string(status));
465
466 switch (status) {
467 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
468 /*
469 * Notification iterator's current notification is valid:
470 * it must be considered for muxing operations.
471 */
472 BT_LOGV_STR("Validated upstream notification iterator wrapper.");
473 muxer_upstream_notif_iter->is_valid = true;
474 BT_MOVE(muxer_upstream_notif_iter->notif, notif);
475 break;
476 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
477 /*
478 * Notification iterator's current notification is not
479 * valid anymore. Return
480 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
481 * immediately.
482 */
483 BT_LOGV_STR("Invalidated upstream notification iterator wrapper because of BT_NOTIFICATION_ITERATOR_STATUS_AGAIN.");
484 muxer_upstream_notif_iter->is_valid = false;
485 break;
486 case BT_NOTIFICATION_ITERATOR_STATUS_END: /* Fall-through. */
487 case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED:
488 /*
489 * Notification iterator reached the end: release it. It
490 * won't be considered again to find the youngest
491 * notification.
492 */
493 BT_LOGV_STR("Invalidated upstream notification iterator wrapper because of BT_NOTIFICATION_ITERATOR_STATUS_END or BT_NOTIFICATION_ITERATOR_STATUS_CANCELED.");
494 BT_PUT(muxer_upstream_notif_iter->notif_iter);
495 muxer_upstream_notif_iter->is_valid = false;
496 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
497 break;
498 default:
499 /* Error or unsupported status code */
500 BT_LOGE("Error or unsupported status code: "
501 "status-code=%d", status);
502 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
503 break;
504 }
505
506 BT_ASSERT(!notif);
507 return status;
508 }
509
510 static
511 int muxer_notif_iter_handle_newly_connected_ports(
512 struct muxer_notif_iter *muxer_notif_iter)
513 {
514 int ret = 0;
515
516 BT_LOGV("Handling newly connected ports: "
517 "muxer-notif-iter-addr=%p", muxer_notif_iter);
518
519 /*
520 * Here we create one upstream notification iterator for each
521 * newly connected port. We do not perform an initial "next" on
522 * those new upstream notification iterators: they are
523 * invalidated, to be validated later. The list of newly
524 * connected ports to handle here is updated by
525 * muxer_port_connected().
526 */
527 while (true) {
528 GList *node = muxer_notif_iter->newly_connected_priv_ports;
529 struct bt_private_port *priv_port;
530 struct bt_port *port;
531 struct bt_notification_iterator *upstream_notif_iter = NULL;
532 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
533
534 if (!node) {
535 break;
536 }
537
538 priv_port = node->data;
539 port = bt_port_borrow_from_private(priv_port);
540 BT_ASSERT(port);
541
542 if (!bt_port_is_connected(port)) {
543 /*
544 * Looks like this port is not connected
545 * anymore: we can't create an upstream
546 * notification iterator on its (non-existing)
547 * connection in this case.
548 */
549 goto remove_node;
550 }
551
552 upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
553 &ret);
554 if (ret) {
555 /* create_notif_iter_on_input_port() logs errors */
556 BT_ASSERT(!upstream_notif_iter);
557 goto error;
558 }
559
560 muxer_upstream_notif_iter =
561 muxer_notif_iter_add_upstream_notif_iter(
562 muxer_notif_iter, upstream_notif_iter,
563 priv_port);
564 BT_PUT(upstream_notif_iter);
565 if (!muxer_upstream_notif_iter) {
566 /*
567 * muxer_notif_iter_add_upstream_notif_iter()
568 * logs errors.
569 */
570 goto error;
571 }
572
573 remove_node:
574 bt_put(upstream_notif_iter);
575 muxer_notif_iter->newly_connected_priv_ports =
576 g_list_delete_link(
577 muxer_notif_iter->newly_connected_priv_ports,
578 node);
579 }
580
581 goto end;
582
583 error:
584 if (ret >= 0) {
585 ret = -1;
586 }
587
588 end:
589 return ret;
590 }
591
592 static
593 int get_notif_ts_ns(struct muxer_comp *muxer_comp,
594 struct muxer_notif_iter *muxer_notif_iter,
595 struct bt_notification *notif, int64_t last_returned_ts_ns,
596 int64_t *ts_ns)
597 {
598 struct bt_clock_class_priority_map *cc_prio_map = NULL;
599 struct bt_clock_class *clock_class = NULL;
600 struct bt_clock_value *clock_value = NULL;
601 struct bt_event *event = NULL;
602 int ret = 0;
603 const unsigned char *cc_uuid;
604 const char *cc_name;
605
606 BT_ASSERT(notif);
607 BT_ASSERT(ts_ns);
608
609 BT_LOGV("Getting notification's timestamp: "
610 "muxer-notif-iter-addr=%p, notif-addr=%p, "
611 "last-returned-ts=%" PRId64,
612 muxer_notif_iter, notif, last_returned_ts_ns);
613
614 switch (bt_notification_get_type(notif)) {
615 case BT_NOTIFICATION_TYPE_EVENT:
616 cc_prio_map =
617 bt_notification_event_borrow_clock_class_priority_map(
618 notif);
619 break;
620
621 case BT_NOTIFICATION_TYPE_INACTIVITY:
622 cc_prio_map =
623 bt_notification_inactivity_borrow_clock_class_priority_map(
624 notif);
625 break;
626 default:
627 /* All the other notifications have a higher priority */
628 BT_LOGV_STR("Notification has no timestamp: using the last returned timestamp.");
629 *ts_ns = last_returned_ts_ns;
630 goto end;
631 }
632
633 if (!cc_prio_map) {
634 BT_LOGE("Cannot get notification's clock class priority map: "
635 "notif-addr=%p", notif);
636 goto error;
637 }
638
639 /*
640 * If the clock class priority map is empty, then we consider
641 * that this notification has no time. In this case it's always
642 * the youngest.
643 */
644 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map) == 0) {
645 BT_LOGV_STR("Notification's clock class priority map contains 0 clock classes: "
646 "using the last returned timestamp.");
647 *ts_ns = last_returned_ts_ns;
648 goto end;
649 }
650
651 clock_class =
652 bt_clock_class_priority_map_borrow_highest_priority_clock_class(
653 cc_prio_map);
654 if (!clock_class) {
655 BT_LOGE("Cannot get the clock class with the highest priority from clock class priority map: "
656 "cc-prio-map-addr=%p", cc_prio_map);
657 goto error;
658 }
659
660 cc_uuid = bt_clock_class_get_uuid(clock_class);
661 cc_name = bt_clock_class_get_name(clock_class);
662
663 if (muxer_notif_iter->clock_class_expectation ==
664 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
665 /*
666 * This is the first clock class that this muxer
667 * notification iterator encounters. Its properties
668 * determine what to expect for the whole lifetime of
669 * the iterator without a true
670 * `assume-absolute-clock-classes` parameter.
671 */
672 if (bt_clock_class_is_absolute(clock_class)) {
673 /* Expect absolute clock classes */
674 muxer_notif_iter->clock_class_expectation =
675 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE;
676 } else {
677 if (cc_uuid) {
678 /*
679 * Expect non-absolute clock classes
680 * with a specific UUID.
681 */
682 muxer_notif_iter->clock_class_expectation =
683 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID;
684 memcpy(muxer_notif_iter->expected_clock_class_uuid,
685 cc_uuid, BABELTRACE_UUID_LEN);
686 } else {
687 /*
688 * Expect non-absolute clock classes
689 * with no UUID.
690 */
691 muxer_notif_iter->clock_class_expectation =
692 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID;
693 }
694 }
695 }
696
697 if (!muxer_comp->assume_absolute_clock_classes) {
698 switch (muxer_notif_iter->clock_class_expectation) {
699 case MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE:
700 if (!bt_clock_class_is_absolute(clock_class)) {
701 BT_LOGE("Expecting an absolute clock class, "
702 "but got a non-absolute one: "
703 "clock-class-addr=%p, clock-class-name=\"%s\"",
704 clock_class, cc_name);
705 goto error;
706 }
707 break;
708 case MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID:
709 if (bt_clock_class_is_absolute(clock_class)) {
710 BT_LOGE("Expecting a non-absolute clock class with no UUID, "
711 "but got an absolute one: "
712 "clock-class-addr=%p, clock-class-name=\"%s\"",
713 clock_class, cc_name);
714 goto error;
715 }
716
717 if (cc_uuid) {
718 BT_LOGE("Expecting a non-absolute clock class with no UUID, "
719 "but got one with a UUID: "
720 "clock-class-addr=%p, clock-class-name=\"%s\", "
721 "uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\"",
722 clock_class, cc_name,
723 (unsigned int) cc_uuid[0],
724 (unsigned int) cc_uuid[1],
725 (unsigned int) cc_uuid[2],
726 (unsigned int) cc_uuid[3],
727 (unsigned int) cc_uuid[4],
728 (unsigned int) cc_uuid[5],
729 (unsigned int) cc_uuid[6],
730 (unsigned int) cc_uuid[7],
731 (unsigned int) cc_uuid[8],
732 (unsigned int) cc_uuid[9],
733 (unsigned int) cc_uuid[10],
734 (unsigned int) cc_uuid[11],
735 (unsigned int) cc_uuid[12],
736 (unsigned int) cc_uuid[13],
737 (unsigned int) cc_uuid[14],
738 (unsigned int) cc_uuid[15]);
739 goto error;
740 }
741 break;
742 case MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID:
743 if (bt_clock_class_is_absolute(clock_class)) {
744 BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
745 "but got an absolute one: "
746 "clock-class-addr=%p, clock-class-name=\"%s\"",
747 clock_class, cc_name);
748 goto error;
749 }
750
751 if (!cc_uuid) {
752 BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
753 "but got one with no UUID: "
754 "clock-class-addr=%p, clock-class-name=\"%s\"",
755 clock_class, cc_name);
756 goto error;
757 }
758
759 if (memcmp(muxer_notif_iter->expected_clock_class_uuid,
760 cc_uuid, BABELTRACE_UUID_LEN) != 0) {
761 BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
762 "but got one with different UUID: "
763 "clock-class-addr=%p, clock-class-name=\"%s\", "
764 "expected-uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\", "
765 "uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\"",
766 clock_class, cc_name,
767 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[0],
768 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[1],
769 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[2],
770 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[3],
771 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[4],
772 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[5],
773 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[6],
774 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[7],
775 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[8],
776 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[9],
777 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[10],
778 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[11],
779 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[12],
780 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[13],
781 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[14],
782 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[15],
783 (unsigned int) cc_uuid[0],
784 (unsigned int) cc_uuid[1],
785 (unsigned int) cc_uuid[2],
786 (unsigned int) cc_uuid[3],
787 (unsigned int) cc_uuid[4],
788 (unsigned int) cc_uuid[5],
789 (unsigned int) cc_uuid[6],
790 (unsigned int) cc_uuid[7],
791 (unsigned int) cc_uuid[8],
792 (unsigned int) cc_uuid[9],
793 (unsigned int) cc_uuid[10],
794 (unsigned int) cc_uuid[11],
795 (unsigned int) cc_uuid[12],
796 (unsigned int) cc_uuid[13],
797 (unsigned int) cc_uuid[14],
798 (unsigned int) cc_uuid[15]);
799 goto error;
800 }
801 break;
802 default:
803 /* Unexpected */
804 BT_LOGF("Unexpected clock class expectation: "
805 "expectation-code=%d",
806 muxer_notif_iter->clock_class_expectation);
807 abort();
808 }
809 }
810
811 switch (bt_notification_get_type(notif)) {
812 case BT_NOTIFICATION_TYPE_EVENT:
813 event = bt_notification_event_borrow_event(notif);
814 BT_ASSERT(event);
815 clock_value = bt_event_borrow_clock_value(event,
816 clock_class);
817 break;
818 case BT_NOTIFICATION_TYPE_INACTIVITY:
819 clock_value = bt_notification_inactivity_borrow_clock_value(
820 notif, clock_class);
821 break;
822 default:
823 BT_LOGF("Unexpected notification type at this point: "
824 "type=%d", bt_notification_get_type(notif));
825 abort();
826 }
827
828 if (!clock_value) {
829 BT_LOGE("Cannot get notification's clock value for clock class: "
830 "clock-class-addr=%p, clock-class-name=\"%s\"",
831 clock_class, cc_name);
832 goto error;
833 }
834
835 ret = bt_clock_value_get_value_ns_from_epoch(clock_value, ts_ns);
836 if (ret) {
837 BT_LOGE("Cannot get nanoseconds from Epoch of clock value: "
838 "clock-value-addr=%p", clock_value);
839 goto error;
840 }
841
842 goto end;
843
844 error:
845 ret = -1;
846
847 end:
848 if (ret == 0) {
849 BT_LOGV("Found notification's timestamp: "
850 "muxer-notif-iter-addr=%p, notif-addr=%p, "
851 "last-returned-ts=%" PRId64 ", ts=%" PRId64,
852 muxer_notif_iter, notif, last_returned_ts_ns,
853 *ts_ns);
854 }
855
856 return ret;
857 }
858
859 /*
860 * This function finds the youngest available notification amongst the
861 * non-ended upstream notification iterators and returns the upstream
862 * notification iterator which has it, or
863 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
864 * notification.
865 *
866 * This function does NOT:
867 *
868 * * Update any upstream notification iterator.
869 * * Check for newly connected ports.
870 * * Check the upstream notification iterators to retry.
871 *
872 * On sucess, this function sets *muxer_upstream_notif_iter to the
873 * upstream notification iterator of which the current notification is
874 * the youngest, and sets *ts_ns to its time.
875 */
876 static
877 enum bt_notification_iterator_status
878 muxer_notif_iter_youngest_upstream_notif_iter(
879 struct muxer_comp *muxer_comp,
880 struct muxer_notif_iter *muxer_notif_iter,
881 struct muxer_upstream_notif_iter **muxer_upstream_notif_iter,
882 int64_t *ts_ns)
883 {
884 size_t i;
885 int ret;
886 int64_t youngest_ts_ns = INT64_MAX;
887 enum bt_notification_iterator_status status =
888 BT_NOTIFICATION_ITERATOR_STATUS_OK;
889
890 BT_ASSERT(muxer_comp);
891 BT_ASSERT(muxer_notif_iter);
892 BT_ASSERT(muxer_upstream_notif_iter);
893 *muxer_upstream_notif_iter = NULL;
894
895 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
896 struct bt_notification *notif;
897 struct muxer_upstream_notif_iter *cur_muxer_upstream_notif_iter =
898 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters, i);
899 int64_t notif_ts_ns;
900
901 if (!cur_muxer_upstream_notif_iter->notif_iter) {
902 /* This upstream notification iterator is ended */
903 BT_LOGV("Skipping ended upstream notification iterator: "
904 "muxer-upstream-notif-iter-wrap-addr=%p",
905 cur_muxer_upstream_notif_iter);
906 continue;
907 }
908
909 BT_ASSERT(cur_muxer_upstream_notif_iter->is_valid);
910 notif = cur_muxer_upstream_notif_iter->notif;
911 BT_ASSERT(notif);
912 ret = get_notif_ts_ns(muxer_comp, muxer_notif_iter, notif,
913 muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
914 if (ret) {
915 /* get_notif_ts_ns() logs errors */
916 *muxer_upstream_notif_iter = NULL;
917 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
918 goto end;
919 }
920
921 if (notif_ts_ns <= youngest_ts_ns) {
922 *muxer_upstream_notif_iter =
923 cur_muxer_upstream_notif_iter;
924 youngest_ts_ns = notif_ts_ns;
925 *ts_ns = youngest_ts_ns;
926 }
927 }
928
929 if (!*muxer_upstream_notif_iter) {
930 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
931 *ts_ns = INT64_MIN;
932 }
933
934 end:
935 return status;
936 }
937
938 static
939 enum bt_notification_iterator_status validate_muxer_upstream_notif_iter(
940 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
941 {
942 enum bt_notification_iterator_status status =
943 BT_NOTIFICATION_ITERATOR_STATUS_OK;
944
945 BT_LOGV("Validating muxer's upstream notification iterator wrapper: "
946 "muxer-upstream-notif-iter-wrap-addr=%p",
947 muxer_upstream_notif_iter);
948
949 if (muxer_upstream_notif_iter->is_valid ||
950 !muxer_upstream_notif_iter->notif_iter) {
951 goto end;
952 }
953
954 /* muxer_upstream_notif_iter_next() logs details/errors */
955 status = muxer_upstream_notif_iter_next(muxer_upstream_notif_iter);
956
957 end:
958 return status;
959 }
960
961 static
962 enum bt_notification_iterator_status validate_muxer_upstream_notif_iters(
963 struct muxer_notif_iter *muxer_notif_iter)
964 {
965 enum bt_notification_iterator_status status =
966 BT_NOTIFICATION_ITERATOR_STATUS_OK;
967 size_t i;
968
969 BT_LOGV("Validating muxer's upstream notification iterator wrappers: "
970 "muxer-notif-iter-addr=%p", muxer_notif_iter);
971
972 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
973 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
974 g_ptr_array_index(
975 muxer_notif_iter->muxer_upstream_notif_iters,
976 i);
977
978 status = validate_muxer_upstream_notif_iter(
979 muxer_upstream_notif_iter);
980 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
981 if (status < 0) {
982 BT_LOGE("Cannot validate muxer's upstream notification iterator wrapper: "
983 "muxer-notif-iter-addr=%p, "
984 "muxer-upstream-notif-iter-wrap-addr=%p",
985 muxer_notif_iter,
986 muxer_upstream_notif_iter);
987 } else {
988 BT_LOGV("Cannot validate muxer's upstream notification iterator wrapper: "
989 "muxer-notif-iter-addr=%p, "
990 "muxer-upstream-notif-iter-wrap-addr=%p",
991 muxer_notif_iter,
992 muxer_upstream_notif_iter);
993 }
994
995 goto end;
996 }
997
998 /*
999 * Remove this muxer upstream notification iterator
1000 * if it's ended or canceled.
1001 */
1002 if (!muxer_upstream_notif_iter->notif_iter) {
1003 /*
1004 * Use g_ptr_array_remove_fast() because the
1005 * order of those elements is not important.
1006 */
1007 BT_LOGV("Removing muxer's upstream notification iterator wrapper: ended or canceled: "
1008 "muxer-notif-iter-addr=%p, "
1009 "muxer-upstream-notif-iter-wrap-addr=%p",
1010 muxer_notif_iter, muxer_upstream_notif_iter);
1011 g_ptr_array_remove_index_fast(
1012 muxer_notif_iter->muxer_upstream_notif_iters,
1013 i);
1014 i--;
1015 }
1016 }
1017
1018 end:
1019 return status;
1020 }
1021
1022 static
1023 struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next(
1024 struct muxer_comp *muxer_comp,
1025 struct muxer_notif_iter *muxer_notif_iter)
1026 {
1027 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter = NULL;
1028 struct bt_notification_iterator_next_method_return next_return = {
1029 .notification = NULL,
1030 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
1031 };
1032 int64_t next_return_ts;
1033
1034 while (true) {
1035 int ret = muxer_notif_iter_handle_newly_connected_ports(
1036 muxer_notif_iter);
1037
1038 if (ret) {
1039 BT_LOGE("Cannot handle newly connected input ports for muxer's notification iterator: "
1040 "muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1041 "ret=%d",
1042 muxer_comp, muxer_notif_iter, ret);
1043 next_return.status =
1044 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1045 goto end;
1046 }
1047
1048 next_return.status =
1049 validate_muxer_upstream_notif_iters(muxer_notif_iter);
1050 if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
1051 /* validate_muxer_upstream_notif_iters() logs details */
1052 goto end;
1053 }
1054
1055 /*
1056 * At this point, we know that all the existing upstream
1057 * notification iterators are valid. However the
1058 * operations to validate them (during
1059 * validate_muxer_upstream_notif_iters()) may have
1060 * connected new ports. If no ports were connected
1061 * during this operation, exit the loop.
1062 */
1063 if (!muxer_notif_iter->newly_connected_priv_ports) {
1064 BT_LOGV("Not breaking this loop: muxer's notification iterator still has newly connected input ports to handle: "
1065 "muxer-comp-addr=%p", muxer_comp);
1066 break;
1067 }
1068 }
1069
1070 BT_ASSERT(!muxer_notif_iter->newly_connected_priv_ports);
1071
1072 /*
1073 * At this point we know that all the existing upstream
1074 * notification iterators are valid. We can find the one,
1075 * amongst those, of which the current notification is the
1076 * youngest.
1077 */
1078 next_return.status =
1079 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
1080 muxer_notif_iter, &muxer_upstream_notif_iter,
1081 &next_return_ts);
1082 if (next_return.status < 0 ||
1083 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END ||
1084 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) {
1085 if (next_return.status < 0) {
1086 BT_LOGE("Cannot find the youngest upstream notification iterator wrapper: "
1087 "status=%s",
1088 bt_notification_iterator_status_string(next_return.status));
1089 } else {
1090 BT_LOGV("Cannot find the youngest upstream notification iterator wrapper: "
1091 "status=%s",
1092 bt_notification_iterator_status_string(next_return.status));
1093 }
1094
1095 goto end;
1096 }
1097
1098 if (next_return_ts < muxer_notif_iter->last_returned_ts_ns) {
1099 BT_LOGE("Youngest upstream notification iterator wrapper's timestamp is less than muxer's notification iterator's last returned timestamp: "
1100 "muxer-notif-iter-addr=%p, ts=%" PRId64 ", "
1101 "last-returned-ts=%" PRId64,
1102 muxer_notif_iter, next_return_ts,
1103 muxer_notif_iter->last_returned_ts_ns);
1104 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1105 goto end;
1106 }
1107
1108 BT_LOGV("Found youngest upstream notification iterator wrapper: "
1109 "muxer-notif-iter-addr=%p, "
1110 "muxer-upstream-notif-iter-wrap-addr=%p, "
1111 "ts=%" PRId64,
1112 muxer_notif_iter, muxer_upstream_notif_iter, next_return_ts);
1113 BT_ASSERT(next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
1114 BT_ASSERT(muxer_upstream_notif_iter);
1115 next_return.notification = bt_get(muxer_upstream_notif_iter->notif);
1116 BT_ASSERT(next_return.notification);
1117
1118 /*
1119 * We invalidate the upstream notification iterator so that, the
1120 * next time this function is called,
1121 * validate_muxer_upstream_notif_iters() will make it valid.
1122 */
1123 muxer_upstream_notif_iter->is_valid = false;
1124 muxer_notif_iter->last_returned_ts_ns = next_return_ts;
1125
1126 end:
1127 return next_return;
1128 }
1129
1130 static
1131 void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
1132 {
1133 if (!muxer_notif_iter) {
1134 return;
1135 }
1136
1137 BT_LOGD("Destroying muxer component's notification iterator: "
1138 "muxer-notif-iter-addr=%p", muxer_notif_iter);
1139
1140 if (muxer_notif_iter->muxer_upstream_notif_iters) {
1141 BT_LOGD_STR("Destroying muxer's upstream notification iterator wrappers.");
1142 g_ptr_array_free(
1143 muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
1144 }
1145
1146 g_list_free(muxer_notif_iter->newly_connected_priv_ports);
1147 g_free(muxer_notif_iter);
1148 }
1149
1150 static
1151 int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
1152 struct muxer_notif_iter *muxer_notif_iter)
1153 {
1154 struct bt_component *comp;
1155 int64_t count;
1156 int64_t i;
1157 int ret = 0;
1158
1159 /*
1160 * Add the connected input ports to this muxer notification
1161 * iterator's list of newly connected ports. They will be
1162 * handled by muxer_notif_iter_handle_newly_connected_ports().
1163 */
1164 comp = bt_component_borrow_from_private(muxer_comp->priv_comp);
1165 BT_ASSERT(comp);
1166 count = bt_component_filter_get_input_port_count(comp);
1167 if (count < 0) {
1168 BT_LOGD("No input port to initialize for muxer component's notification iterator: "
1169 "muxer-comp-addr=%p, muxer-notif-iter-addr=%p",
1170 muxer_comp, muxer_notif_iter);
1171 goto end;
1172 }
1173
1174 for (i = 0; i < count; i++) {
1175 struct bt_private_port *priv_port =
1176 bt_private_component_filter_get_input_private_port_by_index(
1177 muxer_comp->priv_comp, i);
1178 struct bt_port *port;
1179
1180 BT_ASSERT(priv_port);
1181 port = bt_port_borrow_from_private(priv_port);
1182 BT_ASSERT(port);
1183
1184 if (!bt_port_is_connected(port)) {
1185 BT_LOGD("Skipping input port: not connected: "
1186 "muxer-comp-addr=%p, port-addr=%p, port-name\"%s\"",
1187 muxer_comp, port, bt_port_get_name(port));
1188 bt_put(priv_port);
1189 continue;
1190 }
1191
1192 bt_put(priv_port);
1193 muxer_notif_iter->newly_connected_priv_ports =
1194 g_list_append(
1195 muxer_notif_iter->newly_connected_priv_ports,
1196 priv_port);
1197 if (!muxer_notif_iter->newly_connected_priv_ports) {
1198 BT_LOGE("Cannot append port to muxer's notification iterator list of newly connected input ports: "
1199 "port-addr=%p, port-name=\"%s\", "
1200 "muxer-notif-iter-addr=%p", port,
1201 bt_port_get_name(port), muxer_notif_iter);
1202 ret = -1;
1203 goto end;
1204 }
1205
1206 BT_LOGD("Appended port to muxer's notification iterator list of newly connected input ports: "
1207 "port-addr=%p, port-name=\"%s\", "
1208 "muxer-notif-iter-addr=%p", port,
1209 bt_port_get_name(port), muxer_notif_iter);
1210 }
1211
1212 end:
1213 return ret;
1214 }
1215
1216 BT_HIDDEN
1217 enum bt_notification_iterator_status muxer_notif_iter_init(
1218 struct bt_private_connection_private_notification_iterator *priv_notif_iter,
1219 struct bt_private_port *output_priv_port)
1220 {
1221 struct muxer_comp *muxer_comp = NULL;
1222 struct muxer_notif_iter *muxer_notif_iter = NULL;
1223 struct bt_private_component *priv_comp = NULL;
1224 enum bt_notification_iterator_status status =
1225 BT_NOTIFICATION_ITERATOR_STATUS_OK;
1226 int ret;
1227
1228 priv_comp = bt_private_connection_private_notification_iterator_get_private_component(
1229 priv_notif_iter);
1230 BT_ASSERT(priv_comp);
1231 muxer_comp = bt_private_component_get_user_data(priv_comp);
1232 BT_ASSERT(muxer_comp);
1233 BT_LOGD("Initializing muxer component's notification iterator: "
1234 "comp-addr=%p, muxer-comp-addr=%p, notif-iter-addr=%p",
1235 priv_comp, muxer_comp, priv_notif_iter);
1236
1237 if (muxer_comp->initializing_muxer_notif_iter) {
1238 /*
1239 * Weird, unhandled situation detected: downstream
1240 * creates a muxer notification iterator while creating
1241 * another muxer notification iterator (same component).
1242 */
1243 BT_LOGE("Recursive initialization of muxer component's notification iterator: "
1244 "comp-addr=%p, muxer-comp-addr=%p, notif-iter-addr=%p",
1245 priv_comp, muxer_comp, priv_notif_iter);
1246 goto error;
1247 }
1248
1249 muxer_comp->initializing_muxer_notif_iter = true;
1250 muxer_notif_iter = g_new0(struct muxer_notif_iter, 1);
1251 if (!muxer_notif_iter) {
1252 BT_LOGE_STR("Failed to allocate one muxer component's notification iterator.");
1253 goto error;
1254 }
1255
1256 muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
1257 muxer_notif_iter->muxer_upstream_notif_iters =
1258 g_ptr_array_new_with_free_func(
1259 (GDestroyNotify) destroy_muxer_upstream_notif_iter);
1260 if (!muxer_notif_iter->muxer_upstream_notif_iters) {
1261 BT_LOGE_STR("Failed to allocate a GPtrArray.");
1262 goto error;
1263 }
1264
1265 /*
1266 * Add the muxer notification iterator to the component's array
1267 * of muxer notification iterators here because
1268 * muxer_notif_iter_init_newly_connected_ports() can cause
1269 * muxer_port_connected() to be called, which adds the newly
1270 * connected port to each muxer notification iterator's list of
1271 * newly connected ports.
1272 */
1273 g_ptr_array_add(muxer_comp->muxer_notif_iters, muxer_notif_iter);
1274 ret = muxer_notif_iter_init_newly_connected_ports(muxer_comp,
1275 muxer_notif_iter);
1276 if (ret) {
1277 BT_LOGE("Cannot initialize newly connected input ports for muxer component's notification iterator: "
1278 "comp-addr=%p, muxer-comp-addr=%p, "
1279 "muxer-notif-iter-addr=%p, notif-iter-addr=%p, ret=%d",
1280 priv_comp, muxer_comp, muxer_notif_iter,
1281 priv_notif_iter, ret);
1282 goto error;
1283 }
1284
1285 ret = bt_private_connection_private_notification_iterator_set_user_data(priv_notif_iter,
1286 muxer_notif_iter);
1287 BT_ASSERT(ret == 0);
1288 BT_LOGD("Initialized muxer component's notification iterator: "
1289 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1290 "notif-iter-addr=%p",
1291 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
1292 goto end;
1293
1294 error:
1295 if (g_ptr_array_index(muxer_comp->muxer_notif_iters,
1296 muxer_comp->muxer_notif_iters->len - 1) == muxer_notif_iter) {
1297 g_ptr_array_remove_index(muxer_comp->muxer_notif_iters,
1298 muxer_comp->muxer_notif_iters->len - 1);
1299 }
1300
1301 destroy_muxer_notif_iter(muxer_notif_iter);
1302 ret = bt_private_connection_private_notification_iterator_set_user_data(priv_notif_iter,
1303 NULL);
1304 BT_ASSERT(ret == 0);
1305 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1306
1307 end:
1308 muxer_comp->initializing_muxer_notif_iter = false;
1309 bt_put(priv_comp);
1310 return status;
1311 }
1312
1313 BT_HIDDEN
1314 void muxer_notif_iter_finalize(
1315 struct bt_private_connection_private_notification_iterator *priv_notif_iter)
1316 {
1317 struct muxer_notif_iter *muxer_notif_iter =
1318 bt_private_connection_private_notification_iterator_get_user_data(priv_notif_iter);
1319 struct bt_private_component *priv_comp = NULL;
1320 struct muxer_comp *muxer_comp = NULL;
1321
1322 priv_comp = bt_private_connection_private_notification_iterator_get_private_component(
1323 priv_notif_iter);
1324 BT_ASSERT(priv_comp);
1325 muxer_comp = bt_private_component_get_user_data(priv_comp);
1326 BT_LOGD("Finalizing muxer component's notification iterator: "
1327 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1328 "notif-iter-addr=%p",
1329 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
1330
1331 if (muxer_comp) {
1332 (void) g_ptr_array_remove_fast(muxer_comp->muxer_notif_iters,
1333 muxer_notif_iter);
1334 destroy_muxer_notif_iter(muxer_notif_iter);
1335 }
1336
1337 bt_put(priv_comp);
1338 }
1339
1340 BT_HIDDEN
1341 struct bt_notification_iterator_next_method_return muxer_notif_iter_next(
1342 struct bt_private_connection_private_notification_iterator *priv_notif_iter)
1343 {
1344 struct bt_notification_iterator_next_method_return next_ret;
1345 struct muxer_notif_iter *muxer_notif_iter =
1346 bt_private_connection_private_notification_iterator_get_user_data(priv_notif_iter);
1347 struct bt_private_component *priv_comp = NULL;
1348 struct muxer_comp *muxer_comp = NULL;
1349
1350 BT_ASSERT(muxer_notif_iter);
1351 priv_comp = bt_private_connection_private_notification_iterator_get_private_component(
1352 priv_notif_iter);
1353 BT_ASSERT(priv_comp);
1354 muxer_comp = bt_private_component_get_user_data(priv_comp);
1355 BT_ASSERT(muxer_comp);
1356
1357 BT_LOGV("Muxer component's notification iterator's \"next\" method called: "
1358 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1359 "notif-iter-addr=%p",
1360 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
1361
1362 /* Are we in an error state set elsewhere? */
1363 if (unlikely(muxer_comp->error)) {
1364 BT_LOGE("Muxer component is already in an error state: returning BT_NOTIFICATION_ITERATOR_STATUS_ERROR: "
1365 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1366 "notif-iter-addr=%p",
1367 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
1368 next_ret.notification = NULL;
1369 next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1370 goto end;
1371 }
1372
1373 next_ret = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter);
1374 if (next_ret.status < 0) {
1375 BT_LOGE("Cannot get next notification: "
1376 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1377 "notif-iter-addr=%p, status=%s",
1378 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter,
1379 bt_notification_iterator_status_string(next_ret.status));
1380 } else {
1381 BT_LOGV("Returning from muxer component's notification iterator's \"next\" method: "
1382 "status=%s, notif-addr=%p",
1383 bt_notification_iterator_status_string(next_ret.status),
1384 next_ret.notification);
1385 }
1386
1387 end:
1388 bt_put(priv_comp);
1389 return next_ret;
1390 }
1391
1392 BT_HIDDEN
1393 void muxer_port_connected(
1394 struct bt_private_component *priv_comp,
1395 struct bt_private_port *self_private_port,
1396 struct bt_port *other_port)
1397 {
1398 struct bt_port *self_port =
1399 bt_port_borrow_from_private(self_private_port);
1400 struct muxer_comp *muxer_comp =
1401 bt_private_component_get_user_data(priv_comp);
1402 size_t i;
1403 int ret;
1404
1405 BT_ASSERT(self_port);
1406 BT_ASSERT(muxer_comp);
1407 BT_LOGD("Port connected: "
1408 "comp-addr=%p, muxer-comp-addr=%p, "
1409 "port-addr=%p, port-name=\"%s\", "
1410 "other-port-addr=%p, other-port-name=\"%s\"",
1411 priv_comp, muxer_comp, self_port, bt_port_get_name(self_port),
1412 other_port, bt_port_get_name(other_port));
1413
1414 if (bt_port_get_type(self_port) == BT_PORT_TYPE_OUTPUT) {
1415 goto end;
1416 }
1417
1418 for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
1419 struct muxer_notif_iter *muxer_notif_iter =
1420 g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
1421
1422 /*
1423 * Add this port to the list of newly connected ports
1424 * for this muxer notification iterator. We append at
1425 * the end of this list while
1426 * muxer_notif_iter_handle_newly_connected_ports()
1427 * removes the nodes from the beginning.
1428 */
1429 muxer_notif_iter->newly_connected_priv_ports =
1430 g_list_append(
1431 muxer_notif_iter->newly_connected_priv_ports,
1432 self_private_port);
1433 if (!muxer_notif_iter->newly_connected_priv_ports) {
1434 BT_LOGE("Cannot append port to muxer's notification iterator list of newly connected input ports: "
1435 "port-addr=%p, port-name=\"%s\", "
1436 "muxer-notif-iter-addr=%p", self_port,
1437 bt_port_get_name(self_port), muxer_notif_iter);
1438 muxer_comp->error = true;
1439 goto end;
1440 }
1441
1442 BT_LOGD("Appended port to muxer's notification iterator list of newly connected input ports: "
1443 "port-addr=%p, port-name=\"%s\", "
1444 "muxer-notif-iter-addr=%p", self_port,
1445 bt_port_get_name(self_port), muxer_notif_iter);
1446 }
1447
1448 /* One less available input port */
1449 muxer_comp->available_input_ports--;
1450 ret = ensure_available_input_port(priv_comp);
1451 if (ret) {
1452 /*
1453 * Only way to report an error later since this
1454 * method does not return anything.
1455 */
1456 BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
1457 "muxer-comp-addr=%p, status=%s",
1458 muxer_comp, bt_component_status_string(ret));
1459 muxer_comp->error = true;
1460 goto end;
1461 }
1462
1463 end:
1464 return;
1465 }
1466
1467 BT_HIDDEN
1468 void muxer_port_disconnected(struct bt_private_component *priv_comp,
1469 struct bt_private_port *priv_port)
1470 {
1471 struct bt_port *port = bt_port_borrow_from_private(priv_port);
1472 struct muxer_comp *muxer_comp =
1473 bt_private_component_get_user_data(priv_comp);
1474
1475 BT_ASSERT(port);
1476 BT_ASSERT(muxer_comp);
1477 BT_LOGD("Port disconnected: "
1478 "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, "
1479 "port-name=\"%s\"", priv_comp, muxer_comp,
1480 port, bt_port_get_name(port));
1481
1482 /*
1483 * There's nothing special to do when a port is disconnected
1484 * because this component deals with upstream notification
1485 * iterators which were already created thanks to connected
1486 * ports. The fact that the port is disconnected does not cancel
1487 * the upstream notification iterators created using its
1488 * connection: they still exist, even if the connection is dead.
1489 * The only way to remove an upstream notification iterator is
1490 * for its "next" operation to return
1491 * BT_NOTIFICATION_ITERATOR_STATUS_END.
1492 */
1493 if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
1494 /* One more available input port */
1495 muxer_comp->available_input_ports++;
1496 BT_LOGD("Leaving disconnected input port available for future connections: "
1497 "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, "
1498 "port-name=\"%s\", avail-input-port-count=%zu",
1499 priv_comp, muxer_comp, port, bt_port_get_name(port),
1500 muxer_comp->available_input_ports);
1501 }
1502 }
This page took 0.110559 seconds and 4 git commands to generate.