lib: remove internal stream destroy listener API
[babeltrace.git] / plugins / utils / muxer / muxer.c
CommitLineData
958f7d11
PP
1/*
2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
fed72692
PP
23#define BT_LOG_TAG "PLUGIN-UTILS-MUXER-FLT"
24#include "logging.h"
25
958f7d11 26#include <babeltrace/babeltrace-internal.h>
282c8cd0 27#include <babeltrace/compat/uuid-internal.h>
9d408fca
PP
28#include <babeltrace/babeltrace.h>
29#include <babeltrace/values-internal.h>
fed72692 30#include <babeltrace/graph/component-internal.h>
fed72692 31#include <babeltrace/graph/notification-iterator-internal.h>
fed72692 32#include <babeltrace/graph/connection-internal.h>
958f7d11
PP
33#include <plugins-common.h>
34#include <glib.h>
c55a9f58 35#include <stdbool.h>
fed72692 36#include <inttypes.h>
f6ccaed9 37#include <babeltrace/assert-internal.h>
0fbb9a9f 38#include <stdlib.h>
282c8cd0 39#include <string.h>
958f7d11 40
c3acd5f3 41#define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes"
65ee897d 42
958f7d11 43struct muxer_comp {
90157d89
PP
44 /*
45 * Array of struct
46 * bt_private_connection_private_notification_iterator *
47 * (weak refs)
48 */
958f7d11
PP
49 GPtrArray *muxer_notif_iters;
50
51 /* Weak ref */
52 struct bt_private_component *priv_comp;
53 unsigned int next_port_num;
54 size_t available_input_ports;
55 bool error;
a09c6b95 56 bool initializing_muxer_notif_iter;
282c8cd0 57 bool assume_absolute_clock_classes;
958f7d11
PP
58};
59
/*
 * Wrapper around one upstream notification iterator and the last
 * notification obtained from it.
 */
struct muxer_upstream_notif_iter {
	/*
	 * Upstream notification iterator (owned by this wrapper); set
	 * to NULL once the upstream iterator is ended.
	 */
	struct bt_notification_iterator *notif_iter;

	/* Current notification of `notif_iter` (owned by this wrapper) */
	struct bt_notification *notif;

	/*
	 * True when the current notification of the upstream iterator
	 * must take part in the multiplexing operations.
	 *
	 * When the upstream iterator returns
	 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN, this wrapper becomes
	 * invalid: its current notification is still the previous one,
	 * which was already taken into account.
	 *
	 * This flag is meaningless when `notif_iter` is NULL (upstream
	 * iterator is finished).
	 */
	bool is_valid;
};
80
282c8cd0
PP
/*
 * What a muxer notification iterator expects of the clock classes it
 * encounters. The state starts at "any" (zero value) and is settled by
 * the first clock class seen by get_notif_ts_ns().
 */
enum muxer_notif_iter_clock_class_expectation {
	/* No clock class encountered yet: anything is accepted */
	MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,

	/* Only absolute clock classes are accepted */
	MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE = 1,

	/* Only non-absolute clock classes with one specific UUID */
	MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID = 2,

