Values API: split into private and public APIs
[babeltrace.git] / plugins / utils / muxer / muxer.c
CommitLineData
958f7d11
PP
1/*
2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
fed72692
PP
23#define BT_LOG_TAG "PLUGIN-UTILS-MUXER-FLT"
24#include "logging.h"
25
958f7d11 26#include <babeltrace/babeltrace-internal.h>
282c8cd0 27#include <babeltrace/compat/uuid-internal.h>
9d408fca
PP
28#include <babeltrace/babeltrace.h>
29#include <babeltrace/values-internal.h>
fed72692 30#include <babeltrace/graph/component-internal.h>
fed72692 31#include <babeltrace/graph/notification-iterator-internal.h>
fed72692 32#include <babeltrace/graph/connection-internal.h>
958f7d11
PP
33#include <plugins-common.h>
34#include <glib.h>
c55a9f58 35#include <stdbool.h>
fed72692 36#include <inttypes.h>
f6ccaed9 37#include <babeltrace/assert-internal.h>
da91b29a 38#include <babeltrace/common-internal.h>
0fbb9a9f 39#include <stdlib.h>
282c8cd0 40#include <string.h>
958f7d11 41
c3acd5f3 42#define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes"
65ee897d 43
958f7d11 44struct muxer_comp {
90157d89
PP
45 /*
46 * Array of struct
47 * bt_private_connection_private_notification_iterator *
48 * (weak refs)
49 */
958f7d11
PP
50 GPtrArray *muxer_notif_iters;
51
52 /* Weak ref */
53 struct bt_private_component *priv_comp;
54 unsigned int next_port_num;
55 size_t available_input_ports;
a09c6b95 56 bool initializing_muxer_notif_iter;
282c8cd0 57 bool assume_absolute_clock_classes;
958f7d11
PP
58};
59
60struct muxer_upstream_notif_iter {
ab11110e 61 /* Owned by this, NULL if ended */
958f7d11
PP
62 struct bt_notification_iterator *notif_iter;
63
d4393e08
PP
64 /* Contains `struct bt_notification *`, owned by this */
65 GQueue *notifs;
958f7d11
PP
66};
67
282c8cd0
PP
68enum muxer_notif_iter_clock_class_expectation {
69 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,
70 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE,
71 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID,
72 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID,
73};
74
958f7d11 75struct muxer_notif_iter {
ab11110e
PP
76 /*
77 * Array of struct muxer_upstream_notif_iter * (owned by this).
78 *
79 * NOTE: This array is searched in linearly to find the youngest
80 * current notification. Keep this until benchmarks confirm that
81 * another data structure is faster than this for our typical
82 * use cases.
83 */
958f7d11
PP
84 GPtrArray *muxer_upstream_notif_iters;
85
ab11110e 86 /*
06a2cb0d 87 * List of "recently" connected input ports (weak) to
ab11110e
PP
88 * handle by this muxer notification iterator.
89 * muxer_port_connected() adds entries to this list, and the
90 * entries are removed when a notification iterator is created
91 * on the port's connection and put into
92 * muxer_upstream_notif_iters above by
93 * muxer_notif_iter_handle_newly_connected_ports().
94 */
95 GList *newly_connected_priv_ports;
958f7d11 96
958f7d11
PP
97 /* Last time returned in a notification */
98 int64_t last_returned_ts_ns;
282c8cd0
PP
99
100 /* Clock class expectation state */
101 enum muxer_notif_iter_clock_class_expectation clock_class_expectation;
102
103 /*
104 * Expected clock class UUID, only valid when
105 * clock_class_expectation is
106 * MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID.
107 */
108 unsigned char expected_clock_class_uuid[BABELTRACE_UUID_LEN];
958f7d11
PP
109};
110
ab11110e
PP
111static
112void destroy_muxer_upstream_notif_iter(
113 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
114{
115 if (!muxer_upstream_notif_iter) {
116 return;
117 }
118
fed72692 119 BT_LOGD("Destroying muxer's upstream notification iterator wrapper: "
d4393e08 120 "addr=%p, notif-iter-addr=%p, queue-len=%u",
fed72692
PP
121 muxer_upstream_notif_iter,
122 muxer_upstream_notif_iter->notif_iter,
d4393e08 123 muxer_upstream_notif_iter->notifs->length);
65300d60 124 bt_object_put_ref(muxer_upstream_notif_iter->notif_iter);
d4393e08
PP
125
126 if (muxer_upstream_notif_iter->notifs) {
127 struct bt_notification *notif;
128
129 while ((notif = g_queue_pop_head(
130 muxer_upstream_notif_iter->notifs))) {
65300d60 131 bt_object_put_ref(notif);
d4393e08
PP
132 }
133
134 g_queue_free(muxer_upstream_notif_iter->notifs);
135 }
136
ab11110e
PP
137 g_free(muxer_upstream_notif_iter);
138}
139
958f7d11
PP
140static
141struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
142 struct muxer_notif_iter *muxer_notif_iter,
143 struct bt_notification_iterator *notif_iter,
144 struct bt_private_port *priv_port)
145{
146 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
147 g_new0(struct muxer_upstream_notif_iter, 1);
148
149 if (!muxer_upstream_notif_iter) {
fed72692 150 BT_LOGE_STR("Failed to allocate one muxer's upstream notification iterator wrapper.");
958f7d11
PP
151 goto end;
152 }
153
65300d60 154 muxer_upstream_notif_iter->notif_iter = bt_object_get_ref(notif_iter);
d4393e08
PP
155 muxer_upstream_notif_iter->notifs = g_queue_new();
156 if (!muxer_upstream_notif_iter->notifs) {
157 BT_LOGE_STR("Failed to allocate a GQueue.");
158
159 goto end;
160 }
161
958f7d11
PP
162 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
163 muxer_upstream_notif_iter);
fed72692
PP
164 BT_LOGD("Added muxer's upstream notification iterator wrapper: "
165 "addr=%p, muxer-notif-iter-addr=%p, notif-iter-addr=%p",
166 muxer_upstream_notif_iter, muxer_notif_iter,
167 notif_iter);
958f7d11
PP
168
169end:
170 return muxer_upstream_notif_iter;
171}
172
958f7d11 173static
147337a3
PP
174enum bt_component_status ensure_available_input_port(
175 struct bt_private_component *priv_comp)
958f7d11
PP
176{
177 struct muxer_comp *muxer_comp =
178 bt_private_component_get_user_data(priv_comp);
147337a3 179 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
958f7d11 180 GString *port_name = NULL;
958f7d11 181
f6ccaed9 182 BT_ASSERT(muxer_comp);
958f7d11
PP
183
184 if (muxer_comp->available_input_ports >= 1) {
185 goto end;
186 }
187
188 port_name = g_string_new("in");
189 if (!port_name) {
fed72692 190 BT_LOGE_STR("Failed to allocate a GString.");
147337a3 191 status = BT_COMPONENT_STATUS_NOMEM;
958f7d11
PP
192 goto end;
193 }
194
195 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
147337a3
PP
196 status = bt_private_component_filter_add_input_private_port(
197 priv_comp, port_name->str, NULL, NULL);
198 if (status != BT_COMPONENT_STATUS_OK) {
fed72692
PP
199 BT_LOGE("Cannot add input port to muxer component: "
200 "port-name=\"%s\", comp-addr=%p, status=%s",
201 port_name->str, priv_comp,
202 bt_component_status_string(status));
958f7d11
PP
203 goto end;
204 }
205
206 muxer_comp->available_input_ports++;
207 muxer_comp->next_port_num++;
fed72692
PP
208 BT_LOGD("Added one input port to muxer component: "
209 "port-name=\"%s\", comp-addr=%p",
210 port_name->str, priv_comp);
958f7d11
PP
211end:
212 if (port_name) {
213 g_string_free(port_name, TRUE);
214 }
215
147337a3 216 return status;
958f7d11
PP
217}
218
958f7d11 219static
147337a3
PP
220enum bt_component_status create_output_port(
221 struct bt_private_component *priv_comp)
958f7d11 222{
147337a3
PP
223 return bt_private_component_filter_add_output_private_port(
224 priv_comp, "out", NULL, NULL);
958f7d11
PP
225}
226
227static
228void destroy_muxer_comp(struct muxer_comp *muxer_comp)
229{
230 if (!muxer_comp) {
231 return;
232 }
233
fed72692
PP
234 BT_LOGD("Destroying muxer component: muxer-comp-addr=%p, "
235 "muxer-notif-iter-count=%u", muxer_comp,
af18c5e7
JG
236 muxer_comp->muxer_notif_iters ?
237 muxer_comp->muxer_notif_iters->len : 0);
fed72692 238
958f7d11
PP
239 if (muxer_comp->muxer_notif_iters) {
240 g_ptr_array_free(muxer_comp->muxer_notif_iters, TRUE);
241 }
242
243 g_free(muxer_comp);
244}
245
65ee897d 246static
da91b29a 247struct bt_private_value *get_default_params(void)
65ee897d 248{
da91b29a 249 struct bt_private_value *params;
65ee897d
PP
250 int ret;
251
da91b29a 252 params = bt_private_value_map_create();
65ee897d 253 if (!params) {
fed72692 254 BT_LOGE_STR("Cannot create a map value object.");
65ee897d
PP
255 goto error;
256 }
257
da91b29a 258 ret = bt_private_value_map_insert_bool_entry(params,
c3acd5f3 259 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, false);
65ee897d 260 if (ret) {
fed72692 261 BT_LOGE_STR("Cannot add boolean value to map value object.");
65ee897d
PP
262 goto error;
263 }
264
265 goto end;
266
267error:
65300d60 268 BT_OBJECT_PUT_REF_AND_RESET(params);
65ee897d
PP
269
270end:
271 return params;
272}
273
274static
275int configure_muxer_comp(struct muxer_comp *muxer_comp, struct bt_value *params)
276{
da91b29a
PP
277 struct bt_private_value *default_params = NULL;
278 struct bt_private_value *real_params = NULL;
282c8cd0 279 struct bt_value *assume_absolute_clock_classes = NULL;
65ee897d 280 int ret = 0;
c55a9f58 281 bt_bool bool_val;
65ee897d
PP
282
283 default_params = get_default_params();
284 if (!default_params) {
fed72692
PP
285 BT_LOGE("Cannot get default parameters: "
286 "muxer-comp-addr=%p", muxer_comp);
65ee897d
PP
287 goto error;
288 }
289
da91b29a
PP
290 real_params = bt_value_map_extend(
291 bt_value_borrow_from_private(default_params), params);
65ee897d 292 if (!real_params) {
fed72692
PP
293 BT_LOGE("Cannot extend default parameters map value: "
294 "muxer-comp-addr=%p, def-params-addr=%p, "
295 "params-addr=%p", muxer_comp, default_params,
296 params);
65ee897d
PP
297 goto error;
298 }
299
da91b29a
PP
300 assume_absolute_clock_classes = bt_value_map_borrow_entry_value(
301 bt_value_borrow_from_private(real_params),
c3acd5f3 302 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME);
f6ccaed9
PP
303 if (assume_absolute_clock_classes &&
304 !bt_value_is_bool(assume_absolute_clock_classes)) {
fed72692
PP
305 BT_LOGE("Expecting a boolean value for the `%s` parameter: "
306 "muxer-comp-addr=%p, value-type=%s",
307 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, muxer_comp,
da91b29a 308 bt_common_value_type_string(
fed72692 309 bt_value_get_type(assume_absolute_clock_classes)));
65ee897d
PP
310 goto error;
311 }
312
fed72692 313 ret = bt_value_bool_get(assume_absolute_clock_classes, &bool_val);
f6ccaed9 314 BT_ASSERT(ret == 0);
282c8cd0 315 muxer_comp->assume_absolute_clock_classes = (bool) bool_val;
fed72692
PP
316 BT_LOGD("Configured muxer component: muxer-comp-addr=%p, "
317 "assume-absolute-clock-classes=%d",
318 muxer_comp, muxer_comp->assume_absolute_clock_classes);
65ee897d
PP
319 goto end;
320
321error:
322 ret = -1;
323
324end:
65300d60
PP
325 bt_object_put_ref(default_params);
326 bt_object_put_ref(real_params);
65ee897d
PP
327 return ret;
328}
329
958f7d11
PP
330BT_HIDDEN
331enum bt_component_status muxer_init(
332 struct bt_private_component *priv_comp,
333 struct bt_value *params, void *init_data)
334{
335 int ret;
336 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
337 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
338
fed72692
PP
339 BT_LOGD("Initializing muxer component: "
340 "comp-addr=%p, params-addr=%p", priv_comp, params);
341
958f7d11 342 if (!muxer_comp) {
fed72692 343 BT_LOGE_STR("Failed to allocate one muxer component.");
958f7d11
PP
344 goto error;
345 }
346
65ee897d
PP
347 ret = configure_muxer_comp(muxer_comp, params);
348 if (ret) {
fed72692
PP
349 BT_LOGE("Cannot configure muxer component: "
350 "muxer-comp-addr=%p, params-addr=%p",
351 muxer_comp, params);
65ee897d
PP
352 goto error;
353 }
354
958f7d11
PP
355 muxer_comp->muxer_notif_iters = g_ptr_array_new();
356 if (!muxer_comp->muxer_notif_iters) {
fed72692 357 BT_LOGE_STR("Failed to allocate a GPtrArray.");
958f7d11
PP
358 goto error;
359 }
360
361 muxer_comp->priv_comp = priv_comp;
362 ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
f6ccaed9 363 BT_ASSERT(ret == 0);
147337a3
PP
364 status = ensure_available_input_port(priv_comp);
365 if (status != BT_COMPONENT_STATUS_OK) {
fed72692
PP
366 BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
367 "muxer-comp-addr=%p, status=%s",
368 muxer_comp,
369 bt_component_status_string(status));
958f7d11
PP
370 goto error;
371 }
372
fed72692
PP
373 status = create_output_port(priv_comp);
374 if (status) {
375 BT_LOGE("Cannot create muxer component's output port: "
376 "muxer-comp-addr=%p, status=%s",
377 muxer_comp,
378 bt_component_status_string(status));
958f7d11
PP
379 goto error;
380 }
381
fed72692
PP
382 BT_LOGD("Initialized muxer component: "
383 "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p",
384 priv_comp, params, muxer_comp);
385
958f7d11
PP
386 goto end;
387
388error:
389 destroy_muxer_comp(muxer_comp);
390 ret = bt_private_component_set_user_data(priv_comp, NULL);
f6ccaed9 391 BT_ASSERT(ret == 0);
147337a3
PP
392
393 if (status == BT_COMPONENT_STATUS_OK) {
394 status = BT_COMPONENT_STATUS_ERROR;
395 }
958f7d11
PP
396
397end:
398 return status;
399}
400
401BT_HIDDEN
402void muxer_finalize(struct bt_private_component *priv_comp)
403{
404 struct muxer_comp *muxer_comp =
405 bt_private_component_get_user_data(priv_comp);
406
fed72692
PP
407 BT_LOGD("Finalizing muxer component: comp-addr=%p",
408 priv_comp);
958f7d11
PP
409 destroy_muxer_comp(muxer_comp);
410}
411
412static
413struct bt_notification_iterator *create_notif_iter_on_input_port(
414 struct bt_private_port *priv_port, int *ret)
415{
5c563278 416 struct bt_port *port = bt_port_borrow_from_private(priv_port);
958f7d11
PP
417 struct bt_notification_iterator *notif_iter = NULL;
418 struct bt_private_connection *priv_conn = NULL;
73d5c1ad 419 enum bt_connection_status conn_status;
958f7d11 420
f6ccaed9 421 BT_ASSERT(ret);
958f7d11 422 *ret = 0;
f6ccaed9
PP
423 BT_ASSERT(port);
424 BT_ASSERT(bt_port_is_connected(port));
958f7d11 425 priv_conn = bt_private_port_get_private_connection(priv_port);
f6ccaed9 426 BT_ASSERT(priv_conn);
958f7d11 427
ab11110e
PP
428 // TODO: Advance the iterator to >= the time of the latest
429 // returned notification by the muxer notification
430 // iterator which creates it.
73d5c1ad 431 conn_status = bt_private_connection_create_notification_iterator(
f42867e2 432 priv_conn, &notif_iter);
73d5c1ad 433 if (conn_status != BT_CONNECTION_STATUS_OK) {
fed72692
PP
434 BT_LOGE("Cannot create upstream notification iterator on input port's connection: "
435 "port-addr=%p, port-name=\"%s\", conn-addr=%p, "
436 "status=%s",
437 port, bt_port_get_name(port), priv_conn,
438 bt_connection_status_string(conn_status));
958f7d11
PP
439 *ret = -1;
440 goto end;
441 }
442
fed72692
PP
443 BT_LOGD("Created upstream notification iterator on input port's connection: "
444 "port-addr=%p, port-name=\"%s\", conn-addr=%p, "
445 "notif-iter-addr=%p",
446 port, bt_port_get_name(port), priv_conn, notif_iter);
447
958f7d11 448end:
65300d60 449 bt_object_put_ref(priv_conn);
958f7d11
PP
450 return notif_iter;
451}
452
ab11110e 453static
089717de 454enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
ab11110e
PP
455 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
456{
089717de 457 enum bt_notification_iterator_status status;
d4393e08
PP
458 bt_notification_array notifs;
459 uint64_t i;
460 uint64_t count;
ab11110e 461
fed72692
PP
462 BT_LOGV("Calling upstream notification iterator's \"next\" method: "
463 "muxer-upstream-notif-iter-wrap-addr=%p, notif-iter-addr=%p",
464 muxer_upstream_notif_iter,
465 muxer_upstream_notif_iter->notif_iter);
07245ac2 466 status = bt_private_connection_notification_iterator_next(
d4393e08 467 muxer_upstream_notif_iter->notif_iter, &notifs, &count);
fed72692
PP
468 BT_LOGV("Upstream notification iterator's \"next\" method returned: "
469 "status=%s", bt_notification_iterator_status_string(status));
ab11110e 470
089717de 471 switch (status) {
ab11110e 472 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
089717de 473 /*
d4393e08
PP
474 * Notification iterator's current notification is
475 * valid: it must be considered for muxing operations.
089717de 476 */
fed72692 477 BT_LOGV_STR("Validated upstream notification iterator wrapper.");
d4393e08
PP
478 BT_ASSERT(count > 0);
479
480 /* Move notifications to our queue */
481 for (i = 0; i < count; i++) {
482 /*
483 * Push to tail in order; other side
484 * (muxer_notif_iter_do_next_one()) consumes
485 * from the head first.
486 */
487 g_queue_push_tail(muxer_upstream_notif_iter->notifs,
488 notifs[i]);
489 }
ab11110e
PP
490 break;
491 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
089717de
PP
492 /*
493 * Notification iterator's current notification is not
494 * valid anymore. Return
d4393e08 495 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN immediately.
089717de 496 */
ab11110e 497 break;
beed0223
MD
498 case BT_NOTIFICATION_ITERATOR_STATUS_END: /* Fall-through. */
499 case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED:
ab11110e
PP
500 /*
501 * Notification iterator reached the end: release it. It
502 * won't be considered again to find the youngest
503 * notification.
504 */
65300d60 505 BT_OBJECT_PUT_REF_AND_RESET(muxer_upstream_notif_iter->notif_iter);
089717de
PP
506 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
507 break;
ab11110e
PP
508 default:
509 /* Error or unsupported status code */
fed72692
PP
510 BT_LOGE("Error or unsupported status code: "
511 "status-code=%d", status);
089717de
PP
512 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
513 break;
ab11110e
PP
514 }
515
089717de 516 return status;
ab11110e
PP
517}
518
519static
089717de 520int muxer_notif_iter_handle_newly_connected_ports(
ab11110e
PP
521 struct muxer_notif_iter *muxer_notif_iter)
522{
ab11110e
PP
523 int ret = 0;
524
fed72692
PP
525 BT_LOGV("Handling newly connected ports: "
526 "muxer-notif-iter-addr=%p", muxer_notif_iter);
527
ab11110e
PP
528 /*
529 * Here we create one upstream notification iterator for each
d4393e08 530 * newly connected port. We do NOT perform an initial "next" on
089717de
PP
531 * those new upstream notification iterators: they are
532 * invalidated, to be validated later. The list of newly
533 * connected ports to handle here is updated by
534 * muxer_port_connected().
ab11110e
PP
535 */
536 while (true) {
537 GList *node = muxer_notif_iter->newly_connected_priv_ports;
538 struct bt_private_port *priv_port;
539 struct bt_port *port;
540 struct bt_notification_iterator *upstream_notif_iter = NULL;
541 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
542
543 if (!node) {
544 break;
545 }
546
547 priv_port = node->data;
5c563278 548 port = bt_port_borrow_from_private(priv_port);
f6ccaed9 549 BT_ASSERT(port);
ab11110e
PP
550
551 if (!bt_port_is_connected(port)) {
552 /*
553 * Looks like this port is not connected
554 * anymore: we can't create an upstream
089717de
PP
555 * notification iterator on its (non-existing)
556 * connection in this case.
ab11110e
PP
557 */
558 goto remove_node;
559 }
560
ab11110e
PP
561 upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
562 &ret);
563 if (ret) {
fed72692 564 /* create_notif_iter_on_input_port() logs errors */
f6ccaed9 565 BT_ASSERT(!upstream_notif_iter);
ab11110e
PP
566 goto error;
567 }
568
569 muxer_upstream_notif_iter =
570 muxer_notif_iter_add_upstream_notif_iter(
571 muxer_notif_iter, upstream_notif_iter,
572 priv_port);
65300d60 573 BT_OBJECT_PUT_REF_AND_RESET(upstream_notif_iter);
ab11110e 574 if (!muxer_upstream_notif_iter) {
fed72692
PP
575 /*
576 * muxer_notif_iter_add_upstream_notif_iter()
577 * logs errors.
578 */
ab11110e
PP
579 goto error;
580 }
581
ab11110e 582remove_node:
65300d60 583 bt_object_put_ref(upstream_notif_iter);
ab11110e
PP
584 muxer_notif_iter->newly_connected_priv_ports =
585 g_list_delete_link(
586 muxer_notif_iter->newly_connected_priv_ports,
587 node);
588 }
589
590 goto end;
591
592error:
089717de 593 if (ret >= 0) {
ab11110e
PP
594 ret = -1;
595 }
596
597end:
ab11110e
PP
598 return ret;
599}
600
958f7d11
PP
601static
602int get_notif_ts_ns(struct muxer_comp *muxer_comp,
282c8cd0 603 struct muxer_notif_iter *muxer_notif_iter,
958f7d11
PP
604 struct bt_notification *notif, int64_t last_returned_ts_ns,
605 int64_t *ts_ns)
606{
50842bdc
PP
607 struct bt_clock_class *clock_class = NULL;
608 struct bt_clock_value *clock_value = NULL;
609 struct bt_event *event = NULL;
958f7d11 610 int ret = 0;
282c8cd0 611 const unsigned char *cc_uuid;
fed72692 612 const char *cc_name;
44c440bc 613 enum bt_clock_value_status cv_status = BT_CLOCK_VALUE_STATUS_KNOWN;
958f7d11 614
f6ccaed9
PP
615 BT_ASSERT(notif);
616 BT_ASSERT(ts_ns);
958f7d11 617
fed72692
PP
618 BT_LOGV("Getting notification's timestamp: "
619 "muxer-notif-iter-addr=%p, notif-addr=%p, "
620 "last-returned-ts=%" PRId64,
621 muxer_notif_iter, notif, last_returned_ts_ns);
622
958f7d11
PP
623 switch (bt_notification_get_type(notif)) {
624 case BT_NOTIFICATION_TYPE_EVENT:
e22b45d0
PP
625 event = bt_notification_event_borrow_event(notif);
626 BT_ASSERT(event);
44c440bc
PP
627 cv_status = bt_event_borrow_default_clock_value(event,
628 &clock_value);
958f7d11
PP
629 break;
630
631 case BT_NOTIFICATION_TYPE_INACTIVITY:
e22b45d0
PP
632 clock_value =
633 bt_notification_inactivity_borrow_default_clock_value(
958f7d11
PP
634 notif);
635 break;
636 default:
089717de 637 /* All the other notifications have a higher priority */
fed72692 638 BT_LOGV_STR("Notification has no timestamp: using the last returned timestamp.");
958f7d11
PP
639 *ts_ns = last_returned_ts_ns;
640 goto end;
641 }
642
44c440bc
PP
643 if (cv_status != BT_CLOCK_VALUE_STATUS_KNOWN) {
644 BT_LOGE_STR("Unsupported unknown clock value.");
645 ret = -1;
646 goto end;
647 }
648
958f7d11 649 /*
e22b45d0
PP
650 * If the clock value is missing, then we consider that this
651 * notification has no time. In this case it's always the
652 * youngest.
958f7d11 653 */
e22b45d0
PP
654 if (!clock_value) {
655 BT_LOGV_STR("Notification's default clock value is missing: "
fed72692 656 "using the last returned timestamp.");
958f7d11
PP
657 *ts_ns = last_returned_ts_ns;
658 goto end;
659 }
660
44c440bc 661 clock_class = bt_clock_value_borrow_clock_class(clock_value);
e22b45d0 662 BT_ASSERT(clock_class);
50842bdc
PP
663 cc_uuid = bt_clock_class_get_uuid(clock_class);
664 cc_name = bt_clock_class_get_name(clock_class);
282c8cd0
PP
665
666 if (muxer_notif_iter->clock_class_expectation ==
667 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
668 /*
669 * This is the first clock class that this muxer
670 * notification iterator encounters. Its properties
671 * determine what to expect for the whole lifetime of
672 * the iterator without a true
673 * `assume-absolute-clock-classes` parameter.
674 */
50842bdc 675 if (bt_clock_class_is_absolute(clock_class)) {
282c8cd0
PP
676 /* Expect absolute clock classes */
677 muxer_notif_iter->clock_class_expectation =
678 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE;
679 } else {
680 if (cc_uuid) {
681 /*
682 * Expect non-absolute clock classes
683 * with a specific UUID.
684 */
685 muxer_notif_iter->clock_class_expectation =
686 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID;
687 memcpy(muxer_notif_iter->expected_clock_class_uuid,
688 cc_uuid, BABELTRACE_UUID_LEN);
689 } else {
690 /*
691 * Expect non-absolute clock classes
692 * with no UUID.
693 */
694 muxer_notif_iter->clock_class_expectation =
695 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID;
696 }
697 }
698 }
699
700 if (!muxer_comp->assume_absolute_clock_classes) {
701 switch (muxer_notif_iter->clock_class_expectation) {
702 case MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE:
50842bdc 703 if (!bt_clock_class_is_absolute(clock_class)) {
fed72692
PP
704 BT_LOGE("Expecting an absolute clock class, "
705 "but got a non-absolute one: "
706 "clock-class-addr=%p, clock-class-name=\"%s\"",
707 clock_class, cc_name);
282c8cd0
PP
708 goto error;
709 }
710 break;
711 case MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID:
50842bdc 712 if (bt_clock_class_is_absolute(clock_class)) {
fed72692
PP
713 BT_LOGE("Expecting a non-absolute clock class with no UUID, "
714 "but got an absolute one: "
715 "clock-class-addr=%p, clock-class-name=\"%s\"",
716 clock_class, cc_name);
282c8cd0
PP
717 goto error;
718 }
719
720 if (cc_uuid) {
fed72692
PP
721 BT_LOGE("Expecting a non-absolute clock class with no UUID, "
722 "but got one with a UUID: "
723 "clock-class-addr=%p, clock-class-name=\"%s\", "
724 "uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\"",
725 clock_class, cc_name,
726 (unsigned int) cc_uuid[0],
727 (unsigned int) cc_uuid[1],
728 (unsigned int) cc_uuid[2],
729 (unsigned int) cc_uuid[3],
730 (unsigned int) cc_uuid[4],
731 (unsigned int) cc_uuid[5],
732 (unsigned int) cc_uuid[6],
733 (unsigned int) cc_uuid[7],
734 (unsigned int) cc_uuid[8],
735 (unsigned int) cc_uuid[9],
736 (unsigned int) cc_uuid[10],
737 (unsigned int) cc_uuid[11],
738 (unsigned int) cc_uuid[12],
739 (unsigned int) cc_uuid[13],
740 (unsigned int) cc_uuid[14],
741 (unsigned int) cc_uuid[15]);
282c8cd0
PP
742 goto error;
743 }
744 break;
745 case MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID:
50842bdc 746 if (bt_clock_class_is_absolute(clock_class)) {
fed72692
PP
747 BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
748 "but got an absolute one: "
749 "clock-class-addr=%p, clock-class-name=\"%s\"",
750 clock_class, cc_name);
282c8cd0
PP
751 goto error;
752 }
753
754 if (!cc_uuid) {
fed72692
PP
755 BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
756 "but got one with no UUID: "
757 "clock-class-addr=%p, clock-class-name=\"%s\"",
758 clock_class, cc_name);
282c8cd0
PP
759 goto error;
760 }
761
762 if (memcmp(muxer_notif_iter->expected_clock_class_uuid,
763 cc_uuid, BABELTRACE_UUID_LEN) != 0) {
fed72692
PP
764 BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
765 "but got one with different UUID: "
766 "clock-class-addr=%p, clock-class-name=\"%s\", "
767 "expected-uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\", "
768 "uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\"",
769 clock_class, cc_name,
770 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[0],
771 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[1],
772 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[2],
773 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[3],
774 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[4],
775 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[5],
776 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[6],
777 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[7],
778 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[8],
779 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[9],
780 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[10],
781 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[11],
782 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[12],
783 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[13],
784 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[14],
785 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[15],
786 (unsigned int) cc_uuid[0],
787 (unsigned int) cc_uuid[1],
788 (unsigned int) cc_uuid[2],
789 (unsigned int) cc_uuid[3],
790 (unsigned int) cc_uuid[4],
791 (unsigned int) cc_uuid[5],
792 (unsigned int) cc_uuid[6],
793 (unsigned int) cc_uuid[7],
794 (unsigned int) cc_uuid[8],
795 (unsigned int) cc_uuid[9],
796 (unsigned int) cc_uuid[10],
797 (unsigned int) cc_uuid[11],
798 (unsigned int) cc_uuid[12],
799 (unsigned int) cc_uuid[13],
800 (unsigned int) cc_uuid[14],
801 (unsigned int) cc_uuid[15]);
282c8cd0
PP
802 goto error;
803 }
804 break;
805 default:
806 /* Unexpected */
fed72692
PP
807 BT_LOGF("Unexpected clock class expectation: "
808 "expectation-code=%d",
809 muxer_notif_iter->clock_class_expectation);
282c8cd0
PP
810 abort();
811 }
958f7d11
PP
812 }
813
44c440bc 814 ret = bt_clock_value_get_ns_from_origin(clock_value, ts_ns);
958f7d11 815 if (ret) {
fed72692
PP
816 BT_LOGE("Cannot get nanoseconds from Epoch of clock value: "
817 "clock-value-addr=%p", clock_value);
958f7d11
PP
818 goto error;
819 }
820
821 goto end;
822
823error:
824 ret = -1;
825
826end:
fed72692
PP
827 if (ret == 0) {
828 BT_LOGV("Found notification's timestamp: "
829 "muxer-notif-iter-addr=%p, notif-addr=%p, "
830 "last-returned-ts=%" PRId64 ", ts=%" PRId64,
831 muxer_notif_iter, notif, last_returned_ts_ns,
832 *ts_ns);
833 }
834
958f7d11
PP
835 return ret;
836}
837
ab11110e
PP
838/*
839 * This function finds the youngest available notification amongst the
840 * non-ended upstream notification iterators and returns the upstream
841 * notification iterator which has it, or
842 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
843 * notification.
844 *
845 * This function does NOT:
846 *
847 * * Update any upstream notification iterator.
848 * * Check for newly connected ports.
849 * * Check the upstream notification iterators to retry.
850 *
851 * On sucess, this function sets *muxer_upstream_notif_iter to the
852 * upstream notification iterator of which the current notification is
853 * the youngest, and sets *ts_ns to its time.
854 */
958f7d11
PP
855static
856enum bt_notification_iterator_status
857muxer_notif_iter_youngest_upstream_notif_iter(
858 struct muxer_comp *muxer_comp,
859 struct muxer_notif_iter *muxer_notif_iter,
860 struct muxer_upstream_notif_iter **muxer_upstream_notif_iter,
861 int64_t *ts_ns)
862{
863 size_t i;
864 int ret;
865 int64_t youngest_ts_ns = INT64_MAX;
866 enum bt_notification_iterator_status status =
867 BT_NOTIFICATION_ITERATOR_STATUS_OK;
868
f6ccaed9
PP
869 BT_ASSERT(muxer_comp);
870 BT_ASSERT(muxer_notif_iter);
871 BT_ASSERT(muxer_upstream_notif_iter);
958f7d11
PP
872 *muxer_upstream_notif_iter = NULL;
873
874 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
875 struct bt_notification *notif;
876 struct muxer_upstream_notif_iter *cur_muxer_upstream_notif_iter =
877 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters, i);
878 int64_t notif_ts_ns;
879
880 if (!cur_muxer_upstream_notif_iter->notif_iter) {
ab11110e 881 /* This upstream notification iterator is ended */
fed72692
PP
882 BT_LOGV("Skipping ended upstream notification iterator: "
883 "muxer-upstream-notif-iter-wrap-addr=%p",
884 cur_muxer_upstream_notif_iter);
958f7d11
PP
885 continue;
886 }
887
d4393e08
PP
888 BT_ASSERT(cur_muxer_upstream_notif_iter->notifs->length > 0);
889 notif = g_queue_peek_head(cur_muxer_upstream_notif_iter->notifs);
f6ccaed9 890 BT_ASSERT(notif);
282c8cd0 891 ret = get_notif_ts_ns(muxer_comp, muxer_notif_iter, notif,
958f7d11 892 muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
958f7d11 893 if (ret) {
fed72692 894 /* get_notif_ts_ns() logs errors */
958f7d11
PP
895 *muxer_upstream_notif_iter = NULL;
896 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
897 goto end;
898 }
899
900 if (notif_ts_ns <= youngest_ts_ns) {
901 *muxer_upstream_notif_iter =
902 cur_muxer_upstream_notif_iter;
903 youngest_ts_ns = notif_ts_ns;
904 *ts_ns = youngest_ts_ns;
905 }
906 }
907
908 if (!*muxer_upstream_notif_iter) {
909 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
910 *ts_ns = INT64_MIN;
911 }
912
913end:
914 return status;
915}
916
917static
089717de
PP
918enum bt_notification_iterator_status validate_muxer_upstream_notif_iter(
919 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
958f7d11 920{
089717de
PP
921 enum bt_notification_iterator_status status =
922 BT_NOTIFICATION_ITERATOR_STATUS_OK;
958f7d11 923
fed72692
PP
924 BT_LOGV("Validating muxer's upstream notification iterator wrapper: "
925 "muxer-upstream-notif-iter-wrap-addr=%p",
926 muxer_upstream_notif_iter);
927
d4393e08 928 if (muxer_upstream_notif_iter->notifs->length > 0 ||
089717de 929 !muxer_upstream_notif_iter->notif_iter) {
d4393e08
PP
930 BT_LOGV("Already valid or not considered: "
931 "queue-len=%u, upstream-notif-iter-addr=%p",
932 muxer_upstream_notif_iter->notifs->length,
933 muxer_upstream_notif_iter->notif_iter);
ab11110e
PP
934 goto end;
935 }
936
fed72692 937 /* muxer_upstream_notif_iter_next() logs details/errors */
089717de
PP
938 status = muxer_upstream_notif_iter_next(muxer_upstream_notif_iter);
939
940end:
941 return status;
942}
943
944static
945enum bt_notification_iterator_status validate_muxer_upstream_notif_iters(
d4393e08 946 struct muxer_notif_iter *muxer_notif_iter)
089717de
PP
947{
948 enum bt_notification_iterator_status status =
949 BT_NOTIFICATION_ITERATOR_STATUS_OK;
950 size_t i;
951
fed72692
PP
952 BT_LOGV("Validating muxer's upstream notification iterator wrappers: "
953 "muxer-notif-iter-addr=%p", muxer_notif_iter);
954
089717de
PP
955 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
956 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
957 g_ptr_array_index(
958 muxer_notif_iter->muxer_upstream_notif_iters,
959 i);
960
961 status = validate_muxer_upstream_notif_iter(
962 muxer_upstream_notif_iter);
963 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
fed72692
PP
964 if (status < 0) {
965 BT_LOGE("Cannot validate muxer's upstream notification iterator wrapper: "
966 "muxer-notif-iter-addr=%p, "
967 "muxer-upstream-notif-iter-wrap-addr=%p",
968 muxer_notif_iter,
969 muxer_upstream_notif_iter);
970 } else {
971 BT_LOGV("Cannot validate muxer's upstream notification iterator wrapper: "
972 "muxer-notif-iter-addr=%p, "
973 "muxer-upstream-notif-iter-wrap-addr=%p",
974 muxer_notif_iter,
975 muxer_upstream_notif_iter);
976 }
977
089717de
PP
978 goto end;
979 }
744ba28b
PP
980
981 /*
982 * Remove this muxer upstream notification iterator
983 * if it's ended or canceled.
984 */
985 if (!muxer_upstream_notif_iter->notif_iter) {
986 /*
987 * Use g_ptr_array_remove_fast() because the
988 * order of those elements is not important.
989 */
fed72692
PP
990 BT_LOGV("Removing muxer's upstream notification iterator wrapper: ended or canceled: "
991 "muxer-notif-iter-addr=%p, "
992 "muxer-upstream-notif-iter-wrap-addr=%p",
993 muxer_notif_iter, muxer_upstream_notif_iter);
744ba28b
PP
994 g_ptr_array_remove_index_fast(
995 muxer_notif_iter->muxer_upstream_notif_iters,
996 i);
997 i--;
998 }
089717de
PP
999 }
1000
1001end:
1002 return status;
1003}
1004
d4393e08
PP
1005static inline
1006enum bt_notification_iterator_status muxer_notif_iter_do_next_one(
089717de 1007 struct muxer_comp *muxer_comp,
d4393e08
PP
1008 struct muxer_notif_iter *muxer_notif_iter,
1009 struct bt_notification **notif)
089717de 1010{
d4393e08
PP
1011 enum bt_notification_iterator_status status =
1012 BT_NOTIFICATION_ITERATOR_STATUS_OK;
089717de 1013 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter = NULL;
089717de
PP
1014 int64_t next_return_ts;
1015
1016 while (true) {
1017 int ret = muxer_notif_iter_handle_newly_connected_ports(
1018 muxer_notif_iter);
1019
1020 if (ret) {
fed72692
PP
1021 BT_LOGE("Cannot handle newly connected input ports for muxer's notification iterator: "
1022 "muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1023 "ret=%d",
1024 muxer_comp, muxer_notif_iter, ret);
d4393e08 1025 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
089717de
PP
1026 goto end;
1027 }
1028
d4393e08
PP
1029 status = validate_muxer_upstream_notif_iters(muxer_notif_iter);
1030 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
3823b499 1031 /* validate_muxer_upstream_notif_iters() logs details */
089717de
PP
1032 goto end;
1033 }
ab11110e 1034
958f7d11 1035 /*
089717de
PP
1036 * At this point, we know that all the existing upstream
1037 * notification iterators are valid. However the
1038 * operations to validate them (during
1039 * validate_muxer_upstream_notif_iters()) may have
1040 * connected new ports. If no ports were connected
1041 * during this operation, exit the loop.
958f7d11 1042 */
089717de 1043 if (!muxer_notif_iter->newly_connected_priv_ports) {
fed72692
PP
1044 BT_LOGV("Not breaking this loop: muxer's notification iterator still has newly connected input ports to handle: "
1045 "muxer-comp-addr=%p", muxer_comp);
089717de
PP
1046 break;
1047 }
958f7d11
PP
1048 }
1049
f6ccaed9 1050 BT_ASSERT(!muxer_notif_iter->newly_connected_priv_ports);
089717de 1051
958f7d11 1052 /*
089717de
PP
1053 * At this point we know that all the existing upstream
1054 * notification iterators are valid. We can find the one,
1055 * amongst those, of which the current notification is the
1056 * youngest.
958f7d11 1057 */
d4393e08 1058 status = muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
958f7d11 1059 muxer_notif_iter, &muxer_upstream_notif_iter,
089717de 1060 &next_return_ts);
d4393e08
PP
1061 if (status < 0 || status == BT_NOTIFICATION_ITERATOR_STATUS_END ||
1062 status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) {
1063 if (status < 0) {
fed72692
PP
1064 BT_LOGE("Cannot find the youngest upstream notification iterator wrapper: "
1065 "status=%s",
d4393e08 1066 bt_notification_iterator_status_string(status));
fed72692
PP
1067 } else {
1068 BT_LOGV("Cannot find the youngest upstream notification iterator wrapper: "
1069 "status=%s",
d4393e08 1070 bt_notification_iterator_status_string(status));
fed72692
PP
1071 }
1072
958f7d11
PP
1073 goto end;
1074 }
1075
089717de 1076 if (next_return_ts < muxer_notif_iter->last_returned_ts_ns) {
fed72692
PP
1077 BT_LOGE("Youngest upstream notification iterator wrapper's timestamp is less than muxer's notification iterator's last returned timestamp: "
1078 "muxer-notif-iter-addr=%p, ts=%" PRId64 ", "
1079 "last-returned-ts=%" PRId64,
1080 muxer_notif_iter, next_return_ts,
1081 muxer_notif_iter->last_returned_ts_ns);
d4393e08 1082 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
958f7d11
PP
1083 goto end;
1084 }
1085
fed72692
PP
1086 BT_LOGV("Found youngest upstream notification iterator wrapper: "
1087 "muxer-notif-iter-addr=%p, "
1088 "muxer-upstream-notif-iter-wrap-addr=%p, "
1089 "ts=%" PRId64,
1090 muxer_notif_iter, muxer_upstream_notif_iter, next_return_ts);
d4393e08 1091 BT_ASSERT(status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
f6ccaed9 1092 BT_ASSERT(muxer_upstream_notif_iter);
958f7d11
PP
1093
1094 /*
d4393e08
PP
1095 * Consume from the queue's head: other side
1096 * (muxer_upstream_notif_iter_next()) writes to the tail.
958f7d11 1097 */
d4393e08
PP
1098 *notif = g_queue_pop_head(muxer_upstream_notif_iter->notifs);
1099 BT_ASSERT(*notif);
089717de 1100 muxer_notif_iter->last_returned_ts_ns = next_return_ts;
958f7d11
PP
1101
1102end:
d4393e08
PP
1103 return status;
1104}
1105
1106static
1107enum bt_notification_iterator_status muxer_notif_iter_do_next(
1108 struct muxer_comp *muxer_comp,
1109 struct muxer_notif_iter *muxer_notif_iter,
1110 bt_notification_array notifs, uint64_t capacity,
1111 uint64_t *count)
1112{
1113 enum bt_notification_iterator_status status =
1114 BT_NOTIFICATION_ITERATOR_STATUS_OK;
1115 uint64_t i = 0;
1116
1117 while (i < capacity && status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
1118 status = muxer_notif_iter_do_next_one(muxer_comp,
1119 muxer_notif_iter, &notifs[i]);
1120 if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
1121 i++;
1122 }
1123 }
1124
1125 if (i > 0) {
1126 /*
1127 * Even if muxer_notif_iter_do_next_one() returned
1128 * something else than
1129 * BT_NOTIFICATION_ITERATOR_STATUS_OK, we accumulated
1130 * notification objects in the output notification
1131 * array, so we need to return
1132 * BT_NOTIFICATION_ITERATOR_STATUS_OK so that they are
1133 * transfered to downstream. This other status occurs
1134 * again the next time muxer_notif_iter_do_next() is
1135 * called, possibly without any accumulated
1136 * notification, in which case we'll return it.
1137 */
1138 *count = i;
1139 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
1140 }
1141
1142 return status;
958f7d11
PP
1143}
1144
1145static
ab11110e
PP
1146void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
1147{
ab11110e
PP
1148 if (!muxer_notif_iter) {
1149 return;
1150 }
1151
fed72692
PP
1152 BT_LOGD("Destroying muxer component's notification iterator: "
1153 "muxer-notif-iter-addr=%p", muxer_notif_iter);
1154
ab11110e 1155 if (muxer_notif_iter->muxer_upstream_notif_iters) {
fed72692 1156 BT_LOGD_STR("Destroying muxer's upstream notification iterator wrappers.");
ab11110e
PP
1157 g_ptr_array_free(
1158 muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
1159 }
1160
ab11110e
PP
1161 g_list_free(muxer_notif_iter->newly_connected_priv_ports);
1162 g_free(muxer_notif_iter);
1163}
1164
1165static
1166int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
958f7d11
PP
1167 struct muxer_notif_iter *muxer_notif_iter)
1168{
ab11110e 1169 struct bt_component *comp;
544d0515
PP
1170 int64_t count;
1171 int64_t i;
ab11110e 1172 int ret = 0;
958f7d11 1173
ab11110e
PP
1174 /*
1175 * Add the connected input ports to this muxer notification
1176 * iterator's list of newly connected ports. They will be
1177 * handled by muxer_notif_iter_handle_newly_connected_ports().
1178 */
5c563278 1179 comp = bt_component_borrow_from_private(muxer_comp->priv_comp);
f6ccaed9 1180 BT_ASSERT(comp);
544d0515
PP
1181 count = bt_component_filter_get_input_port_count(comp);
1182 if (count < 0) {
fed72692
PP
1183 BT_LOGD("No input port to initialize for muxer component's notification iterator: "
1184 "muxer-comp-addr=%p, muxer-notif-iter-addr=%p",
1185 muxer_comp, muxer_notif_iter);
ab11110e
PP
1186 goto end;
1187 }
958f7d11
PP
1188
1189 for (i = 0; i < count; i++) {
1190 struct bt_private_port *priv_port =
9ac68eb1 1191 bt_private_component_filter_get_input_private_port_by_index(
958f7d11
PP
1192 muxer_comp->priv_comp, i);
1193 struct bt_port *port;
958f7d11 1194
f6ccaed9 1195 BT_ASSERT(priv_port);
5c563278 1196 port = bt_port_borrow_from_private(priv_port);
f6ccaed9 1197 BT_ASSERT(port);
958f7d11
PP
1198
1199 if (!bt_port_is_connected(port)) {
fed72692
PP
1200 BT_LOGD("Skipping input port: not connected: "
1201 "muxer-comp-addr=%p, port-addr=%p, port-name\"%s\"",
1202 muxer_comp, port, bt_port_get_name(port));
65300d60 1203 bt_object_put_ref(priv_port);
958f7d11
PP
1204 continue;
1205 }
1206
65300d60 1207 bt_object_put_ref(priv_port);
ab11110e
PP
1208 muxer_notif_iter->newly_connected_priv_ports =
1209 g_list_append(
1210 muxer_notif_iter->newly_connected_priv_ports,
958f7d11 1211 priv_port);
ab11110e 1212 if (!muxer_notif_iter->newly_connected_priv_ports) {
fed72692
PP
1213 BT_LOGE("Cannot append port to muxer's notification iterator list of newly connected input ports: "
1214 "port-addr=%p, port-name=\"%s\", "
1215 "muxer-notif-iter-addr=%p", port,
1216 bt_port_get_name(port), muxer_notif_iter);
ab11110e
PP
1217 ret = -1;
1218 goto end;
958f7d11 1219 }
fed72692
PP
1220
1221 BT_LOGD("Appended port to muxer's notification iterator list of newly connected input ports: "
1222 "port-addr=%p, port-name=\"%s\", "
1223 "muxer-notif-iter-addr=%p", port,
1224 bt_port_get_name(port), muxer_notif_iter);
958f7d11
PP
1225 }
1226
1227end:
958f7d11
PP
1228 return ret;
1229}
1230
958f7d11
PP
1231BT_HIDDEN
1232enum bt_notification_iterator_status muxer_notif_iter_init(
90157d89 1233 struct bt_private_connection_private_notification_iterator *priv_notif_iter,
958f7d11
PP
1234 struct bt_private_port *output_priv_port)
1235{
1236 struct muxer_comp *muxer_comp = NULL;
1237 struct muxer_notif_iter *muxer_notif_iter = NULL;
1238 struct bt_private_component *priv_comp = NULL;
1239 enum bt_notification_iterator_status status =
1240 BT_NOTIFICATION_ITERATOR_STATUS_OK;
1241 int ret;
1242
90157d89 1243 priv_comp = bt_private_connection_private_notification_iterator_get_private_component(
958f7d11 1244 priv_notif_iter);
f6ccaed9 1245 BT_ASSERT(priv_comp);
958f7d11 1246 muxer_comp = bt_private_component_get_user_data(priv_comp);
f6ccaed9 1247 BT_ASSERT(muxer_comp);
fed72692
PP
1248 BT_LOGD("Initializing muxer component's notification iterator: "
1249 "comp-addr=%p, muxer-comp-addr=%p, notif-iter-addr=%p",
1250 priv_comp, muxer_comp, priv_notif_iter);
a09c6b95
PP
1251
1252 if (muxer_comp->initializing_muxer_notif_iter) {
1253 /*
089717de
PP
1254 * Weird, unhandled situation detected: downstream
1255 * creates a muxer notification iterator while creating
1256 * another muxer notification iterator (same component).
a09c6b95 1257 */
fed72692
PP
1258 BT_LOGE("Recursive initialization of muxer component's notification iterator: "
1259 "comp-addr=%p, muxer-comp-addr=%p, notif-iter-addr=%p",
1260 priv_comp, muxer_comp, priv_notif_iter);
958f7d11
PP
1261 goto error;
1262 }
1263
a09c6b95
PP
1264 muxer_comp->initializing_muxer_notif_iter = true;
1265 muxer_notif_iter = g_new0(struct muxer_notif_iter, 1);
1266 if (!muxer_notif_iter) {
fed72692 1267 BT_LOGE_STR("Failed to allocate one muxer component's notification iterator.");
ab11110e
PP
1268 goto error;
1269 }
1270
958f7d11
PP
1271 muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
1272 muxer_notif_iter->muxer_upstream_notif_iters =
1273 g_ptr_array_new_with_free_func(
1274 (GDestroyNotify) destroy_muxer_upstream_notif_iter);
1275 if (!muxer_notif_iter->muxer_upstream_notif_iters) {
fed72692 1276 BT_LOGE_STR("Failed to allocate a GPtrArray.");
958f7d11
PP
1277 goto error;
1278 }
1279
a09c6b95
PP
1280 /*
1281 * Add the muxer notification iterator to the component's array
1282 * of muxer notification iterators here because
1283 * muxer_notif_iter_init_newly_connected_ports() can cause
1284 * muxer_port_connected() to be called, which adds the newly
1285 * connected port to each muxer notification iterator's list of
1286 * newly connected ports.
1287 */
1288 g_ptr_array_add(muxer_comp->muxer_notif_iters, muxer_notif_iter);
1289 ret = muxer_notif_iter_init_newly_connected_ports(muxer_comp,
1290 muxer_notif_iter);
1291 if (ret) {
fed72692
PP
1292 BT_LOGE("Cannot initialize newly connected input ports for muxer component's notification iterator: "
1293 "comp-addr=%p, muxer-comp-addr=%p, "
1294 "muxer-notif-iter-addr=%p, notif-iter-addr=%p, ret=%d",
1295 priv_comp, muxer_comp, muxer_notif_iter,
1296 priv_notif_iter, ret);
a09c6b95
PP
1297 goto error;
1298 }
1299
90157d89 1300 ret = bt_private_connection_private_notification_iterator_set_user_data(priv_notif_iter,
958f7d11 1301 muxer_notif_iter);
f6ccaed9 1302 BT_ASSERT(ret == 0);
fed72692
PP
1303 BT_LOGD("Initialized muxer component's notification iterator: "
1304 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1305 "notif-iter-addr=%p",
1306 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
958f7d11
PP
1307 goto end;
1308
1309error:
a09c6b95
PP
1310 if (g_ptr_array_index(muxer_comp->muxer_notif_iters,
1311 muxer_comp->muxer_notif_iters->len - 1) == muxer_notif_iter) {
1312 g_ptr_array_remove_index(muxer_comp->muxer_notif_iters,
1313 muxer_comp->muxer_notif_iters->len - 1);
1314 }
1315
958f7d11 1316 destroy_muxer_notif_iter(muxer_notif_iter);
90157d89 1317 ret = bt_private_connection_private_notification_iterator_set_user_data(priv_notif_iter,
958f7d11 1318 NULL);
f6ccaed9 1319 BT_ASSERT(ret == 0);
958f7d11
PP
1320 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1321
1322end:
a09c6b95 1323 muxer_comp->initializing_muxer_notif_iter = false;
65300d60 1324 bt_object_put_ref(priv_comp);
958f7d11
PP
1325 return status;
1326}
1327
1328BT_HIDDEN
1329void muxer_notif_iter_finalize(
90157d89 1330 struct bt_private_connection_private_notification_iterator *priv_notif_iter)
958f7d11
PP
1331{
1332 struct muxer_notif_iter *muxer_notif_iter =
90157d89 1333 bt_private_connection_private_notification_iterator_get_user_data(priv_notif_iter);
958f7d11
PP
1334 struct bt_private_component *priv_comp = NULL;
1335 struct muxer_comp *muxer_comp = NULL;
1336
90157d89 1337 priv_comp = bt_private_connection_private_notification_iterator_get_private_component(
958f7d11 1338 priv_notif_iter);
f6ccaed9 1339 BT_ASSERT(priv_comp);
958f7d11 1340 muxer_comp = bt_private_component_get_user_data(priv_comp);
fed72692
PP
1341 BT_LOGD("Finalizing muxer component's notification iterator: "
1342 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1343 "notif-iter-addr=%p",
1344 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
958f7d11
PP
1345
1346 if (muxer_comp) {
1347 (void) g_ptr_array_remove_fast(muxer_comp->muxer_notif_iters,
1348 muxer_notif_iter);
1349 destroy_muxer_notif_iter(muxer_notif_iter);
1350 }
1351
65300d60 1352 bt_object_put_ref(priv_comp);
958f7d11
PP
1353}
1354
1355BT_HIDDEN
d4393e08
PP
1356enum bt_notification_iterator_status muxer_notif_iter_next(
1357 struct bt_private_connection_private_notification_iterator *priv_notif_iter,
1358 bt_notification_array notifs, uint64_t capacity,
1359 uint64_t *count)
958f7d11 1360{
d4393e08 1361 enum bt_notification_iterator_status status;
958f7d11 1362 struct muxer_notif_iter *muxer_notif_iter =
90157d89 1363 bt_private_connection_private_notification_iterator_get_user_data(priv_notif_iter);
958f7d11
PP
1364 struct bt_private_component *priv_comp = NULL;
1365 struct muxer_comp *muxer_comp = NULL;
958f7d11 1366
f6ccaed9 1367 BT_ASSERT(muxer_notif_iter);
90157d89 1368 priv_comp = bt_private_connection_private_notification_iterator_get_private_component(
958f7d11 1369 priv_notif_iter);
f6ccaed9 1370 BT_ASSERT(priv_comp);
958f7d11 1371 muxer_comp = bt_private_component_get_user_data(priv_comp);
f6ccaed9 1372 BT_ASSERT(muxer_comp);
fed72692
PP
1373 BT_LOGV("Muxer component's notification iterator's \"next\" method called: "
1374 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1375 "notif-iter-addr=%p",
1376 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
1377
d4393e08
PP
1378 status = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter,
1379 notifs, capacity, count);
1380 if (status < 0) {
fed72692
PP
1381 BT_LOGE("Cannot get next notification: "
1382 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1383 "notif-iter-addr=%p, status=%s",
1384 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter,
d4393e08 1385 bt_notification_iterator_status_string(status));
fed72692
PP
1386 } else {
1387 BT_LOGV("Returning from muxer component's notification iterator's \"next\" method: "
d4393e08
PP
1388 "status=%s",
1389 bt_notification_iterator_status_string(status));
fed72692 1390 }
958f7d11 1391
65300d60 1392 bt_object_put_ref(priv_comp);
d4393e08 1393 return status;
958f7d11
PP
1394}
1395
1396BT_HIDDEN
bf55043c 1397enum bt_component_status muxer_port_connected(
958f7d11
PP
1398 struct bt_private_component *priv_comp,
1399 struct bt_private_port *self_private_port,
1400 struct bt_port *other_port)
1401{
bf55043c 1402 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
958f7d11 1403 struct bt_port *self_port =
5c563278 1404 bt_port_borrow_from_private(self_private_port);
958f7d11
PP
1405 struct muxer_comp *muxer_comp =
1406 bt_private_component_get_user_data(priv_comp);
1407 size_t i;
06a2cb0d 1408 int ret;
958f7d11 1409
f6ccaed9
PP
1410 BT_ASSERT(self_port);
1411 BT_ASSERT(muxer_comp);
fed72692
PP
1412 BT_LOGD("Port connected: "
1413 "comp-addr=%p, muxer-comp-addr=%p, "
1414 "port-addr=%p, port-name=\"%s\", "
1415 "other-port-addr=%p, other-port-name=\"%s\"",
1416 priv_comp, muxer_comp, self_port, bt_port_get_name(self_port),
1417 other_port, bt_port_get_name(other_port));
958f7d11 1418
06a2cb0d
PP
1419 if (bt_port_get_type(self_port) == BT_PORT_TYPE_OUTPUT) {
1420 goto end;
958f7d11
PP
1421 }
1422
1423 for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
1424 struct muxer_notif_iter *muxer_notif_iter =
1425 g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
1426
1427 /*
ab11110e
PP
1428 * Add this port to the list of newly connected ports
1429 * for this muxer notification iterator. We append at
1430 * the end of this list while
1431 * muxer_notif_iter_handle_newly_connected_ports()
1432 * removes the nodes from the beginning.
958f7d11 1433 */
ab11110e
PP
1434 muxer_notif_iter->newly_connected_priv_ports =
1435 g_list_append(
1436 muxer_notif_iter->newly_connected_priv_ports,
06a2cb0d 1437 self_private_port);
ab11110e 1438 if (!muxer_notif_iter->newly_connected_priv_ports) {
fed72692
PP
1439 BT_LOGE("Cannot append port to muxer's notification iterator list of newly connected input ports: "
1440 "port-addr=%p, port-name=\"%s\", "
1441 "muxer-notif-iter-addr=%p", self_port,
1442 bt_port_get_name(self_port), muxer_notif_iter);
bf55043c 1443 status = BT_COMPONENT_STATUS_ERROR;
958f7d11
PP
1444 goto end;
1445 }
fed72692
PP
1446
1447 BT_LOGD("Appended port to muxer's notification iterator list of newly connected input ports: "
1448 "port-addr=%p, port-name=\"%s\", "
1449 "muxer-notif-iter-addr=%p", self_port,
1450 bt_port_get_name(self_port), muxer_notif_iter);
958f7d11
PP
1451 }
1452
06a2cb0d
PP
1453 /* One less available input port */
1454 muxer_comp->available_input_ports--;
1455 ret = ensure_available_input_port(priv_comp);
1456 if (ret) {
1457 /*
1458 * Only way to report an error later since this
1459 * method does not return anything.
1460 */
fed72692
PP
1461 BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
1462 "muxer-comp-addr=%p, status=%s",
1463 muxer_comp, bt_component_status_string(ret));
bf55043c 1464 status = BT_COMPONENT_STATUS_ERROR;
06a2cb0d
PP
1465 goto end;
1466 }
1467
958f7d11 1468end:
bf55043c 1469 return status;
958f7d11
PP
1470}
1471
1472BT_HIDDEN
1473void muxer_port_disconnected(struct bt_private_component *priv_comp,
1474 struct bt_private_port *priv_port)
1475{
5c563278 1476 struct bt_port *port = bt_port_borrow_from_private(priv_port);
958f7d11
PP
1477 struct muxer_comp *muxer_comp =
1478 bt_private_component_get_user_data(priv_comp);
1479
f6ccaed9
PP
1480 BT_ASSERT(port);
1481 BT_ASSERT(muxer_comp);
fed72692
PP
1482 BT_LOGD("Port disconnected: "
1483 "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, "
1484 "port-name=\"%s\"", priv_comp, muxer_comp,
1485 port, bt_port_get_name(port));
958f7d11 1486
ab11110e
PP
1487 /*
1488 * There's nothing special to do when a port is disconnected
1489 * because this component deals with upstream notification
1490 * iterators which were already created thanks to connected
1491 * ports. The fact that the port is disconnected does not cancel
1492 * the upstream notification iterators created using its
089717de
PP
1493 * connection: they still exist, even if the connection is dead.
1494 * The only way to remove an upstream notification iterator is
1495 * for its "next" operation to return
1496 * BT_NOTIFICATION_ITERATOR_STATUS_END.
ab11110e 1497 */
958f7d11
PP
1498 if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
1499 /* One more available input port */
1500 muxer_comp->available_input_ports++;
fed72692
PP
1501 BT_LOGD("Leaving disconnected input port available for future connections: "
1502 "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, "
1503 "port-name=\"%s\", avail-input-port-count=%zu",
1504 priv_comp, muxer_comp, port, bt_port_get_name(port),
1505 muxer_comp->available_input_ports);
958f7d11 1506 }
958f7d11 1507}
This page took 0.111683 seconds and 4 git commands to generate.