	/* Only non-absolute clock classes without any UUID */
	MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID = 3,
};
87
958f7d11 88struct muxer_notif_iter {
ab11110e
PP
89 /*
90 * Array of struct muxer_upstream_notif_iter * (owned by this).
91 *
92 * NOTE: This array is searched in linearly to find the youngest
93 * current notification. Keep this until benchmarks confirm that
94 * another data structure is faster than this for our typical
95 * use cases.
96 */
958f7d11
PP
97 GPtrArray *muxer_upstream_notif_iters;
98
ab11110e 99 /*
06a2cb0d 100 * List of "recently" connected input ports (weak) to
ab11110e
PP
101 * handle by this muxer notification iterator.
102 * muxer_port_connected() adds entries to this list, and the
103 * entries are removed when a notification iterator is created
104 * on the port's connection and put into
105 * muxer_upstream_notif_iters above by
106 * muxer_notif_iter_handle_newly_connected_ports().
107 */
108 GList *newly_connected_priv_ports;
958f7d11
PP
109
110 /* Next thing to return by the "next" method */
90157d89 111 struct bt_notification_iterator_next_method_return next_next_return;
958f7d11
PP
112
113 /* Last time returned in a notification */
114 int64_t last_returned_ts_ns;
282c8cd0
PP
115
116 /* Clock class expectation state */
117 enum muxer_notif_iter_clock_class_expectation clock_class_expectation;
118
119 /*
120 * Expected clock class UUID, only valid when
121 * clock_class_expectation is
122 * MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID.
123 */
124 unsigned char expected_clock_class_uuid[BABELTRACE_UUID_LEN];
958f7d11
PP
125};
126
ab11110e
PP
127static
128void destroy_muxer_upstream_notif_iter(
129 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
130{
131 if (!muxer_upstream_notif_iter) {
132 return;
133 }
134
fed72692
PP
135 BT_LOGD("Destroying muxer's upstream notification iterator wrapper: "
136 "addr=%p, notif-iter-addr=%p, is-valid=%d",
137 muxer_upstream_notif_iter,
138 muxer_upstream_notif_iter->notif_iter,
139 muxer_upstream_notif_iter->is_valid);
ab11110e 140 bt_put(muxer_upstream_notif_iter->notif_iter);
07245ac2 141 bt_put(muxer_upstream_notif_iter->notif);
ab11110e
PP
142 g_free(muxer_upstream_notif_iter);
143}
144
958f7d11
PP
145static
146struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
147 struct muxer_notif_iter *muxer_notif_iter,
148 struct bt_notification_iterator *notif_iter,
149 struct bt_private_port *priv_port)
150{
151 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
152 g_new0(struct muxer_upstream_notif_iter, 1);
153
154 if (!muxer_upstream_notif_iter) {
fed72692 155 BT_LOGE_STR("Failed to allocate one muxer's upstream notification iterator wrapper.");
958f7d11
PP
156 goto end;
157 }
158
159 muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
089717de 160 muxer_upstream_notif_iter->is_valid = false;
958f7d11
PP
161 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
162 muxer_upstream_notif_iter);
fed72692
PP
163 BT_LOGD("Added muxer's upstream notification iterator wrapper: "
164 "addr=%p, muxer-notif-iter-addr=%p, notif-iter-addr=%p",
165 muxer_upstream_notif_iter, muxer_notif_iter,
166 notif_iter);
958f7d11
PP
167
168end:
169 return muxer_upstream_notif_iter;
170}
171
958f7d11 172static
147337a3
PP
173enum bt_component_status ensure_available_input_port(
174 struct bt_private_component *priv_comp)
958f7d11
PP
175{
176 struct muxer_comp *muxer_comp =
177 bt_private_component_get_user_data(priv_comp);
147337a3 178 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
958f7d11 179 GString *port_name = NULL;
958f7d11 180
f6ccaed9 181 BT_ASSERT(muxer_comp);
958f7d11
PP
182
183 if (muxer_comp->available_input_ports >= 1) {
184 goto end;
185 }
186
187 port_name = g_string_new("in");
188 if (!port_name) {
fed72692 189 BT_LOGE_STR("Failed to allocate a GString.");
147337a3 190 status = BT_COMPONENT_STATUS_NOMEM;
958f7d11
PP
191 goto end;
192 }
193
194 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
147337a3
PP
195 status = bt_private_component_filter_add_input_private_port(
196 priv_comp, port_name->str, NULL, NULL);
197 if (status != BT_COMPONENT_STATUS_OK) {
fed72692
PP
198 BT_LOGE("Cannot add input port to muxer component: "
199 "port-name=\"%s\", comp-addr=%p, status=%s",
200 port_name->str, priv_comp,
201 bt_component_status_string(status));
958f7d11
PP
202 goto end;
203 }
204
205 muxer_comp->available_input_ports++;
206 muxer_comp->next_port_num++;
fed72692
PP
207 BT_LOGD("Added one input port to muxer component: "
208 "port-name=\"%s\", comp-addr=%p",
209 port_name->str, priv_comp);
958f7d11
PP
210end:
211 if (port_name) {
212 g_string_free(port_name, TRUE);
213 }
214
147337a3 215 return status;
958f7d11
PP
216}
217
958f7d11 218static
147337a3
PP
219enum bt_component_status create_output_port(
220 struct bt_private_component *priv_comp)
958f7d11 221{
147337a3
PP
222 return bt_private_component_filter_add_output_private_port(
223 priv_comp, "out", NULL, NULL);
958f7d11
PP
224}
225
226static
227void destroy_muxer_comp(struct muxer_comp *muxer_comp)
228{
229 if (!muxer_comp) {
230 return;
231 }
232
fed72692
PP
233 BT_LOGD("Destroying muxer component: muxer-comp-addr=%p, "
234 "muxer-notif-iter-count=%u", muxer_comp,
af18c5e7
JG
235 muxer_comp->muxer_notif_iters ?
236 muxer_comp->muxer_notif_iters->len : 0);
fed72692 237
958f7d11
PP
238 if (muxer_comp->muxer_notif_iters) {
239 g_ptr_array_free(muxer_comp->muxer_notif_iters, TRUE);
240 }
241
242 g_free(muxer_comp);
243}
244
65ee897d
PP
245static
246struct bt_value *get_default_params(void)
247{
248 struct bt_value *params;
249 int ret;
250
251 params = bt_value_map_create();
252 if (!params) {
fed72692 253 BT_LOGE_STR("Cannot create a map value object.");
65ee897d
PP
254 goto error;
255 }
256
c3acd5f3
JG
257 ret = bt_value_map_insert_bool(params,
258 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, false);
65ee897d 259 if (ret) {
fed72692 260 BT_LOGE_STR("Cannot add boolean value to map value object.");
65ee897d
PP
261 goto error;
262 }
263
264 goto end;
265
266error:
267 BT_PUT(params);
268
269end:
270 return params;
271}
272
273static
274int configure_muxer_comp(struct muxer_comp *muxer_comp, struct bt_value *params)
275{
276 struct bt_value *default_params = NULL;
277 struct bt_value *real_params = NULL;
282c8cd0 278 struct bt_value *assume_absolute_clock_classes = NULL;
65ee897d 279 int ret = 0;
c55a9f58 280 bt_bool bool_val;
65ee897d
PP
281
282 default_params = get_default_params();
283 if (!default_params) {
fed72692
PP
284 BT_LOGE("Cannot get default parameters: "
285 "muxer-comp-addr=%p", muxer_comp);
65ee897d
PP
286 goto error;
287 }
288
289 real_params = bt_value_map_extend(default_params, params);
290 if (!real_params) {
fed72692
PP
291 BT_LOGE("Cannot extend default parameters map value: "
292 "muxer-comp-addr=%p, def-params-addr=%p, "
293 "params-addr=%p", muxer_comp, default_params,
294 params);
65ee897d
PP
295 goto error;
296 }
297
778f1ddd 298 assume_absolute_clock_classes = bt_value_map_borrow(real_params,
c3acd5f3 299 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME);
f6ccaed9
PP
300 if (assume_absolute_clock_classes &&
301 !bt_value_is_bool(assume_absolute_clock_classes)) {
fed72692
PP
302 BT_LOGE("Expecting a boolean value for the `%s` parameter: "
303 "muxer-comp-addr=%p, value-type=%s",
304 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, muxer_comp,
305 bt_value_type_string(
306 bt_value_get_type(assume_absolute_clock_classes)));
65ee897d
PP
307 goto error;
308 }
309
fed72692 310 ret = bt_value_bool_get(assume_absolute_clock_classes, &bool_val);
f6ccaed9 311 BT_ASSERT(ret == 0);
282c8cd0 312 muxer_comp->assume_absolute_clock_classes = (bool) bool_val;
fed72692
PP
313 BT_LOGD("Configured muxer component: muxer-comp-addr=%p, "
314 "assume-absolute-clock-classes=%d",
315 muxer_comp, muxer_comp->assume_absolute_clock_classes);
65ee897d
PP
316 goto end;
317
318error:
319 ret = -1;
320
321end:
322 bt_put(default_params);
323 bt_put(real_params);
65ee897d
PP
324 return ret;
325}
326
958f7d11
PP
327BT_HIDDEN
328enum bt_component_status muxer_init(
329 struct bt_private_component *priv_comp,
330 struct bt_value *params, void *init_data)
331{
332 int ret;
333 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
334 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
335
fed72692
PP
336 BT_LOGD("Initializing muxer component: "
337 "comp-addr=%p, params-addr=%p", priv_comp, params);
338
958f7d11 339 if (!muxer_comp) {
fed72692 340 BT_LOGE_STR("Failed to allocate one muxer component.");
958f7d11
PP
341 goto error;
342 }
343
65ee897d
PP
344 ret = configure_muxer_comp(muxer_comp, params);
345 if (ret) {
fed72692
PP
346 BT_LOGE("Cannot configure muxer component: "
347 "muxer-comp-addr=%p, params-addr=%p",
348 muxer_comp, params);
65ee897d
PP
349 goto error;
350 }
351
958f7d11
PP
352 muxer_comp->muxer_notif_iters = g_ptr_array_new();
353 if (!muxer_comp->muxer_notif_iters) {
fed72692 354 BT_LOGE_STR("Failed to allocate a GPtrArray.");
958f7d11
PP
355 goto error;
356 }
357
358 muxer_comp->priv_comp = priv_comp;
359 ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
f6ccaed9 360 BT_ASSERT(ret == 0);
147337a3
PP
361 status = ensure_available_input_port(priv_comp);
362 if (status != BT_COMPONENT_STATUS_OK) {
fed72692
PP
363 BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
364 "muxer-comp-addr=%p, status=%s",
365 muxer_comp,
366 bt_component_status_string(status));
958f7d11
PP
367 goto error;
368 }
369
fed72692
PP
370 status = create_output_port(priv_comp);
371 if (status) {
372 BT_LOGE("Cannot create muxer component's output port: "
373 "muxer-comp-addr=%p, status=%s",
374 muxer_comp,
375 bt_component_status_string(status));
958f7d11
PP
376 goto error;
377 }
378
fed72692
PP
379 BT_LOGD("Initialized muxer component: "
380 "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p",
381 priv_comp, params, muxer_comp);
382
958f7d11
PP
383 goto end;
384
385error:
386 destroy_muxer_comp(muxer_comp);
387 ret = bt_private_component_set_user_data(priv_comp, NULL);
f6ccaed9 388 BT_ASSERT(ret == 0);
147337a3
PP
389
390 if (status == BT_COMPONENT_STATUS_OK) {
391 status = BT_COMPONENT_STATUS_ERROR;
392 }
958f7d11
PP
393
394end:
395 return status;
396}
397
398BT_HIDDEN
399void muxer_finalize(struct bt_private_component *priv_comp)
400{
401 struct muxer_comp *muxer_comp =
402 bt_private_component_get_user_data(priv_comp);
403
fed72692
PP
404 BT_LOGD("Finalizing muxer component: comp-addr=%p",
405 priv_comp);
958f7d11
PP
406 destroy_muxer_comp(muxer_comp);
407}
408
409static
410struct bt_notification_iterator *create_notif_iter_on_input_port(
411 struct bt_private_port *priv_port, int *ret)
412{
5c563278 413 struct bt_port *port = bt_port_borrow_from_private(priv_port);
958f7d11
PP
414 struct bt_notification_iterator *notif_iter = NULL;
415 struct bt_private_connection *priv_conn = NULL;
73d5c1ad 416 enum bt_connection_status conn_status;
958f7d11 417
f6ccaed9 418 BT_ASSERT(ret);
958f7d11 419 *ret = 0;
f6ccaed9
PP
420 BT_ASSERT(port);
421 BT_ASSERT(bt_port_is_connected(port));
958f7d11 422 priv_conn = bt_private_port_get_private_connection(priv_port);
f6ccaed9 423 BT_ASSERT(priv_conn);
958f7d11 424
ab11110e
PP
425 // TODO: Advance the iterator to >= the time of the latest
426 // returned notification by the muxer notification
427 // iterator which creates it.
73d5c1ad 428 conn_status = bt_private_connection_create_notification_iterator(
f42867e2 429 priv_conn, &notif_iter);
73d5c1ad 430 if (conn_status != BT_CONNECTION_STATUS_OK) {
fed72692
PP
431 BT_LOGE("Cannot create upstream notification iterator on input port's connection: "
432 "port-addr=%p, port-name=\"%s\", conn-addr=%p, "
433 "status=%s",
434 port, bt_port_get_name(port), priv_conn,
435 bt_connection_status_string(conn_status));
958f7d11
PP
436 *ret = -1;
437 goto end;
438 }
439
fed72692
PP
440 BT_LOGD("Created upstream notification iterator on input port's connection: "
441 "port-addr=%p, port-name=\"%s\", conn-addr=%p, "
442 "notif-iter-addr=%p",
443 port, bt_port_get_name(port), priv_conn, notif_iter);
444
958f7d11 445end:
958f7d11
PP
446 bt_put(priv_conn);
447 return notif_iter;
448}
449
ab11110e 450static
089717de 451enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
ab11110e
PP
452 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
453{
089717de 454 enum bt_notification_iterator_status status;
07245ac2 455 struct bt_notification *notif = NULL;
ab11110e 456
fed72692
PP
457 BT_LOGV("Calling upstream notification iterator's \"next\" method: "
458 "muxer-upstream-notif-iter-wrap-addr=%p, notif-iter-addr=%p",
459 muxer_upstream_notif_iter,
460 muxer_upstream_notif_iter->notif_iter);
07245ac2
PP
461 status = bt_private_connection_notification_iterator_next(
462 muxer_upstream_notif_iter->notif_iter, &notif);
fed72692
PP
463 BT_LOGV("Upstream notification iterator's \"next\" method returned: "
464 "status=%s", bt_notification_iterator_status_string(status));
ab11110e 465
089717de 466 switch (status) {
ab11110e 467 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
089717de
PP
468 /*
469 * Notification iterator's current notification is valid:
470 * it must be considered for muxing operations.
471 */
fed72692 472 BT_LOGV_STR("Validated upstream notification iterator wrapper.");
089717de 473 muxer_upstream_notif_iter->is_valid = true;
07245ac2 474 BT_MOVE(muxer_upstream_notif_iter->notif, notif);
ab11110e
PP
475 break;
476 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
089717de
PP
477 /*
478 * Notification iterator's current notification is not
479 * valid anymore. Return
480 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
481 * immediately.
482 */
fed72692 483 BT_LOGV_STR("Invalidated upstream notification iterator wrapper because of BT_NOTIFICATION_ITERATOR_STATUS_AGAIN.");
089717de 484 muxer_upstream_notif_iter->is_valid = false;
ab11110e 485 break;
beed0223
MD
486 case BT_NOTIFICATION_ITERATOR_STATUS_END: /* Fall-through. */
487 case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED:
ab11110e
PP
488 /*
489 * Notification iterator reached the end: release it. It
490 * won't be considered again to find the youngest
491 * notification.
492 */
fed72692 493 BT_LOGV_STR("Invalidated upstream notification iterator wrapper because of BT_NOTIFICATION_ITERATOR_STATUS_END or BT_NOTIFICATION_ITERATOR_STATUS_CANCELED.");
ab11110e 494 BT_PUT(muxer_upstream_notif_iter->notif_iter);
089717de
PP
495 muxer_upstream_notif_iter->is_valid = false;
496 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
497 break;
ab11110e
PP
498 default:
499 /* Error or unsupported status code */
fed72692
PP
500 BT_LOGE("Error or unsupported status code: "
501 "status-code=%d", status);
089717de
PP
502 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
503 break;
ab11110e
PP
504 }
505
07245ac2 506 BT_ASSERT(!notif);
089717de 507 return status;
ab11110e
PP
508}
509
510static
089717de 511int muxer_notif_iter_handle_newly_connected_ports(
ab11110e
PP
512 struct muxer_notif_iter *muxer_notif_iter)
513{
ab11110e
PP
514 int ret = 0;
515
fed72692
PP
516 BT_LOGV("Handling newly connected ports: "
517 "muxer-notif-iter-addr=%p", muxer_notif_iter);
518
ab11110e
PP
519 /*
520 * Here we create one upstream notification iterator for each
089717de
PP
521 * newly connected port. We do not perform an initial "next" on
522 * those new upstream notification iterators: they are
523 * invalidated, to be validated later. The list of newly
524 * connected ports to handle here is updated by
525 * muxer_port_connected().
ab11110e
PP
526 */
527 while (true) {
528 GList *node = muxer_notif_iter->newly_connected_priv_ports;
529 struct bt_private_port *priv_port;
530 struct bt_port *port;
531 struct bt_notification_iterator *upstream_notif_iter = NULL;
532 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
533
534 if (!node) {
535 break;
536 }
537
538 priv_port = node->data;
5c563278 539 port = bt_port_borrow_from_private(priv_port);
f6ccaed9 540 BT_ASSERT(port);
ab11110e
PP
541
542 if (!bt_port_is_connected(port)) {
543 /*
544 * Looks like this port is not connected
545 * anymore: we can't create an upstream
089717de
PP
546 * notification iterator on its (non-existing)
547 * connection in this case.
ab11110e
PP
548 */
549 goto remove_node;
550 }
551
ab11110e
PP
552 upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
553 &ret);
554 if (ret) {
fed72692 555 /* create_notif_iter_on_input_port() logs errors */
f6ccaed9 556 BT_ASSERT(!upstream_notif_iter);
ab11110e
PP
557 goto error;
558 }
559
560 muxer_upstream_notif_iter =
561 muxer_notif_iter_add_upstream_notif_iter(
562 muxer_notif_iter, upstream_notif_iter,
563 priv_port);
ab11110e
PP
564 BT_PUT(upstream_notif_iter);
565 if (!muxer_upstream_notif_iter) {
fed72692
PP
566 /*
567 * muxer_notif_iter_add_upstream_notif_iter()
568 * logs errors.
569 */
ab11110e
PP
570 goto error;
571 }
572
ab11110e
PP
573remove_node:
574 bt_put(upstream_notif_iter);
ab11110e
PP
575 muxer_notif_iter->newly_connected_priv_ports =
576 g_list_delete_link(
577 muxer_notif_iter->newly_connected_priv_ports,
578 node);
579 }
580
581 goto end;
582
583error:
089717de 584 if (ret >= 0) {
ab11110e
PP
585 ret = -1;
586 }
587
588end:
ab11110e
PP
589 return ret;
590}
591
958f7d11
PP
592static
593int get_notif_ts_ns(struct muxer_comp *muxer_comp,
282c8cd0 594 struct muxer_notif_iter *muxer_notif_iter,
958f7d11
PP
595 struct bt_notification *notif, int64_t last_returned_ts_ns,
596 int64_t *ts_ns)
597{
598 struct bt_clock_class_priority_map *cc_prio_map = NULL;
50842bdc
PP
599 struct bt_clock_class *clock_class = NULL;
600 struct bt_clock_value *clock_value = NULL;
601 struct bt_event *event = NULL;
958f7d11 602 int ret = 0;
282c8cd0 603 const unsigned char *cc_uuid;
fed72692 604 const char *cc_name;
958f7d11 605
f6ccaed9
PP
606 BT_ASSERT(notif);
607 BT_ASSERT(ts_ns);
958f7d11 608
fed72692
PP
609 BT_LOGV("Getting notification's timestamp: "
610 "muxer-notif-iter-addr=%p, notif-addr=%p, "
611 "last-returned-ts=%" PRId64,
612 muxer_notif_iter, notif, last_returned_ts_ns);
613
958f7d11
PP
614 switch (bt_notification_get_type(notif)) {
615 case BT_NOTIFICATION_TYPE_EVENT:
616 cc_prio_map =
778f1ddd 617 bt_notification_event_borrow_clock_class_priority_map(
958f7d11
PP
618 notif);
619 break;
620
621 case BT_NOTIFICATION_TYPE_INACTIVITY:
622 cc_prio_map =
778f1ddd 623 bt_notification_inactivity_borrow_clock_class_priority_map(
958f7d11
PP
624 notif);
625 break;
626 default:
089717de 627 /* All the other notifications have a higher priority */
fed72692 628 BT_LOGV_STR("Notification has no timestamp: using the last returned timestamp.");
958f7d11
PP
629 *ts_ns = last_returned_ts_ns;
630 goto end;
631 }
632
633 if (!cc_prio_map) {
fed72692
PP
634 BT_LOGE("Cannot get notification's clock class priority map: "
635 "notif-addr=%p", notif);
958f7d11
PP
636 goto error;
637 }
638
639 /*
640 * If the clock class priority map is empty, then we consider
641 * that this notification has no time. In this case it's always
642 * the youngest.
643 */
644 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map) == 0) {
63f99cca 645 BT_LOGV_STR("Notification's clock class priority map contains 0 clock classes: "
fed72692 646 "using the last returned timestamp.");
958f7d11
PP
647 *ts_ns = last_returned_ts_ns;
648 goto end;
649 }
650
651 clock_class =
778f1ddd 652 bt_clock_class_priority_map_borrow_highest_priority_clock_class(
958f7d11
PP
653 cc_prio_map);
654 if (!clock_class) {
fed72692
PP
655 BT_LOGE("Cannot get the clock class with the highest priority from clock class priority map: "
656 "cc-prio-map-addr=%p", cc_prio_map);
958f7d11
PP
657 goto error;
658 }
659
50842bdc
PP
660 cc_uuid = bt_clock_class_get_uuid(clock_class);
661 cc_name = bt_clock_class_get_name(clock_class);
282c8cd0
PP
662
663 if (muxer_notif_iter->clock_class_expectation ==
664 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
665 /*
666 * This is the first clock class that this muxer
667 * notification iterator encounters. Its properties
668 * determine what to expect for the whole lifetime of
669 * the iterator without a true
670 * `assume-absolute-clock-classes` parameter.
671 */
50842bdc 672 if (bt_clock_class_is_absolute(clock_class)) {
282c8cd0
PP
673 /* Expect absolute clock classes */
674 muxer_notif_iter->clock_class_expectation =
675 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE;
676 } else {
677 if (cc_uuid) {
678 /*
679 * Expect non-absolute clock classes
680 * with a specific UUID.
681 */
682 muxer_notif_iter->clock_class_expectation =
683 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID;
684 memcpy(muxer_notif_iter->expected_clock_class_uuid,
685 cc_uuid, BABELTRACE_UUID_LEN);
686 } else {
687 /*
688 * Expect non-absolute clock classes
689 * with no UUID.
690 */
691 muxer_notif_iter->clock_class_expectation =
692 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID;
693 }
694 }
695 }
696
697 if (!muxer_comp->assume_absolute_clock_classes) {
698 switch (muxer_notif_iter->clock_class_expectation) {
699 case MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE:
50842bdc 700 if (!bt_clock_class_is_absolute(clock_class)) {
fed72692
PP
701 BT_LOGE("Expecting an absolute clock class, "
702 "but got a non-absolute one: "
703 "clock-class-addr=%p, clock-class-name=\"%s\"",
704 clock_class, cc_name);
282c8cd0
PP
705 goto error;
706 }
707 break;
708 case MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID:
50842bdc 709 if (bt_clock_class_is_absolute(clock_class)) {
fed72692
PP
710 BT_LOGE("Expecting a non-absolute clock class with no UUID, "
711 "but got an absolute one: "
712 "clock-class-addr=%p, clock-class-name=\"%s\"",
713 clock_class, cc_name);
282c8cd0
PP
714 goto error;
715 }
716
717 if (cc_uuid) {
fed72692
PP
718 BT_LOGE("Expecting a non-absolute clock class with no UUID, "
719 "but got one with a UUID: "
720 "clock-class-addr=%p, clock-class-name=\"%s\", "
721 "uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\"",
722 clock_class, cc_name,
723 (unsigned int) cc_uuid[0],
724 (unsigned int) cc_uuid[1],
725 (unsigned int) cc_uuid[2],
726 (unsigned int) cc_uuid[3],
727 (unsigned int) cc_uuid[4],
728 (unsigned int) cc_uuid[5],
729 (unsigned int) cc_uuid[6],
730 (unsigned int) cc_uuid[7],
731 (unsigned int) cc_uuid[8],
732 (unsigned int) cc_uuid[9],
733 (unsigned int) cc_uuid[10],
734 (unsigned int) cc_uuid[11],
735 (unsigned int) cc_uuid[12],
736 (unsigned int) cc_uuid[13],
737 (unsigned int) cc_uuid[14],
738 (unsigned int) cc_uuid[15]);
282c8cd0
PP
739 goto error;
740 }
741 break;
742 case MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID:
50842bdc 743 if (bt_clock_class_is_absolute(clock_class)) {
fed72692
PP
744 BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
745 "but got an absolute one: "
746 "clock-class-addr=%p, clock-class-name=\"%s\"",
747 clock_class, cc_name);
282c8cd0
PP
748 goto error;
749 }
750
751 if (!cc_uuid) {
fed72692
PP
752 BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
753 "but got one with no UUID: "
754 "clock-class-addr=%p, clock-class-name=\"%s\"",
755 clock_class, cc_name);
282c8cd0
PP
756 goto error;
757 }
758
759 if (memcmp(muxer_notif_iter->expected_clock_class_uuid,
760 cc_uuid, BABELTRACE_UUID_LEN) != 0) {
fed72692
PP
761 BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
762 "but got one with different UUID: "
763 "clock-class-addr=%p, clock-class-name=\"%s\", "
764 "expected-uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\", "
765 "uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\"",
766 clock_class, cc_name,
767 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[0],
768 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[1],
769 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[2],
770 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[3],
771 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[4],
772 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[5],
773 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[6],
774 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[7],
775 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[8],
776 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[9],
777 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[10],
778 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[11],
779 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[12],
780 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[13],
781 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[14],
782 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[15],
783 (unsigned int) cc_uuid[0],
784 (unsigned int) cc_uuid[1],
785 (unsigned int) cc_uuid[2],
786 (unsigned int) cc_uuid[3],
787 (unsigned int) cc_uuid[4],
788 (unsigned int) cc_uuid[5],
789 (unsigned int) cc_uuid[6],
790 (unsigned int) cc_uuid[7],
791 (unsigned int) cc_uuid[8],
792 (unsigned int) cc_uuid[9],
793 (unsigned int) cc_uuid[10],
794 (unsigned int) cc_uuid[11],
795 (unsigned int) cc_uuid[12],
796 (unsigned int) cc_uuid[13],
797 (unsigned int) cc_uuid[14],
798 (unsigned int) cc_uuid[15]);
282c8cd0
PP
799 goto error;
800 }
801 break;
802 default:
803 /* Unexpected */
fed72692
PP
804 BT_LOGF("Unexpected clock class expectation: "
805 "expectation-code=%d",
806 muxer_notif_iter->clock_class_expectation);
282c8cd0
PP
807 abort();
808 }
958f7d11
PP
809 }
810
811 switch (bt_notification_get_type(notif)) {
812 case BT_NOTIFICATION_TYPE_EVENT:
778f1ddd 813 event = bt_notification_event_borrow_event(notif);
f6ccaed9 814 BT_ASSERT(event);
778f1ddd 815 clock_value = bt_event_borrow_clock_value(event,
958f7d11
PP
816 clock_class);
817 break;
818 case BT_NOTIFICATION_TYPE_INACTIVITY:
778f1ddd 819 clock_value = bt_notification_inactivity_borrow_clock_value(
958f7d11
PP
820 notif, clock_class);
821 break;
822 default:
fed72692
PP
823 BT_LOGF("Unexpected notification type at this point: "
824 "type=%d", bt_notification_get_type(notif));
0fbb9a9f 825 abort();
958f7d11
PP
826 }
827
828 if (!clock_value) {
fed72692
PP
829 BT_LOGE("Cannot get notification's clock value for clock class: "
830 "clock-class-addr=%p, clock-class-name=\"%s\"",
831 clock_class, cc_name);
958f7d11
PP
832 goto error;
833 }
834
50842bdc 835 ret = bt_clock_value_get_value_ns_from_epoch(clock_value, ts_ns);
958f7d11 836 if (ret) {
fed72692
PP
837 BT_LOGE("Cannot get nanoseconds from Epoch of clock value: "
838 "clock-value-addr=%p", clock_value);
958f7d11
PP
839 goto error;
840 }
841
842 goto end;
843
844error:
845 ret = -1;
846
847end:
fed72692
PP
848 if (ret == 0) {
849 BT_LOGV("Found notification's timestamp: "
850 "muxer-notif-iter-addr=%p, notif-addr=%p, "
851 "last-returned-ts=%" PRId64 ", ts=%" PRId64,
852 muxer_notif_iter, notif, last_returned_ts_ns,
853 *ts_ns);
854 }
855
958f7d11
PP
856 return ret;
857}
858
ab11110e
PP
859/*
860 * This function finds the youngest available notification amongst the
861 * non-ended upstream notification iterators and returns the upstream
862 * notification iterator which has it, or
863 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
864 * notification.
865 *
866 * This function does NOT:
867 *
868 * * Update any upstream notification iterator.
869 * * Check for newly connected ports.
870 * * Check the upstream notification iterators to retry.
871 *
872 * On success, this function sets *muxer_upstream_notif_iter to the
873 * upstream notification iterator of which the current notification is
874 * the youngest, and sets *ts_ns to its time.
875 */
958f7d11
PP
876static
877enum bt_notification_iterator_status
878muxer_notif_iter_youngest_upstream_notif_iter(
879 struct muxer_comp *muxer_comp,
880 struct muxer_notif_iter *muxer_notif_iter,
881 struct muxer_upstream_notif_iter **muxer_upstream_notif_iter,
882 int64_t *ts_ns)
883{
884 size_t i;
885 int ret;
886 int64_t youngest_ts_ns = INT64_MAX;
887 enum bt_notification_iterator_status status =
888 BT_NOTIFICATION_ITERATOR_STATUS_OK;
889
f6ccaed9
PP
890 BT_ASSERT(muxer_comp);
891 BT_ASSERT(muxer_notif_iter);
892 BT_ASSERT(muxer_upstream_notif_iter);
958f7d11
PP
893 *muxer_upstream_notif_iter = NULL;
894
895 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
896 struct bt_notification *notif;
897 struct muxer_upstream_notif_iter *cur_muxer_upstream_notif_iter =
898 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters, i);
899 int64_t notif_ts_ns;
900
901 if (!cur_muxer_upstream_notif_iter->notif_iter) {
ab11110e 902 /* This upstream notification iterator is ended */
fed72692
PP
903 BT_LOGV("Skipping ended upstream notification iterator: "
904 "muxer-upstream-notif-iter-wrap-addr=%p",
905 cur_muxer_upstream_notif_iter);
958f7d11
PP
906 continue;
907 }
908
f6ccaed9 909 BT_ASSERT(cur_muxer_upstream_notif_iter->is_valid);
07245ac2 910 notif = cur_muxer_upstream_notif_iter->notif;
f6ccaed9 911 BT_ASSERT(notif);
282c8cd0 912 ret = get_notif_ts_ns(muxer_comp, muxer_notif_iter, notif,
958f7d11 913 muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
958f7d11 914 if (ret) {
fed72692 915 /* get_notif_ts_ns() logs errors */
958f7d11
PP
916 *muxer_upstream_notif_iter = NULL;
917 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
918 goto end;
919 }
920
921 if (notif_ts_ns <= youngest_ts_ns) {
922 *muxer_upstream_notif_iter =
923 cur_muxer_upstream_notif_iter;
924 youngest_ts_ns = notif_ts_ns;
925 *ts_ns = youngest_ts_ns;
926 }
927 }
928
929 if (!*muxer_upstream_notif_iter) {
930 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
931 *ts_ns = INT64_MIN;
932 }
933
934end:
935 return status;
936}
937
938static
089717de
PP
939enum bt_notification_iterator_status validate_muxer_upstream_notif_iter(
940 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
958f7d11 941{
089717de
PP
942 enum bt_notification_iterator_status status =
943 BT_NOTIFICATION_ITERATOR_STATUS_OK;
958f7d11 944
fed72692
PP
945 BT_LOGV("Validating muxer's upstream notification iterator wrapper: "
946 "muxer-upstream-notif-iter-wrap-addr=%p",
947 muxer_upstream_notif_iter);
948
089717de
PP
949 if (muxer_upstream_notif_iter->is_valid ||
950 !muxer_upstream_notif_iter->notif_iter) {
ab11110e
PP
951 goto end;
952 }
953
fed72692 954 /* muxer_upstream_notif_iter_next() logs details/errors */
089717de
PP
955 status = muxer_upstream_notif_iter_next(muxer_upstream_notif_iter);
956
957end:
958 return status;
959}
960
961static
962enum bt_notification_iterator_status validate_muxer_upstream_notif_iters(
963 struct muxer_notif_iter *muxer_notif_iter)
964{
965 enum bt_notification_iterator_status status =
966 BT_NOTIFICATION_ITERATOR_STATUS_OK;
967 size_t i;
968
fed72692
PP
969 BT_LOGV("Validating muxer's upstream notification iterator wrappers: "
970 "muxer-notif-iter-addr=%p", muxer_notif_iter);
971
089717de
PP
972 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
973 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
974 g_ptr_array_index(
975 muxer_notif_iter->muxer_upstream_notif_iters,
976 i);
977
978 status = validate_muxer_upstream_notif_iter(
979 muxer_upstream_notif_iter);
980 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
fed72692
PP
981 if (status < 0) {
982 BT_LOGE("Cannot validate muxer's upstream notification iterator wrapper: "
983 "muxer-notif-iter-addr=%p, "
984 "muxer-upstream-notif-iter-wrap-addr=%p",
985 muxer_notif_iter,
986 muxer_upstream_notif_iter);
987 } else {
988 BT_LOGV("Cannot validate muxer's upstream notification iterator wrapper: "
989 "muxer-notif-iter-addr=%p, "
990 "muxer-upstream-notif-iter-wrap-addr=%p",
991 muxer_notif_iter,
992 muxer_upstream_notif_iter);
993 }
994
089717de
PP
995 goto end;
996 }
744ba28b
PP
997
998 /*
999 * Remove this muxer upstream notification iterator
1000 * if it's ended or canceled.
1001 */
1002 if (!muxer_upstream_notif_iter->notif_iter) {
1003 /*
1004 * Use g_ptr_array_remove_fast() because the
1005 * order of those elements is not important.
1006 */
fed72692
PP
1007 BT_LOGV("Removing muxer's upstream notification iterator wrapper: ended or canceled: "
1008 "muxer-notif-iter-addr=%p, "
1009 "muxer-upstream-notif-iter-wrap-addr=%p",
1010 muxer_notif_iter, muxer_upstream_notif_iter);
744ba28b
PP
1011 g_ptr_array_remove_index_fast(
1012 muxer_notif_iter->muxer_upstream_notif_iters,
1013 i);
1014 i--;
1015 }
089717de
PP
1016 }
1017
1018end:
1019 return status;
1020}
1021
1022static
90157d89 1023struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next(
089717de
PP
1024 struct muxer_comp *muxer_comp,
1025 struct muxer_notif_iter *muxer_notif_iter)
1026{
1027 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter = NULL;
90157d89 1028 struct bt_notification_iterator_next_method_return next_return = {
089717de
PP
1029 .notification = NULL,
1030 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
1031 };
1032 int64_t next_return_ts;
1033
1034 while (true) {
1035 int ret = muxer_notif_iter_handle_newly_connected_ports(
1036 muxer_notif_iter);
1037
1038 if (ret) {
fed72692
PP
1039 BT_LOGE("Cannot handle newly connected input ports for muxer's notification iterator: "
1040 "muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1041 "ret=%d",
1042 muxer_comp, muxer_notif_iter, ret);
089717de
PP
1043 next_return.status =
1044 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1045 goto end;
1046 }
1047
1048 next_return.status =
1049 validate_muxer_upstream_notif_iters(muxer_notif_iter);
1050 if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
3823b499 1051 /* validate_muxer_upstream_notif_iters() logs details */
089717de
PP
1052 goto end;
1053 }
ab11110e 1054
958f7d11 1055 /*
089717de
PP
1056 * At this point, we know that all the existing upstream
1057 * notification iterators are valid. However the
1058 * operations to validate them (during
1059 * validate_muxer_upstream_notif_iters()) may have
1060 * connected new ports. If no ports were connected
1061 * during this operation, exit the loop.
958f7d11 1062 */
089717de 1063 if (!muxer_notif_iter->newly_connected_priv_ports) {
fed72692
PP
1064 BT_LOGV("Not breaking this loop: muxer's notification iterator still has newly connected input ports to handle: "
1065 "muxer-comp-addr=%p", muxer_comp);
089717de
PP
1066 break;
1067 }
958f7d11
PP
1068 }
1069
f6ccaed9 1070 BT_ASSERT(!muxer_notif_iter->newly_connected_priv_ports);
089717de 1071
958f7d11 1072 /*
089717de
PP
1073 * At this point we know that all the existing upstream
1074 * notification iterators are valid. We can find the one,
1075 * amongst those, of which the current notification is the
1076 * youngest.
958f7d11 1077 */
089717de 1078 next_return.status =
958f7d11
PP
1079 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
1080 muxer_notif_iter, &muxer_upstream_notif_iter,
089717de
PP
1081 &next_return_ts);
1082 if (next_return.status < 0 ||
beed0223
MD
1083 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END ||
1084 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) {
fed72692
PP
1085 if (next_return.status < 0) {
1086 BT_LOGE("Cannot find the youngest upstream notification iterator wrapper: "
1087 "status=%s",
1088 bt_notification_iterator_status_string(next_return.status));
1089 } else {
1090 BT_LOGV("Cannot find the youngest upstream notification iterator wrapper: "
1091 "status=%s",
1092 bt_notification_iterator_status_string(next_return.status));
1093 }
1094
958f7d11
PP
1095 goto end;
1096 }
1097
089717de 1098 if (next_return_ts < muxer_notif_iter->last_returned_ts_ns) {
fed72692
PP
1099 BT_LOGE("Youngest upstream notification iterator wrapper's timestamp is less than muxer's notification iterator's last returned timestamp: "
1100 "muxer-notif-iter-addr=%p, ts=%" PRId64 ", "
1101 "last-returned-ts=%" PRId64,
1102 muxer_notif_iter, next_return_ts,
1103 muxer_notif_iter->last_returned_ts_ns);
089717de 1104 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
958f7d11
PP
1105 goto end;
1106 }
1107
fed72692
PP
1108 BT_LOGV("Found youngest upstream notification iterator wrapper: "
1109 "muxer-notif-iter-addr=%p, "
1110 "muxer-upstream-notif-iter-wrap-addr=%p, "
1111 "ts=%" PRId64,
1112 muxer_notif_iter, muxer_upstream_notif_iter, next_return_ts);
f6ccaed9
PP
1113 BT_ASSERT(next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
1114 BT_ASSERT(muxer_upstream_notif_iter);
07245ac2 1115 next_return.notification = bt_get(muxer_upstream_notif_iter->notif);
f6ccaed9 1116 BT_ASSERT(next_return.notification);
958f7d11
PP
1117
1118 /*
089717de
PP
1119 * We invalidate the upstream notification iterator so that, the
1120 * next time this function is called,
1121 * validate_muxer_upstream_notif_iters() will make it valid.
958f7d11 1122 */
089717de
PP
1123 muxer_upstream_notif_iter->is_valid = false;
1124 muxer_notif_iter->last_returned_ts_ns = next_return_ts;
958f7d11
PP
1125
1126end:
089717de 1127 return next_return;
958f7d11
PP
1128}
1129
1130static
ab11110e
PP
1131void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
1132{
ab11110e
PP
1133 if (!muxer_notif_iter) {
1134 return;
1135 }
1136
fed72692
PP
1137 BT_LOGD("Destroying muxer component's notification iterator: "
1138 "muxer-notif-iter-addr=%p", muxer_notif_iter);
1139
ab11110e 1140 if (muxer_notif_iter->muxer_upstream_notif_iters) {
fed72692 1141 BT_LOGD_STR("Destroying muxer's upstream notification iterator wrappers.");
ab11110e
PP
1142 g_ptr_array_free(
1143 muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
1144 }
1145
ab11110e
PP
1146 g_list_free(muxer_notif_iter->newly_connected_priv_ports);
1147 g_free(muxer_notif_iter);
1148}
1149
1150static
1151int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
958f7d11
PP
1152 struct muxer_notif_iter *muxer_notif_iter)
1153{
ab11110e 1154 struct bt_component *comp;
544d0515
PP
1155 int64_t count;
1156 int64_t i;
ab11110e 1157 int ret = 0;
958f7d11 1158
ab11110e
PP
1159 /*
1160 * Add the connected input ports to this muxer notification
1161 * iterator's list of newly connected ports. They will be
1162 * handled by muxer_notif_iter_handle_newly_connected_ports().
1163 */
5c563278 1164 comp = bt_component_borrow_from_private(muxer_comp->priv_comp);
f6ccaed9 1165 BT_ASSERT(comp);
544d0515
PP
1166 count = bt_component_filter_get_input_port_count(comp);
1167 if (count < 0) {
fed72692
PP
1168 BT_LOGD("No input port to initialize for muxer component's notification iterator: "
1169 "muxer-comp-addr=%p, muxer-notif-iter-addr=%p",
1170 muxer_comp, muxer_notif_iter);
ab11110e
PP
1171 goto end;
1172 }
958f7d11
PP
1173
1174 for (i = 0; i < count; i++) {
1175 struct bt_private_port *priv_port =
9ac68eb1 1176 bt_private_component_filter_get_input_private_port_by_index(
958f7d11
PP
1177 muxer_comp->priv_comp, i);
1178 struct bt_port *port;
958f7d11 1179
f6ccaed9 1180 BT_ASSERT(priv_port);
5c563278 1181 port = bt_port_borrow_from_private(priv_port);
f6ccaed9 1182 BT_ASSERT(port);
958f7d11
PP
1183
1184 if (!bt_port_is_connected(port)) {
fed72692
PP
1185 BT_LOGD("Skipping input port: not connected: "
1186 "muxer-comp-addr=%p, port-addr=%p, port-name\"%s\"",
1187 muxer_comp, port, bt_port_get_name(port));
958f7d11
PP
1188 bt_put(priv_port);
1189 continue;
1190 }
1191
06a2cb0d 1192 bt_put(priv_port);
ab11110e
PP
1193 muxer_notif_iter->newly_connected_priv_ports =
1194 g_list_append(
1195 muxer_notif_iter->newly_connected_priv_ports,
958f7d11 1196 priv_port);
ab11110e 1197 if (!muxer_notif_iter->newly_connected_priv_ports) {
fed72692
PP
1198 BT_LOGE("Cannot append port to muxer's notification iterator list of newly connected input ports: "
1199 "port-addr=%p, port-name=\"%s\", "
1200 "muxer-notif-iter-addr=%p", port,
1201 bt_port_get_name(port), muxer_notif_iter);
ab11110e
PP
1202 ret = -1;
1203 goto end;
958f7d11 1204 }
fed72692
PP
1205
1206 BT_LOGD("Appended port to muxer's notification iterator list of newly connected input ports: "
1207 "port-addr=%p, port-name=\"%s\", "
1208 "muxer-notif-iter-addr=%p", port,
1209 bt_port_get_name(port), muxer_notif_iter);
958f7d11
PP
1210 }
1211
1212end:
958f7d11
PP
1213 return ret;
1214}
1215
958f7d11
PP
1216BT_HIDDEN
1217enum bt_notification_iterator_status muxer_notif_iter_init(
90157d89 1218 struct bt_private_connection_private_notification_iterator *priv_notif_iter,
958f7d11
PP
1219 struct bt_private_port *output_priv_port)
1220{
1221 struct muxer_comp *muxer_comp = NULL;
1222 struct muxer_notif_iter *muxer_notif_iter = NULL;
1223 struct bt_private_component *priv_comp = NULL;
1224 enum bt_notification_iterator_status status =
1225 BT_NOTIFICATION_ITERATOR_STATUS_OK;
1226 int ret;
1227
90157d89 1228 priv_comp = bt_private_connection_private_notification_iterator_get_private_component(
958f7d11 1229 priv_notif_iter);
f6ccaed9 1230 BT_ASSERT(priv_comp);
958f7d11 1231 muxer_comp = bt_private_component_get_user_data(priv_comp);
f6ccaed9 1232 BT_ASSERT(muxer_comp);
fed72692
PP
1233 BT_LOGD("Initializing muxer component's notification iterator: "
1234 "comp-addr=%p, muxer-comp-addr=%p, notif-iter-addr=%p",
1235 priv_comp, muxer_comp, priv_notif_iter);
a09c6b95
PP
1236
1237 if (muxer_comp->initializing_muxer_notif_iter) {
1238 /*
089717de
PP
1239 * Weird, unhandled situation detected: downstream
1240 * creates a muxer notification iterator while creating
1241 * another muxer notification iterator (same component).
a09c6b95 1242 */
fed72692
PP
1243 BT_LOGE("Recursive initialization of muxer component's notification iterator: "
1244 "comp-addr=%p, muxer-comp-addr=%p, notif-iter-addr=%p",
1245 priv_comp, muxer_comp, priv_notif_iter);
958f7d11
PP
1246 goto error;
1247 }
1248
a09c6b95
PP
1249 muxer_comp->initializing_muxer_notif_iter = true;
1250 muxer_notif_iter = g_new0(struct muxer_notif_iter, 1);
1251 if (!muxer_notif_iter) {
fed72692 1252 BT_LOGE_STR("Failed to allocate one muxer component's notification iterator.");
ab11110e
PP
1253 goto error;
1254 }
1255
958f7d11
PP
1256 muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
1257 muxer_notif_iter->muxer_upstream_notif_iters =
1258 g_ptr_array_new_with_free_func(
1259 (GDestroyNotify) destroy_muxer_upstream_notif_iter);
1260 if (!muxer_notif_iter->muxer_upstream_notif_iters) {
fed72692 1261 BT_LOGE_STR("Failed to allocate a GPtrArray.");
958f7d11
PP
1262 goto error;
1263 }
1264
a09c6b95
PP
1265 /*
1266 * Add the muxer notification iterator to the component's array
1267 * of muxer notification iterators here because
1268 * muxer_notif_iter_init_newly_connected_ports() can cause
1269 * muxer_port_connected() to be called, which adds the newly
1270 * connected port to each muxer notification iterator's list of
1271 * newly connected ports.
1272 */
1273 g_ptr_array_add(muxer_comp->muxer_notif_iters, muxer_notif_iter);
1274 ret = muxer_notif_iter_init_newly_connected_ports(muxer_comp,
1275 muxer_notif_iter);
1276 if (ret) {
fed72692
PP
1277 BT_LOGE("Cannot initialize newly connected input ports for muxer component's notification iterator: "
1278 "comp-addr=%p, muxer-comp-addr=%p, "
1279 "muxer-notif-iter-addr=%p, notif-iter-addr=%p, ret=%d",
1280 priv_comp, muxer_comp, muxer_notif_iter,
1281 priv_notif_iter, ret);
a09c6b95
PP
1282 goto error;
1283 }
1284
90157d89 1285 ret = bt_private_connection_private_notification_iterator_set_user_data(priv_notif_iter,
958f7d11 1286 muxer_notif_iter);
f6ccaed9 1287 BT_ASSERT(ret == 0);
fed72692
PP
1288 BT_LOGD("Initialized muxer component's notification iterator: "
1289 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1290 "notif-iter-addr=%p",
1291 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
958f7d11
PP
1292 goto end;
1293
1294error:
a09c6b95
PP
1295 if (g_ptr_array_index(muxer_comp->muxer_notif_iters,
1296 muxer_comp->muxer_notif_iters->len - 1) == muxer_notif_iter) {
1297 g_ptr_array_remove_index(muxer_comp->muxer_notif_iters,
1298 muxer_comp->muxer_notif_iters->len - 1);
1299 }
1300
958f7d11 1301 destroy_muxer_notif_iter(muxer_notif_iter);
90157d89 1302 ret = bt_private_connection_private_notification_iterator_set_user_data(priv_notif_iter,
958f7d11 1303 NULL);
f6ccaed9 1304 BT_ASSERT(ret == 0);
958f7d11
PP
1305 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1306
1307end:
a09c6b95 1308 muxer_comp->initializing_muxer_notif_iter = false;
958f7d11
PP
1309 bt_put(priv_comp);
1310 return status;
1311}
1312
1313BT_HIDDEN
1314void muxer_notif_iter_finalize(
90157d89 1315 struct bt_private_connection_private_notification_iterator *priv_notif_iter)
958f7d11
PP
1316{
1317 struct muxer_notif_iter *muxer_notif_iter =
90157d89 1318 bt_private_connection_private_notification_iterator_get_user_data(priv_notif_iter);
958f7d11
PP
1319 struct bt_private_component *priv_comp = NULL;
1320 struct muxer_comp *muxer_comp = NULL;
1321
90157d89 1322 priv_comp = bt_private_connection_private_notification_iterator_get_private_component(
958f7d11 1323 priv_notif_iter);
f6ccaed9 1324 BT_ASSERT(priv_comp);
958f7d11 1325 muxer_comp = bt_private_component_get_user_data(priv_comp);
fed72692
PP
1326 BT_LOGD("Finalizing muxer component's notification iterator: "
1327 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1328 "notif-iter-addr=%p",
1329 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
958f7d11
PP
1330
1331 if (muxer_comp) {
1332 (void) g_ptr_array_remove_fast(muxer_comp->muxer_notif_iters,
1333 muxer_notif_iter);
1334 destroy_muxer_notif_iter(muxer_notif_iter);
1335 }
1336
1337 bt_put(priv_comp);
1338}
1339
1340BT_HIDDEN
90157d89
PP
1341struct bt_notification_iterator_next_method_return muxer_notif_iter_next(
1342 struct bt_private_connection_private_notification_iterator *priv_notif_iter)
958f7d11 1343{
90157d89 1344 struct bt_notification_iterator_next_method_return next_ret;
958f7d11 1345 struct muxer_notif_iter *muxer_notif_iter =
90157d89 1346 bt_private_connection_private_notification_iterator_get_user_data(priv_notif_iter);
958f7d11
PP
1347 struct bt_private_component *priv_comp = NULL;
1348 struct muxer_comp *muxer_comp = NULL;
958f7d11 1349
f6ccaed9 1350 BT_ASSERT(muxer_notif_iter);
90157d89 1351 priv_comp = bt_private_connection_private_notification_iterator_get_private_component(
958f7d11 1352 priv_notif_iter);
f6ccaed9 1353 BT_ASSERT(priv_comp);
958f7d11 1354 muxer_comp = bt_private_component_get_user_data(priv_comp);
f6ccaed9 1355 BT_ASSERT(muxer_comp);
958f7d11 1356
fed72692
PP
1357 BT_LOGV("Muxer component's notification iterator's \"next\" method called: "
1358 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1359 "notif-iter-addr=%p",
1360 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
1361
958f7d11
PP
1362 /* Are we in an error state set elsewhere? */
1363 if (unlikely(muxer_comp->error)) {
fed72692
PP
1364 BT_LOGE("Muxer component is already in an error state: returning BT_NOTIFICATION_ITERATOR_STATUS_ERROR: "
1365 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1366 "notif-iter-addr=%p",
1367 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
089717de
PP
1368 next_ret.notification = NULL;
1369 next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1370 goto end;
958f7d11
PP
1371 }
1372
089717de 1373 next_ret = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter);
fed72692
PP
1374 if (next_ret.status < 0) {
1375 BT_LOGE("Cannot get next notification: "
1376 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1377 "notif-iter-addr=%p, status=%s",
1378 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter,
1379 bt_notification_iterator_status_string(next_ret.status));
1380 } else {
1381 BT_LOGV("Returning from muxer component's notification iterator's \"next\" method: "
1382 "status=%s, notif-addr=%p",
1383 bt_notification_iterator_status_string(next_ret.status),
1384 next_ret.notification);
1385 }
958f7d11
PP
1386
1387end:
1388 bt_put(priv_comp);
1389 return next_ret;
1390}
1391
1392BT_HIDDEN
1393void muxer_port_connected(
1394 struct bt_private_component *priv_comp,
1395 struct bt_private_port *self_private_port,
1396 struct bt_port *other_port)
1397{
1398 struct bt_port *self_port =
5c563278 1399 bt_port_borrow_from_private(self_private_port);
958f7d11
PP
1400 struct muxer_comp *muxer_comp =
1401 bt_private_component_get_user_data(priv_comp);
1402 size_t i;
06a2cb0d 1403 int ret;
958f7d11 1404
f6ccaed9
PP
1405 BT_ASSERT(self_port);
1406 BT_ASSERT(muxer_comp);
fed72692
PP
1407 BT_LOGD("Port connected: "
1408 "comp-addr=%p, muxer-comp-addr=%p, "
1409 "port-addr=%p, port-name=\"%s\", "
1410 "other-port-addr=%p, other-port-name=\"%s\"",
1411 priv_comp, muxer_comp, self_port, bt_port_get_name(self_port),
1412 other_port, bt_port_get_name(other_port));
958f7d11 1413
06a2cb0d
PP
1414 if (bt_port_get_type(self_port) == BT_PORT_TYPE_OUTPUT) {
1415 goto end;
958f7d11
PP
1416 }
1417
1418 for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
1419 struct muxer_notif_iter *muxer_notif_iter =
1420 g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
1421
1422 /*
ab11110e
PP
1423 * Add this port to the list of newly connected ports
1424 * for this muxer notification iterator. We append at
1425 * the end of this list while
1426 * muxer_notif_iter_handle_newly_connected_ports()
1427 * removes the nodes from the beginning.
958f7d11 1428 */
ab11110e
PP
1429 muxer_notif_iter->newly_connected_priv_ports =
1430 g_list_append(
1431 muxer_notif_iter->newly_connected_priv_ports,
06a2cb0d 1432 self_private_port);
ab11110e 1433 if (!muxer_notif_iter->newly_connected_priv_ports) {
fed72692
PP
1434 BT_LOGE("Cannot append port to muxer's notification iterator list of newly connected input ports: "
1435 "port-addr=%p, port-name=\"%s\", "
1436 "muxer-notif-iter-addr=%p", self_port,
1437 bt_port_get_name(self_port), muxer_notif_iter);
958f7d11
PP
1438 muxer_comp->error = true;
1439 goto end;
1440 }
fed72692
PP
1441
1442 BT_LOGD("Appended port to muxer's notification iterator list of newly connected input ports: "
1443 "port-addr=%p, port-name=\"%s\", "
1444 "muxer-notif-iter-addr=%p", self_port,
1445 bt_port_get_name(self_port), muxer_notif_iter);
958f7d11
PP
1446 }
1447
06a2cb0d
PP
1448 /* One less available input port */
1449 muxer_comp->available_input_ports--;
1450 ret = ensure_available_input_port(priv_comp);
1451 if (ret) {
1452 /*
1453 * Only way to report an error later since this
1454 * method does not return anything.
1455 */
fed72692
PP
1456 BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
1457 "muxer-comp-addr=%p, status=%s",
1458 muxer_comp, bt_component_status_string(ret));
06a2cb0d
PP
1459 muxer_comp->error = true;
1460 goto end;
1461 }
1462
958f7d11 1463end:
5c563278 1464 return;
958f7d11
PP
1465}
1466
1467BT_HIDDEN
1468void muxer_port_disconnected(struct bt_private_component *priv_comp,
1469 struct bt_private_port *priv_port)
1470{
5c563278 1471 struct bt_port *port = bt_port_borrow_from_private(priv_port);
958f7d11
PP
1472 struct muxer_comp *muxer_comp =
1473 bt_private_component_get_user_data(priv_comp);
1474
f6ccaed9
PP
1475 BT_ASSERT(port);
1476 BT_ASSERT(muxer_comp);
fed72692
PP
1477 BT_LOGD("Port disconnected: "
1478 "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, "
1479 "port-name=\"%s\"", priv_comp, muxer_comp,
1480 port, bt_port_get_name(port));
958f7d11 1481
ab11110e
PP
1482 /*
1483 * There's nothing special to do when a port is disconnected
1484 * because this component deals with upstream notification
1485 * iterators which were already created thanks to connected
1486 * ports. The fact that the port is disconnected does not cancel
1487 * the upstream notification iterators created using its
089717de
PP
1488 * connection: they still exist, even if the connection is dead.
1489 * The only way to remove an upstream notification iterator is
1490 * for its "next" operation to return
1491 * BT_NOTIFICATION_ITERATOR_STATUS_END.
ab11110e 1492 */
958f7d11
PP
1493 if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
1494 /* One more available input port */
1495 muxer_comp->available_input_ports++;
fed72692
PP
1496 BT_LOGD("Leaving disconnected input port available for future connections: "
1497 "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, "
1498 "port-name=\"%s\", avail-input-port-count=%zu",
1499 priv_comp, muxer_comp, port, bt_port_get_name(port),
1500 muxer_comp->available_input_ports);
958f7d11 1501 }
958f7d11 1502}
This page took 0.101825 seconds and 4 git commands to generate.