Simplify sink.utils.dummy: only one upstream notification iterator
[babeltrace.git] / plugins / utils / muxer / muxer.c
CommitLineData
958f7d11
PP
1/*
2 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
fed72692
PP
23#define BT_LOG_TAG "PLUGIN-UTILS-MUXER-FLT"
24#include "logging.h"
25
958f7d11 26#include <babeltrace/babeltrace-internal.h>
282c8cd0 27#include <babeltrace/compat/uuid-internal.h>
958f7d11
PP
28#include <babeltrace/ctf-ir/clock-class.h>
29#include <babeltrace/ctf-ir/event.h>
30#include <babeltrace/graph/clock-class-priority-map.h>
31#include <babeltrace/graph/component-filter.h>
32#include <babeltrace/graph/component.h>
fed72692 33#include <babeltrace/graph/component-internal.h>
958f7d11
PP
34#include <babeltrace/graph/notification-event.h>
35#include <babeltrace/graph/notification-inactivity.h>
36#include <babeltrace/graph/notification-iterator.h>
fed72692 37#include <babeltrace/graph/notification-iterator-internal.h>
958f7d11
PP
38#include <babeltrace/graph/notification.h>
39#include <babeltrace/graph/port.h>
40#include <babeltrace/graph/private-component-filter.h>
41#include <babeltrace/graph/private-component.h>
42#include <babeltrace/graph/private-component.h>
43#include <babeltrace/graph/private-connection.h>
44#include <babeltrace/graph/private-notification-iterator.h>
45#include <babeltrace/graph/private-port.h>
73d5c1ad 46#include <babeltrace/graph/connection.h>
fed72692
PP
47#include <babeltrace/graph/connection-internal.h>
48#include <babeltrace/values-internal.h>
958f7d11
PP
49#include <plugins-common.h>
50#include <glib.h>
c55a9f58 51#include <stdbool.h>
fed72692 52#include <inttypes.h>
958f7d11 53#include <assert.h>
0fbb9a9f 54#include <stdlib.h>
282c8cd0 55#include <string.h>
958f7d11 56
c3acd5f3 57#define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes"
65ee897d 58
958f7d11
PP
59struct muxer_comp {
60 /* Array of struct bt_private_notification_iterator * (weak refs) */
61 GPtrArray *muxer_notif_iters;
62
63 /* Weak ref */
64 struct bt_private_component *priv_comp;
65 unsigned int next_port_num;
66 size_t available_input_ports;
67 bool error;
a09c6b95 68 bool initializing_muxer_notif_iter;
282c8cd0 69 bool assume_absolute_clock_classes;
958f7d11
PP
70};
71
72struct muxer_upstream_notif_iter {
ab11110e 73 /* Owned by this, NULL if ended */
958f7d11
PP
74 struct bt_notification_iterator *notif_iter;
75
089717de
PP
76 /*
77 * This flag is true if the upstream notification iterator's
78 * current notification must be considered for the multiplexing
79 * operations. If the upstream iterator returns
80 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN, then this object
81 * is considered invalid, because its current notification is
82 * still the previous one, but we already took it into account.
83 *
84 * The value of this flag is not important if notif_iter above
85 * is NULL (which means the upstream iterator is finished).
86 */
87 bool is_valid;
958f7d11
PP
88};
89
282c8cd0
PP
90enum muxer_notif_iter_clock_class_expectation {
91 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,
92 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE,
93 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID,
94 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID,
95};
96
958f7d11 97struct muxer_notif_iter {
ab11110e
PP
98 /*
99 * Array of struct muxer_upstream_notif_iter * (owned by this).
100 *
101 * NOTE: This array is searched in linearly to find the youngest
102 * current notification. Keep this until benchmarks confirm that
103 * another data structure is faster than this for our typical
104 * use cases.
105 */
958f7d11
PP
106 GPtrArray *muxer_upstream_notif_iters;
107
ab11110e 108 /*
06a2cb0d 109 * List of "recently" connected input ports (weak) to
ab11110e
PP
110 * handle by this muxer notification iterator.
111 * muxer_port_connected() adds entries to this list, and the
112 * entries are removed when a notification iterator is created
113 * on the port's connection and put into
114 * muxer_upstream_notif_iters above by
115 * muxer_notif_iter_handle_newly_connected_ports().
116 */
117 GList *newly_connected_priv_ports;
958f7d11
PP
118
119 /* Next thing to return by the "next" method */
120 struct bt_notification_iterator_next_return next_next_return;
958f7d11
PP
121
122 /* Last time returned in a notification */
123 int64_t last_returned_ts_ns;
282c8cd0
PP
124
125 /* Clock class expectation state */
126 enum muxer_notif_iter_clock_class_expectation clock_class_expectation;
127
128 /*
129 * Expected clock class UUID, only valid when
130 * clock_class_expectation is
131 * MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID.
132 */
133 unsigned char expected_clock_class_uuid[BABELTRACE_UUID_LEN];
958f7d11
PP
134};
135
ab11110e
PP
136static
137void destroy_muxer_upstream_notif_iter(
138 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
139{
140 if (!muxer_upstream_notif_iter) {
141 return;
142 }
143
fed72692
PP
144 BT_LOGD("Destroying muxer's upstream notification iterator wrapper: "
145 "addr=%p, notif-iter-addr=%p, is-valid=%d",
146 muxer_upstream_notif_iter,
147 muxer_upstream_notif_iter->notif_iter,
148 muxer_upstream_notif_iter->is_valid);
ab11110e 149 bt_put(muxer_upstream_notif_iter->notif_iter);
ab11110e
PP
150 g_free(muxer_upstream_notif_iter);
151}
152
958f7d11
PP
153static
154struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter(
155 struct muxer_notif_iter *muxer_notif_iter,
156 struct bt_notification_iterator *notif_iter,
157 struct bt_private_port *priv_port)
158{
159 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
160 g_new0(struct muxer_upstream_notif_iter, 1);
161
162 if (!muxer_upstream_notif_iter) {
fed72692 163 BT_LOGE_STR("Failed to allocate one muxer's upstream notification iterator wrapper.");
958f7d11
PP
164 goto end;
165 }
166
167 muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
089717de 168 muxer_upstream_notif_iter->is_valid = false;
958f7d11
PP
169 g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
170 muxer_upstream_notif_iter);
fed72692
PP
171 BT_LOGD("Added muxer's upstream notification iterator wrapper: "
172 "addr=%p, muxer-notif-iter-addr=%p, notif-iter-addr=%p",
173 muxer_upstream_notif_iter, muxer_notif_iter,
174 notif_iter);
958f7d11
PP
175
176end:
177 return muxer_upstream_notif_iter;
178}
179
958f7d11 180static
147337a3
PP
181enum bt_component_status ensure_available_input_port(
182 struct bt_private_component *priv_comp)
958f7d11
PP
183{
184 struct muxer_comp *muxer_comp =
185 bt_private_component_get_user_data(priv_comp);
147337a3 186 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
958f7d11 187 GString *port_name = NULL;
958f7d11
PP
188
189 assert(muxer_comp);
190
191 if (muxer_comp->available_input_ports >= 1) {
192 goto end;
193 }
194
195 port_name = g_string_new("in");
196 if (!port_name) {
fed72692 197 BT_LOGE_STR("Failed to allocate a GString.");
147337a3 198 status = BT_COMPONENT_STATUS_NOMEM;
958f7d11
PP
199 goto end;
200 }
201
202 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
147337a3
PP
203 status = bt_private_component_filter_add_input_private_port(
204 priv_comp, port_name->str, NULL, NULL);
205 if (status != BT_COMPONENT_STATUS_OK) {
fed72692
PP
206 BT_LOGE("Cannot add input port to muxer component: "
207 "port-name=\"%s\", comp-addr=%p, status=%s",
208 port_name->str, priv_comp,
209 bt_component_status_string(status));
958f7d11
PP
210 goto end;
211 }
212
213 muxer_comp->available_input_ports++;
214 muxer_comp->next_port_num++;
fed72692
PP
215 BT_LOGD("Added one input port to muxer component: "
216 "port-name=\"%s\", comp-addr=%p",
217 port_name->str, priv_comp);
958f7d11
PP
218end:
219 if (port_name) {
220 g_string_free(port_name, TRUE);
221 }
222
147337a3 223 return status;
958f7d11
PP
224}
225
958f7d11 226static
147337a3
PP
227enum bt_component_status create_output_port(
228 struct bt_private_component *priv_comp)
958f7d11 229{
147337a3
PP
230 return bt_private_component_filter_add_output_private_port(
231 priv_comp, "out", NULL, NULL);
958f7d11
PP
232}
233
234static
235void destroy_muxer_comp(struct muxer_comp *muxer_comp)
236{
237 if (!muxer_comp) {
238 return;
239 }
240
fed72692
PP
241 BT_LOGD("Destroying muxer component: muxer-comp-addr=%p, "
242 "muxer-notif-iter-count=%u", muxer_comp,
243 muxer_comp->muxer_notif_iters->len);
244
958f7d11
PP
245 if (muxer_comp->muxer_notif_iters) {
246 g_ptr_array_free(muxer_comp->muxer_notif_iters, TRUE);
247 }
248
249 g_free(muxer_comp);
250}
251
65ee897d
PP
252static
253struct bt_value *get_default_params(void)
254{
255 struct bt_value *params;
256 int ret;
257
258 params = bt_value_map_create();
259 if (!params) {
fed72692 260 BT_LOGE_STR("Cannot create a map value object.");
65ee897d
PP
261 goto error;
262 }
263
c3acd5f3
JG
264 ret = bt_value_map_insert_bool(params,
265 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, false);
65ee897d 266 if (ret) {
fed72692 267 BT_LOGE_STR("Cannot add boolean value to map value object.");
65ee897d
PP
268 goto error;
269 }
270
271 goto end;
272
273error:
274 BT_PUT(params);
275
276end:
277 return params;
278}
279
280static
281int configure_muxer_comp(struct muxer_comp *muxer_comp, struct bt_value *params)
282{
283 struct bt_value *default_params = NULL;
284 struct bt_value *real_params = NULL;
282c8cd0 285 struct bt_value *assume_absolute_clock_classes = NULL;
65ee897d 286 int ret = 0;
c55a9f58 287 bt_bool bool_val;
65ee897d
PP
288
289 default_params = get_default_params();
290 if (!default_params) {
fed72692
PP
291 BT_LOGE("Cannot get default parameters: "
292 "muxer-comp-addr=%p", muxer_comp);
65ee897d
PP
293 goto error;
294 }
295
296 real_params = bt_value_map_extend(default_params, params);
297 if (!real_params) {
fed72692
PP
298 BT_LOGE("Cannot extend default parameters map value: "
299 "muxer-comp-addr=%p, def-params-addr=%p, "
300 "params-addr=%p", muxer_comp, default_params,
301 params);
65ee897d
PP
302 goto error;
303 }
304
282c8cd0 305 assume_absolute_clock_classes = bt_value_map_get(real_params,
c3acd5f3 306 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME);
282c8cd0 307 if (!bt_value_is_bool(assume_absolute_clock_classes)) {
fed72692
PP
308 BT_LOGE("Expecting a boolean value for the `%s` parameter: "
309 "muxer-comp-addr=%p, value-type=%s",
310 ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, muxer_comp,
311 bt_value_type_string(
312 bt_value_get_type(assume_absolute_clock_classes)));
65ee897d
PP
313 goto error;
314 }
315
fed72692
PP
316 ret = bt_value_bool_get(assume_absolute_clock_classes, &bool_val);
317 assert(ret == 0);
282c8cd0 318 muxer_comp->assume_absolute_clock_classes = (bool) bool_val;
fed72692
PP
319 BT_LOGD("Configured muxer component: muxer-comp-addr=%p, "
320 "assume-absolute-clock-classes=%d",
321 muxer_comp, muxer_comp->assume_absolute_clock_classes);
65ee897d
PP
322 goto end;
323
324error:
325 ret = -1;
326
327end:
328 bt_put(default_params);
329 bt_put(real_params);
282c8cd0 330 bt_put(assume_absolute_clock_classes);
65ee897d
PP
331 return ret;
332}
333
958f7d11
PP
334BT_HIDDEN
335enum bt_component_status muxer_init(
336 struct bt_private_component *priv_comp,
337 struct bt_value *params, void *init_data)
338{
339 int ret;
340 enum bt_component_status status = BT_COMPONENT_STATUS_OK;
341 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
342
fed72692
PP
343 BT_LOGD("Initializing muxer component: "
344 "comp-addr=%p, params-addr=%p", priv_comp, params);
345
958f7d11 346 if (!muxer_comp) {
fed72692 347 BT_LOGE_STR("Failed to allocate one muxer component.");
958f7d11
PP
348 goto error;
349 }
350
65ee897d
PP
351 ret = configure_muxer_comp(muxer_comp, params);
352 if (ret) {
fed72692
PP
353 BT_LOGE("Cannot configure muxer component: "
354 "muxer-comp-addr=%p, params-addr=%p",
355 muxer_comp, params);
65ee897d
PP
356 goto error;
357 }
358
958f7d11
PP
359 muxer_comp->muxer_notif_iters = g_ptr_array_new();
360 if (!muxer_comp->muxer_notif_iters) {
fed72692 361 BT_LOGE_STR("Failed to allocate a GPtrArray.");
958f7d11
PP
362 goto error;
363 }
364
365 muxer_comp->priv_comp = priv_comp;
366 ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
367 assert(ret == 0);
147337a3
PP
368 status = ensure_available_input_port(priv_comp);
369 if (status != BT_COMPONENT_STATUS_OK) {
fed72692
PP
370 BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
371 "muxer-comp-addr=%p, status=%s",
372 muxer_comp,
373 bt_component_status_string(status));
958f7d11
PP
374 goto error;
375 }
376
fed72692
PP
377 status = create_output_port(priv_comp);
378 if (status) {
379 BT_LOGE("Cannot create muxer component's output port: "
380 "muxer-comp-addr=%p, status=%s",
381 muxer_comp,
382 bt_component_status_string(status));
958f7d11
PP
383 goto error;
384 }
385
fed72692
PP
386 BT_LOGD("Initialized muxer component: "
387 "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p",
388 priv_comp, params, muxer_comp);
389
958f7d11
PP
390 goto end;
391
392error:
393 destroy_muxer_comp(muxer_comp);
394 ret = bt_private_component_set_user_data(priv_comp, NULL);
395 assert(ret == 0);
147337a3
PP
396
397 if (status == BT_COMPONENT_STATUS_OK) {
398 status = BT_COMPONENT_STATUS_ERROR;
399 }
958f7d11
PP
400
401end:
402 return status;
403}
404
405BT_HIDDEN
406void muxer_finalize(struct bt_private_component *priv_comp)
407{
408 struct muxer_comp *muxer_comp =
409 bt_private_component_get_user_data(priv_comp);
410
fed72692
PP
411 BT_LOGD("Finalizing muxer component: comp-addr=%p",
412 priv_comp);
958f7d11
PP
413 destroy_muxer_comp(muxer_comp);
414}
415
416static
417struct bt_notification_iterator *create_notif_iter_on_input_port(
418 struct bt_private_port *priv_port, int *ret)
419{
420 struct bt_port *port = bt_port_from_private_port(priv_port);
421 struct bt_notification_iterator *notif_iter = NULL;
422 struct bt_private_connection *priv_conn = NULL;
73d5c1ad 423 enum bt_connection_status conn_status;
958f7d11
PP
424
425 assert(ret);
426 *ret = 0;
427 assert(port);
958f7d11
PP
428 assert(bt_port_is_connected(port));
429 priv_conn = bt_private_port_get_private_connection(priv_port);
fed72692 430 assert(priv_conn);
958f7d11 431
ab11110e
PP
432 // TODO: Advance the iterator to >= the time of the latest
433 // returned notification by the muxer notification
434 // iterator which creates it.
73d5c1ad
PP
435 conn_status = bt_private_connection_create_notification_iterator(
436 priv_conn, NULL, &notif_iter);
437 if (conn_status != BT_CONNECTION_STATUS_OK) {
fed72692
PP
438 BT_LOGE("Cannot create upstream notification iterator on input port's connection: "
439 "port-addr=%p, port-name=\"%s\", conn-addr=%p, "
440 "status=%s",
441 port, bt_port_get_name(port), priv_conn,
442 bt_connection_status_string(conn_status));
958f7d11
PP
443 *ret = -1;
444 goto end;
445 }
446
fed72692
PP
447 BT_LOGD("Created upstream notification iterator on input port's connection: "
448 "port-addr=%p, port-name=\"%s\", conn-addr=%p, "
449 "notif-iter-addr=%p",
450 port, bt_port_get_name(port), priv_conn, notif_iter);
451
958f7d11
PP
452end:
453 bt_put(port);
454 bt_put(priv_conn);
455 return notif_iter;
456}
457
ab11110e 458static
089717de 459enum bt_notification_iterator_status muxer_upstream_notif_iter_next(
ab11110e
PP
460 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
461{
089717de 462 enum bt_notification_iterator_status status;
ab11110e 463
fed72692
PP
464 BT_LOGV("Calling upstream notification iterator's \"next\" method: "
465 "muxer-upstream-notif-iter-wrap-addr=%p, notif-iter-addr=%p",
466 muxer_upstream_notif_iter,
467 muxer_upstream_notif_iter->notif_iter);
089717de 468 status = bt_notification_iterator_next(
ab11110e 469 muxer_upstream_notif_iter->notif_iter);
fed72692
PP
470 BT_LOGV("Upstream notification iterator's \"next\" method returned: "
471 "status=%s", bt_notification_iterator_status_string(status));
ab11110e 472
089717de 473 switch (status) {
ab11110e 474 case BT_NOTIFICATION_ITERATOR_STATUS_OK:
089717de
PP
475 /*
476 * Notification iterator's current notification is valid:
477 * it must be considered for muxing operations.
478 */
fed72692 479 BT_LOGV_STR("Validated upstream notification iterator wrapper.");
089717de 480 muxer_upstream_notif_iter->is_valid = true;
ab11110e
PP
481 break;
482 case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN:
089717de
PP
483 /*
484 * Notification iterator's current notification is not
485 * valid anymore. Return
486 * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN
487 * immediately.
488 */
fed72692 489 BT_LOGV_STR("Invalidated upstream notification iterator wrapper because of BT_NOTIFICATION_ITERATOR_STATUS_AGAIN.");
089717de 490 muxer_upstream_notif_iter->is_valid = false;
ab11110e 491 break;
beed0223
MD
492 case BT_NOTIFICATION_ITERATOR_STATUS_END: /* Fall-through. */
493 case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED:
ab11110e
PP
494 /*
495 * Notification iterator reached the end: release it. It
496 * won't be considered again to find the youngest
497 * notification.
498 */
fed72692 499 BT_LOGV_STR("Invalidated upstream notification iterator wrapper because of BT_NOTIFICATION_ITERATOR_STATUS_END or BT_NOTIFICATION_ITERATOR_STATUS_CANCELED.");
ab11110e 500 BT_PUT(muxer_upstream_notif_iter->notif_iter);
089717de
PP
501 muxer_upstream_notif_iter->is_valid = false;
502 status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
503 break;
ab11110e
PP
504 default:
505 /* Error or unsupported status code */
fed72692
PP
506 BT_LOGE("Error or unsupported status code: "
507 "status-code=%d", status);
089717de
PP
508 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
509 break;
ab11110e
PP
510 }
511
089717de 512 return status;
ab11110e
PP
513}
514
515static
089717de 516int muxer_notif_iter_handle_newly_connected_ports(
ab11110e
PP
517 struct muxer_notif_iter *muxer_notif_iter)
518{
ab11110e
PP
519 int ret = 0;
520
fed72692
PP
521 BT_LOGV("Handling newly connected ports: "
522 "muxer-notif-iter-addr=%p", muxer_notif_iter);
523
ab11110e
PP
524 /*
525 * Here we create one upstream notification iterator for each
089717de
PP
526 * newly connected port. We do not perform an initial "next" on
527 * those new upstream notification iterators: they are
528 * invalidated, to be validated later. The list of newly
529 * connected ports to handle here is updated by
530 * muxer_port_connected().
ab11110e
PP
531 */
532 while (true) {
533 GList *node = muxer_notif_iter->newly_connected_priv_ports;
534 struct bt_private_port *priv_port;
535 struct bt_port *port;
536 struct bt_notification_iterator *upstream_notif_iter = NULL;
537 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter;
538
539 if (!node) {
540 break;
541 }
542
543 priv_port = node->data;
544 port = bt_port_from_private_port(priv_port);
545 assert(port);
546
547 if (!bt_port_is_connected(port)) {
548 /*
549 * Looks like this port is not connected
550 * anymore: we can't create an upstream
089717de
PP
551 * notification iterator on its (non-existing)
552 * connection in this case.
ab11110e
PP
553 */
554 goto remove_node;
555 }
556
557 BT_PUT(port);
558 upstream_notif_iter = create_notif_iter_on_input_port(priv_port,
559 &ret);
560 if (ret) {
fed72692 561 /* create_notif_iter_on_input_port() logs errors */
ab11110e 562 assert(!upstream_notif_iter);
ab11110e
PP
563 goto error;
564 }
565
566 muxer_upstream_notif_iter =
567 muxer_notif_iter_add_upstream_notif_iter(
568 muxer_notif_iter, upstream_notif_iter,
569 priv_port);
ab11110e
PP
570 BT_PUT(upstream_notif_iter);
571 if (!muxer_upstream_notif_iter) {
fed72692
PP
572 /*
573 * muxer_notif_iter_add_upstream_notif_iter()
574 * logs errors.
575 */
ab11110e
PP
576 goto error;
577 }
578
ab11110e
PP
579remove_node:
580 bt_put(upstream_notif_iter);
581 bt_put(port);
ab11110e
PP
582 muxer_notif_iter->newly_connected_priv_ports =
583 g_list_delete_link(
584 muxer_notif_iter->newly_connected_priv_ports,
585 node);
586 }
587
588 goto end;
589
590error:
089717de 591 if (ret >= 0) {
ab11110e
PP
592 ret = -1;
593 }
594
595end:
ab11110e
PP
596 return ret;
597}
598
958f7d11
PP
599static
600int get_notif_ts_ns(struct muxer_comp *muxer_comp,
282c8cd0 601 struct muxer_notif_iter *muxer_notif_iter,
958f7d11
PP
602 struct bt_notification *notif, int64_t last_returned_ts_ns,
603 int64_t *ts_ns)
604{
605 struct bt_clock_class_priority_map *cc_prio_map = NULL;
606 struct bt_ctf_clock_class *clock_class = NULL;
607 struct bt_ctf_clock_value *clock_value = NULL;
608 struct bt_ctf_event *event = NULL;
609 int ret = 0;
282c8cd0 610 const unsigned char *cc_uuid;
fed72692 611 const char *cc_name;
958f7d11
PP
612
613 assert(notif);
614 assert(ts_ns);
615
fed72692
PP
616 BT_LOGV("Getting notification's timestamp: "
617 "muxer-notif-iter-addr=%p, notif-addr=%p, "
618 "last-returned-ts=%" PRId64,
619 muxer_notif_iter, notif, last_returned_ts_ns);
620
958f7d11
PP
621 switch (bt_notification_get_type(notif)) {
622 case BT_NOTIFICATION_TYPE_EVENT:
623 cc_prio_map =
624 bt_notification_event_get_clock_class_priority_map(
625 notif);
626 break;
627
628 case BT_NOTIFICATION_TYPE_INACTIVITY:
629 cc_prio_map =
a09c6b95 630 bt_notification_inactivity_get_clock_class_priority_map(
958f7d11
PP
631 notif);
632 break;
633 default:
089717de 634 /* All the other notifications have a higher priority */
fed72692 635 BT_LOGV_STR("Notification has no timestamp: using the last returned timestamp.");
958f7d11
PP
636 *ts_ns = last_returned_ts_ns;
637 goto end;
638 }
639
640 if (!cc_prio_map) {
fed72692
PP
641 BT_LOGE("Cannot get notification's clock class priority map: "
642 "notif-addr=%p", notif);
958f7d11
PP
643 goto error;
644 }
645
646 /*
647 * If the clock class priority map is empty, then we consider
648 * that this notification has no time. In this case it's always
649 * the youngest.
650 */
651 if (bt_clock_class_priority_map_get_clock_class_count(cc_prio_map) == 0) {
fed72692
PP
652 BT_LOGV_STR("Notification's clock class priorty map contains 0 clock classes: "
653 "using the last returned timestamp.");
958f7d11
PP
654 *ts_ns = last_returned_ts_ns;
655 goto end;
656 }
657
658 clock_class =
659 bt_clock_class_priority_map_get_highest_priority_clock_class(
660 cc_prio_map);
661 if (!clock_class) {
fed72692
PP
662 BT_LOGE("Cannot get the clock class with the highest priority from clock class priority map: "
663 "cc-prio-map-addr=%p", cc_prio_map);
958f7d11
PP
664 goto error;
665 }
666
282c8cd0 667 cc_uuid = bt_ctf_clock_class_get_uuid(clock_class);
fed72692 668 cc_name = bt_ctf_clock_class_get_name(clock_class);
282c8cd0
PP
669
670 if (muxer_notif_iter->clock_class_expectation ==
671 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
672 /*
673 * This is the first clock class that this muxer
674 * notification iterator encounters. Its properties
675 * determine what to expect for the whole lifetime of
676 * the iterator without a true
677 * `assume-absolute-clock-classes` parameter.
678 */
679 if (bt_ctf_clock_class_is_absolute(clock_class)) {
680 /* Expect absolute clock classes */
681 muxer_notif_iter->clock_class_expectation =
682 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE;
683 } else {
684 if (cc_uuid) {
685 /*
686 * Expect non-absolute clock classes
687 * with a specific UUID.
688 */
689 muxer_notif_iter->clock_class_expectation =
690 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID;
691 memcpy(muxer_notif_iter->expected_clock_class_uuid,
692 cc_uuid, BABELTRACE_UUID_LEN);
693 } else {
694 /*
695 * Expect non-absolute clock classes
696 * with no UUID.
697 */
698 muxer_notif_iter->clock_class_expectation =
699 MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID;
700 }
701 }
702 }
703
704 if (!muxer_comp->assume_absolute_clock_classes) {
705 switch (muxer_notif_iter->clock_class_expectation) {
706 case MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE:
707 if (!bt_ctf_clock_class_is_absolute(clock_class)) {
fed72692
PP
708 BT_LOGE("Expecting an absolute clock class, "
709 "but got a non-absolute one: "
710 "clock-class-addr=%p, clock-class-name=\"%s\"",
711 clock_class, cc_name);
282c8cd0
PP
712 goto error;
713 }
714 break;
715 case MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID:
716 if (bt_ctf_clock_class_is_absolute(clock_class)) {
fed72692
PP
717 BT_LOGE("Expecting a non-absolute clock class with no UUID, "
718 "but got an absolute one: "
719 "clock-class-addr=%p, clock-class-name=\"%s\"",
720 clock_class, cc_name);
282c8cd0
PP
721 goto error;
722 }
723
724 if (cc_uuid) {
fed72692
PP
725 BT_LOGE("Expecting a non-absolute clock class with no UUID, "
726 "but got one with a UUID: "
727 "clock-class-addr=%p, clock-class-name=\"%s\", "
728 "uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\"",
729 clock_class, cc_name,
730 (unsigned int) cc_uuid[0],
731 (unsigned int) cc_uuid[1],
732 (unsigned int) cc_uuid[2],
733 (unsigned int) cc_uuid[3],
734 (unsigned int) cc_uuid[4],
735 (unsigned int) cc_uuid[5],
736 (unsigned int) cc_uuid[6],
737 (unsigned int) cc_uuid[7],
738 (unsigned int) cc_uuid[8],
739 (unsigned int) cc_uuid[9],
740 (unsigned int) cc_uuid[10],
741 (unsigned int) cc_uuid[11],
742 (unsigned int) cc_uuid[12],
743 (unsigned int) cc_uuid[13],
744 (unsigned int) cc_uuid[14],
745 (unsigned int) cc_uuid[15]);
282c8cd0
PP
746 goto error;
747 }
748 break;
749 case MUXER_NOTIF_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID:
750 if (bt_ctf_clock_class_is_absolute(clock_class)) {
fed72692
PP
751 BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
752 "but got an absolute one: "
753 "clock-class-addr=%p, clock-class-name=\"%s\"",
754 clock_class, cc_name);
282c8cd0
PP
755 goto error;
756 }
757
758 if (!cc_uuid) {
fed72692
PP
759 BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
760 "but got one with no UUID: "
761 "clock-class-addr=%p, clock-class-name=\"%s\"",
762 clock_class, cc_name);
282c8cd0
PP
763 goto error;
764 }
765
766 if (memcmp(muxer_notif_iter->expected_clock_class_uuid,
767 cc_uuid, BABELTRACE_UUID_LEN) != 0) {
fed72692
PP
768 BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
769 "but got one with different UUID: "
770 "clock-class-addr=%p, clock-class-name=\"%s\", "
771 "expected-uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\", "
772 "uuid=\"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\"",
773 clock_class, cc_name,
774 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[0],
775 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[1],
776 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[2],
777 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[3],
778 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[4],
779 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[5],
780 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[6],
781 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[7],
782 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[8],
783 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[9],
784 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[10],
785 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[11],
786 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[12],
787 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[13],
788 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[14],
789 (unsigned int) muxer_notif_iter->expected_clock_class_uuid[15],
790 (unsigned int) cc_uuid[0],
791 (unsigned int) cc_uuid[1],
792 (unsigned int) cc_uuid[2],
793 (unsigned int) cc_uuid[3],
794 (unsigned int) cc_uuid[4],
795 (unsigned int) cc_uuid[5],
796 (unsigned int) cc_uuid[6],
797 (unsigned int) cc_uuid[7],
798 (unsigned int) cc_uuid[8],
799 (unsigned int) cc_uuid[9],
800 (unsigned int) cc_uuid[10],
801 (unsigned int) cc_uuid[11],
802 (unsigned int) cc_uuid[12],
803 (unsigned int) cc_uuid[13],
804 (unsigned int) cc_uuid[14],
805 (unsigned int) cc_uuid[15]);
282c8cd0
PP
806 goto error;
807 }
808 break;
809 default:
810 /* Unexpected */
fed72692
PP
811 BT_LOGF("Unexpected clock class expectation: "
812 "expectation-code=%d",
813 muxer_notif_iter->clock_class_expectation);
282c8cd0
PP
814 abort();
815 }
958f7d11
PP
816 }
817
818 switch (bt_notification_get_type(notif)) {
819 case BT_NOTIFICATION_TYPE_EVENT:
820 event = bt_notification_event_get_event(notif);
ab11110e 821 assert(event);
958f7d11
PP
822 clock_value = bt_ctf_event_get_clock_value(event,
823 clock_class);
824 break;
825 case BT_NOTIFICATION_TYPE_INACTIVITY:
826 clock_value = bt_notification_inactivity_get_clock_value(
827 notif, clock_class);
828 break;
829 default:
fed72692
PP
830 BT_LOGF("Unexpected notification type at this point: "
831 "type=%d", bt_notification_get_type(notif));
0fbb9a9f 832 abort();
958f7d11
PP
833 }
834
835 if (!clock_value) {
fed72692
PP
836 BT_LOGE("Cannot get notification's clock value for clock class: "
837 "clock-class-addr=%p, clock-class-name=\"%s\"",
838 clock_class, cc_name);
958f7d11
PP
839 goto error;
840 }
841
842 ret = bt_ctf_clock_value_get_value_ns_from_epoch(clock_value, ts_ns);
843 if (ret) {
fed72692
PP
844 BT_LOGE("Cannot get nanoseconds from Epoch of clock value: "
845 "clock-value-addr=%p", clock_value);
958f7d11
PP
846 goto error;
847 }
848
849 goto end;
850
851error:
852 ret = -1;
853
854end:
fed72692
PP
855 if (ret == 0) {
856 BT_LOGV("Found notification's timestamp: "
857 "muxer-notif-iter-addr=%p, notif-addr=%p, "
858 "last-returned-ts=%" PRId64 ", ts=%" PRId64,
859 muxer_notif_iter, notif, last_returned_ts_ns,
860 *ts_ns);
861 }
862
958f7d11
PP
863 bt_put(cc_prio_map);
864 bt_put(event);
865 bt_put(clock_class);
866 bt_put(clock_value);
867 return ret;
868}
869
ab11110e
PP
870/*
871 * This function finds the youngest available notification amongst the
872 * non-ended upstream notification iterators and returns the upstream
873 * notification iterator which has it, or
874 * BT_NOTIFICATION_ITERATOR_STATUS_END if there's no available
875 * notification.
876 *
877 * This function does NOT:
878 *
879 * * Update any upstream notification iterator.
880 * * Check for newly connected ports.
881 * * Check the upstream notification iterators to retry.
882 *
883 * On sucess, this function sets *muxer_upstream_notif_iter to the
884 * upstream notification iterator of which the current notification is
885 * the youngest, and sets *ts_ns to its time.
886 */
958f7d11
PP
887static
888enum bt_notification_iterator_status
889muxer_notif_iter_youngest_upstream_notif_iter(
890 struct muxer_comp *muxer_comp,
891 struct muxer_notif_iter *muxer_notif_iter,
892 struct muxer_upstream_notif_iter **muxer_upstream_notif_iter,
893 int64_t *ts_ns)
894{
895 size_t i;
896 int ret;
897 int64_t youngest_ts_ns = INT64_MAX;
898 enum bt_notification_iterator_status status =
899 BT_NOTIFICATION_ITERATOR_STATUS_OK;
900
901 assert(muxer_comp);
902 assert(muxer_notif_iter);
903 assert(muxer_upstream_notif_iter);
904 *muxer_upstream_notif_iter = NULL;
905
906 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
907 struct bt_notification *notif;
908 struct muxer_upstream_notif_iter *cur_muxer_upstream_notif_iter =
909 g_ptr_array_index(muxer_notif_iter->muxer_upstream_notif_iters, i);
910 int64_t notif_ts_ns;
911
912 if (!cur_muxer_upstream_notif_iter->notif_iter) {
ab11110e 913 /* This upstream notification iterator is ended */
fed72692
PP
914 BT_LOGV("Skipping ended upstream notification iterator: "
915 "muxer-upstream-notif-iter-wrap-addr=%p",
916 cur_muxer_upstream_notif_iter);
958f7d11
PP
917 continue;
918 }
919
089717de 920 assert(cur_muxer_upstream_notif_iter->is_valid);
958f7d11
PP
921 notif = bt_notification_iterator_get_notification(
922 cur_muxer_upstream_notif_iter->notif_iter);
923 assert(notif);
282c8cd0 924 ret = get_notif_ts_ns(muxer_comp, muxer_notif_iter, notif,
958f7d11
PP
925 muxer_notif_iter->last_returned_ts_ns, &notif_ts_ns);
926 bt_put(notif);
927 if (ret) {
fed72692 928 /* get_notif_ts_ns() logs errors */
958f7d11
PP
929 *muxer_upstream_notif_iter = NULL;
930 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
931 goto end;
932 }
933
934 if (notif_ts_ns <= youngest_ts_ns) {
935 *muxer_upstream_notif_iter =
936 cur_muxer_upstream_notif_iter;
937 youngest_ts_ns = notif_ts_ns;
938 *ts_ns = youngest_ts_ns;
939 }
940 }
941
942 if (!*muxer_upstream_notif_iter) {
943 status = BT_NOTIFICATION_ITERATOR_STATUS_END;
944 *ts_ns = INT64_MIN;
945 }
946
947end:
948 return status;
949}
950
951static
089717de
PP
952enum bt_notification_iterator_status validate_muxer_upstream_notif_iter(
953 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter)
958f7d11 954{
089717de
PP
955 enum bt_notification_iterator_status status =
956 BT_NOTIFICATION_ITERATOR_STATUS_OK;
958f7d11 957
fed72692
PP
958 BT_LOGV("Validating muxer's upstream notification iterator wrapper: "
959 "muxer-upstream-notif-iter-wrap-addr=%p",
960 muxer_upstream_notif_iter);
961
089717de
PP
962 if (muxer_upstream_notif_iter->is_valid ||
963 !muxer_upstream_notif_iter->notif_iter) {
ab11110e
PP
964 goto end;
965 }
966
fed72692 967 /* muxer_upstream_notif_iter_next() logs details/errors */
089717de
PP
968 status = muxer_upstream_notif_iter_next(muxer_upstream_notif_iter);
969
970end:
971 return status;
972}
973
974static
975enum bt_notification_iterator_status validate_muxer_upstream_notif_iters(
976 struct muxer_notif_iter *muxer_notif_iter)
977{
978 enum bt_notification_iterator_status status =
979 BT_NOTIFICATION_ITERATOR_STATUS_OK;
980 size_t i;
981
fed72692
PP
982 BT_LOGV("Validating muxer's upstream notification iterator wrappers: "
983 "muxer-notif-iter-addr=%p", muxer_notif_iter);
984
089717de
PP
985 for (i = 0; i < muxer_notif_iter->muxer_upstream_notif_iters->len; i++) {
986 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter =
987 g_ptr_array_index(
988 muxer_notif_iter->muxer_upstream_notif_iters,
989 i);
990
991 status = validate_muxer_upstream_notif_iter(
992 muxer_upstream_notif_iter);
993 if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
fed72692
PP
994 if (status < 0) {
995 BT_LOGE("Cannot validate muxer's upstream notification iterator wrapper: "
996 "muxer-notif-iter-addr=%p, "
997 "muxer-upstream-notif-iter-wrap-addr=%p",
998 muxer_notif_iter,
999 muxer_upstream_notif_iter);
1000 } else {
1001 BT_LOGV("Cannot validate muxer's upstream notification iterator wrapper: "
1002 "muxer-notif-iter-addr=%p, "
1003 "muxer-upstream-notif-iter-wrap-addr=%p",
1004 muxer_notif_iter,
1005 muxer_upstream_notif_iter);
1006 }
1007
089717de
PP
1008 goto end;
1009 }
744ba28b
PP
1010
1011 /*
1012 * Remove this muxer upstream notification iterator
1013 * if it's ended or canceled.
1014 */
1015 if (!muxer_upstream_notif_iter->notif_iter) {
1016 /*
1017 * Use g_ptr_array_remove_fast() because the
1018 * order of those elements is not important.
1019 */
fed72692
PP
1020 BT_LOGV("Removing muxer's upstream notification iterator wrapper: ended or canceled: "
1021 "muxer-notif-iter-addr=%p, "
1022 "muxer-upstream-notif-iter-wrap-addr=%p",
1023 muxer_notif_iter, muxer_upstream_notif_iter);
744ba28b
PP
1024 g_ptr_array_remove_index_fast(
1025 muxer_notif_iter->muxer_upstream_notif_iters,
1026 i);
1027 i--;
1028 }
089717de
PP
1029 }
1030
1031end:
1032 return status;
1033}
1034
1035static
1036struct bt_notification_iterator_next_return muxer_notif_iter_do_next(
1037 struct muxer_comp *muxer_comp,
1038 struct muxer_notif_iter *muxer_notif_iter)
1039{
1040 struct muxer_upstream_notif_iter *muxer_upstream_notif_iter = NULL;
1041 struct bt_notification_iterator_next_return next_return = {
1042 .notification = NULL,
1043 .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
1044 };
1045 int64_t next_return_ts;
1046
1047 while (true) {
1048 int ret = muxer_notif_iter_handle_newly_connected_ports(
1049 muxer_notif_iter);
1050
1051 if (ret) {
fed72692
PP
1052 BT_LOGE("Cannot handle newly connected input ports for muxer's notification iterator: "
1053 "muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1054 "ret=%d",
1055 muxer_comp, muxer_notif_iter, ret);
089717de
PP
1056 next_return.status =
1057 BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1058 goto end;
1059 }
1060
1061 next_return.status =
1062 validate_muxer_upstream_notif_iters(muxer_notif_iter);
1063 if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
fed72692
PP
1064 BT_LOGE("Cannot validate upstream notification iterator wrappers for muxer's notification iterator: "
1065 "muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1066 "status=%s",
1067 muxer_comp, muxer_notif_iter,
1068 bt_notification_iterator_status_string(next_return.status));
089717de
PP
1069 goto end;
1070 }
ab11110e 1071
958f7d11 1072 /*
089717de
PP
1073 * At this point, we know that all the existing upstream
1074 * notification iterators are valid. However the
1075 * operations to validate them (during
1076 * validate_muxer_upstream_notif_iters()) may have
1077 * connected new ports. If no ports were connected
1078 * during this operation, exit the loop.
958f7d11 1079 */
089717de 1080 if (!muxer_notif_iter->newly_connected_priv_ports) {
fed72692
PP
1081 BT_LOGV("Not breaking this loop: muxer's notification iterator still has newly connected input ports to handle: "
1082 "muxer-comp-addr=%p", muxer_comp);
089717de
PP
1083 break;
1084 }
958f7d11
PP
1085 }
1086
089717de
PP
1087 assert(!muxer_notif_iter->newly_connected_priv_ports);
1088
958f7d11 1089 /*
089717de
PP
1090 * At this point we know that all the existing upstream
1091 * notification iterators are valid. We can find the one,
1092 * amongst those, of which the current notification is the
1093 * youngest.
958f7d11 1094 */
089717de 1095 next_return.status =
958f7d11
PP
1096 muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp,
1097 muxer_notif_iter, &muxer_upstream_notif_iter,
089717de
PP
1098 &next_return_ts);
1099 if (next_return.status < 0 ||
beed0223
MD
1100 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END ||
1101 next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) {
fed72692
PP
1102 if (next_return.status < 0) {
1103 BT_LOGE("Cannot find the youngest upstream notification iterator wrapper: "
1104 "status=%s",
1105 bt_notification_iterator_status_string(next_return.status));
1106 } else {
1107 BT_LOGV("Cannot find the youngest upstream notification iterator wrapper: "
1108 "status=%s",
1109 bt_notification_iterator_status_string(next_return.status));
1110 }
1111
958f7d11
PP
1112 goto end;
1113 }
1114
089717de 1115 if (next_return_ts < muxer_notif_iter->last_returned_ts_ns) {
fed72692
PP
1116 BT_LOGE("Youngest upstream notification iterator wrapper's timestamp is less than muxer's notification iterator's last returned timestamp: "
1117 "muxer-notif-iter-addr=%p, ts=%" PRId64 ", "
1118 "last-returned-ts=%" PRId64,
1119 muxer_notif_iter, next_return_ts,
1120 muxer_notif_iter->last_returned_ts_ns);
089717de 1121 next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
958f7d11
PP
1122 goto end;
1123 }
1124
fed72692
PP
1125 BT_LOGV("Found youngest upstream notification iterator wrapper: "
1126 "muxer-notif-iter-addr=%p, "
1127 "muxer-upstream-notif-iter-wrap-addr=%p, "
1128 "ts=%" PRId64,
1129 muxer_notif_iter, muxer_upstream_notif_iter, next_return_ts);
089717de
PP
1130 assert(next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK);
1131 assert(muxer_upstream_notif_iter);
1132 next_return.notification = bt_notification_iterator_get_notification(
1133 muxer_upstream_notif_iter->notif_iter);
1134 assert(next_return.notification);
958f7d11
PP
1135
1136 /*
089717de
PP
1137 * We invalidate the upstream notification iterator so that, the
1138 * next time this function is called,
1139 * validate_muxer_upstream_notif_iters() will make it valid.
958f7d11 1140 */
089717de
PP
1141 muxer_upstream_notif_iter->is_valid = false;
1142 muxer_notif_iter->last_returned_ts_ns = next_return_ts;
958f7d11
PP
1143
1144end:
089717de 1145 return next_return;
958f7d11
PP
1146}
1147
1148static
ab11110e
PP
1149void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
1150{
ab11110e
PP
1151 if (!muxer_notif_iter) {
1152 return;
1153 }
1154
fed72692
PP
1155 BT_LOGD("Destroying muxer component's notification iterator: "
1156 "muxer-notif-iter-addr=%p", muxer_notif_iter);
1157
ab11110e 1158 if (muxer_notif_iter->muxer_upstream_notif_iters) {
fed72692 1159 BT_LOGD_STR("Destroying muxer's upstream notification iterator wrappers.");
ab11110e
PP
1160 g_ptr_array_free(
1161 muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
1162 }
1163
ab11110e
PP
1164 g_list_free(muxer_notif_iter->newly_connected_priv_ports);
1165 g_free(muxer_notif_iter);
1166}
1167
1168static
1169int muxer_notif_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
958f7d11
PP
1170 struct muxer_notif_iter *muxer_notif_iter)
1171{
ab11110e 1172 struct bt_component *comp;
544d0515
PP
1173 int64_t count;
1174 int64_t i;
ab11110e 1175 int ret = 0;
958f7d11 1176
ab11110e
PP
1177 /*
1178 * Add the connected input ports to this muxer notification
1179 * iterator's list of newly connected ports. They will be
1180 * handled by muxer_notif_iter_handle_newly_connected_ports().
1181 */
958f7d11
PP
1182 comp = bt_component_from_private_component(muxer_comp->priv_comp);
1183 assert(comp);
544d0515
PP
1184 count = bt_component_filter_get_input_port_count(comp);
1185 if (count < 0) {
fed72692
PP
1186 BT_LOGD("No input port to initialize for muxer component's notification iterator: "
1187 "muxer-comp-addr=%p, muxer-notif-iter-addr=%p",
1188 muxer_comp, muxer_notif_iter);
ab11110e
PP
1189 goto end;
1190 }
958f7d11
PP
1191
1192 for (i = 0; i < count; i++) {
1193 struct bt_private_port *priv_port =
9ac68eb1 1194 bt_private_component_filter_get_input_private_port_by_index(
958f7d11
PP
1195 muxer_comp->priv_comp, i);
1196 struct bt_port *port;
958f7d11
PP
1197
1198 assert(priv_port);
958f7d11 1199 port = bt_port_from_private_port(priv_port);
ab11110e 1200 assert(port);
958f7d11
PP
1201
1202 if (!bt_port_is_connected(port)) {
fed72692
PP
1203 BT_LOGD("Skipping input port: not connected: "
1204 "muxer-comp-addr=%p, port-addr=%p, port-name\"%s\"",
1205 muxer_comp, port, bt_port_get_name(port));
958f7d11 1206 bt_put(priv_port);
ab11110e 1207 bt_put(port);
958f7d11
PP
1208 continue;
1209 }
1210
1211 bt_put(port);
06a2cb0d 1212 bt_put(priv_port);
ab11110e
PP
1213 muxer_notif_iter->newly_connected_priv_ports =
1214 g_list_append(
1215 muxer_notif_iter->newly_connected_priv_ports,
958f7d11 1216 priv_port);
ab11110e 1217 if (!muxer_notif_iter->newly_connected_priv_ports) {
fed72692
PP
1218 BT_LOGE("Cannot append port to muxer's notification iterator list of newly connected input ports: "
1219 "port-addr=%p, port-name=\"%s\", "
1220 "muxer-notif-iter-addr=%p", port,
1221 bt_port_get_name(port), muxer_notif_iter);
ab11110e
PP
1222 ret = -1;
1223 goto end;
958f7d11 1224 }
fed72692
PP
1225
1226 BT_LOGD("Appended port to muxer's notification iterator list of newly connected input ports: "
1227 "port-addr=%p, port-name=\"%s\", "
1228 "muxer-notif-iter-addr=%p", port,
1229 bt_port_get_name(port), muxer_notif_iter);
958f7d11
PP
1230 }
1231
1232end:
1233 bt_put(comp);
1234 return ret;
1235}
1236
958f7d11
PP
1237BT_HIDDEN
1238enum bt_notification_iterator_status muxer_notif_iter_init(
1239 struct bt_private_notification_iterator *priv_notif_iter,
1240 struct bt_private_port *output_priv_port)
1241{
1242 struct muxer_comp *muxer_comp = NULL;
1243 struct muxer_notif_iter *muxer_notif_iter = NULL;
1244 struct bt_private_component *priv_comp = NULL;
1245 enum bt_notification_iterator_status status =
1246 BT_NOTIFICATION_ITERATOR_STATUS_OK;
1247 int ret;
1248
1249 priv_comp = bt_private_notification_iterator_get_private_component(
1250 priv_notif_iter);
1251 assert(priv_comp);
1252 muxer_comp = bt_private_component_get_user_data(priv_comp);
1253 assert(muxer_comp);
fed72692
PP
1254 BT_LOGD("Initializing muxer component's notification iterator: "
1255 "comp-addr=%p, muxer-comp-addr=%p, notif-iter-addr=%p",
1256 priv_comp, muxer_comp, priv_notif_iter);
a09c6b95
PP
1257
1258 if (muxer_comp->initializing_muxer_notif_iter) {
1259 /*
089717de
PP
1260 * Weird, unhandled situation detected: downstream
1261 * creates a muxer notification iterator while creating
1262 * another muxer notification iterator (same component).
a09c6b95 1263 */
fed72692
PP
1264 BT_LOGE("Recursive initialization of muxer component's notification iterator: "
1265 "comp-addr=%p, muxer-comp-addr=%p, notif-iter-addr=%p",
1266 priv_comp, muxer_comp, priv_notif_iter);
958f7d11
PP
1267 goto error;
1268 }
1269
a09c6b95
PP
1270 muxer_comp->initializing_muxer_notif_iter = true;
1271 muxer_notif_iter = g_new0(struct muxer_notif_iter, 1);
1272 if (!muxer_notif_iter) {
fed72692 1273 BT_LOGE_STR("Failed to allocate one muxer component's notification iterator.");
ab11110e
PP
1274 goto error;
1275 }
1276
958f7d11
PP
1277 muxer_notif_iter->last_returned_ts_ns = INT64_MIN;
1278 muxer_notif_iter->muxer_upstream_notif_iters =
1279 g_ptr_array_new_with_free_func(
1280 (GDestroyNotify) destroy_muxer_upstream_notif_iter);
1281 if (!muxer_notif_iter->muxer_upstream_notif_iters) {
fed72692 1282 BT_LOGE_STR("Failed to allocate a GPtrArray.");
958f7d11
PP
1283 goto error;
1284 }
1285
a09c6b95
PP
1286 /*
1287 * Add the muxer notification iterator to the component's array
1288 * of muxer notification iterators here because
1289 * muxer_notif_iter_init_newly_connected_ports() can cause
1290 * muxer_port_connected() to be called, which adds the newly
1291 * connected port to each muxer notification iterator's list of
1292 * newly connected ports.
1293 */
1294 g_ptr_array_add(muxer_comp->muxer_notif_iters, muxer_notif_iter);
1295 ret = muxer_notif_iter_init_newly_connected_ports(muxer_comp,
1296 muxer_notif_iter);
1297 if (ret) {
fed72692
PP
1298 BT_LOGE("Cannot initialize newly connected input ports for muxer component's notification iterator: "
1299 "comp-addr=%p, muxer-comp-addr=%p, "
1300 "muxer-notif-iter-addr=%p, notif-iter-addr=%p, ret=%d",
1301 priv_comp, muxer_comp, muxer_notif_iter,
1302 priv_notif_iter, ret);
a09c6b95
PP
1303 goto error;
1304 }
1305
958f7d11
PP
1306 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
1307 muxer_notif_iter);
1308 assert(ret == 0);
fed72692
PP
1309 BT_LOGD("Initialized muxer component's notification iterator: "
1310 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1311 "notif-iter-addr=%p",
1312 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
958f7d11
PP
1313 goto end;
1314
1315error:
a09c6b95
PP
1316 if (g_ptr_array_index(muxer_comp->muxer_notif_iters,
1317 muxer_comp->muxer_notif_iters->len - 1) == muxer_notif_iter) {
1318 g_ptr_array_remove_index(muxer_comp->muxer_notif_iters,
1319 muxer_comp->muxer_notif_iters->len - 1);
1320 }
1321
958f7d11
PP
1322 destroy_muxer_notif_iter(muxer_notif_iter);
1323 ret = bt_private_notification_iterator_set_user_data(priv_notif_iter,
1324 NULL);
1325 assert(ret == 0);
1326 status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1327
1328end:
a09c6b95 1329 muxer_comp->initializing_muxer_notif_iter = false;
958f7d11
PP
1330 bt_put(priv_comp);
1331 return status;
1332}
1333
1334BT_HIDDEN
1335void muxer_notif_iter_finalize(
1336 struct bt_private_notification_iterator *priv_notif_iter)
1337{
1338 struct muxer_notif_iter *muxer_notif_iter =
1339 bt_private_notification_iterator_get_user_data(priv_notif_iter);
1340 struct bt_private_component *priv_comp = NULL;
1341 struct muxer_comp *muxer_comp = NULL;
1342
1343 priv_comp = bt_private_notification_iterator_get_private_component(
1344 priv_notif_iter);
1345 assert(priv_comp);
1346 muxer_comp = bt_private_component_get_user_data(priv_comp);
fed72692
PP
1347 BT_LOGD("Finalizing muxer component's notification iterator: "
1348 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1349 "notif-iter-addr=%p",
1350 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
958f7d11
PP
1351
1352 if (muxer_comp) {
1353 (void) g_ptr_array_remove_fast(muxer_comp->muxer_notif_iters,
1354 muxer_notif_iter);
1355 destroy_muxer_notif_iter(muxer_notif_iter);
1356 }
1357
1358 bt_put(priv_comp);
1359}
1360
1361BT_HIDDEN
1362struct bt_notification_iterator_next_return muxer_notif_iter_next(
1363 struct bt_private_notification_iterator *priv_notif_iter)
1364{
089717de 1365 struct bt_notification_iterator_next_return next_ret;
958f7d11
PP
1366 struct muxer_notif_iter *muxer_notif_iter =
1367 bt_private_notification_iterator_get_user_data(priv_notif_iter);
1368 struct bt_private_component *priv_comp = NULL;
1369 struct muxer_comp *muxer_comp = NULL;
958f7d11
PP
1370
1371 assert(muxer_notif_iter);
1372 priv_comp = bt_private_notification_iterator_get_private_component(
1373 priv_notif_iter);
1374 assert(priv_comp);
1375 muxer_comp = bt_private_component_get_user_data(priv_comp);
1376 assert(muxer_comp);
1377
fed72692
PP
1378 BT_LOGV("Muxer component's notification iterator's \"next\" method called: "
1379 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1380 "notif-iter-addr=%p",
1381 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
1382
958f7d11
PP
1383 /* Are we in an error state set elsewhere? */
1384 if (unlikely(muxer_comp->error)) {
fed72692
PP
1385 BT_LOGE("Muxer component is already in an error state: returning BT_NOTIFICATION_ITERATOR_STATUS_ERROR: "
1386 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1387 "notif-iter-addr=%p",
1388 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter);
089717de
PP
1389 next_ret.notification = NULL;
1390 next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
1391 goto end;
958f7d11
PP
1392 }
1393
089717de 1394 next_ret = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter);
fed72692
PP
1395 if (next_ret.status < 0) {
1396 BT_LOGE("Cannot get next notification: "
1397 "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, "
1398 "notif-iter-addr=%p, status=%s",
1399 priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter,
1400 bt_notification_iterator_status_string(next_ret.status));
1401 } else {
1402 BT_LOGV("Returning from muxer component's notification iterator's \"next\" method: "
1403 "status=%s, notif-addr=%p",
1404 bt_notification_iterator_status_string(next_ret.status),
1405 next_ret.notification);
1406 }
958f7d11
PP
1407
1408end:
1409 bt_put(priv_comp);
1410 return next_ret;
1411}
1412
1413BT_HIDDEN
1414void muxer_port_connected(
1415 struct bt_private_component *priv_comp,
1416 struct bt_private_port *self_private_port,
1417 struct bt_port *other_port)
1418{
1419 struct bt_port *self_port =
1420 bt_port_from_private_port(self_private_port);
1421 struct muxer_comp *muxer_comp =
1422 bt_private_component_get_user_data(priv_comp);
1423 size_t i;
06a2cb0d 1424 int ret;
958f7d11
PP
1425
1426 assert(self_port);
1427 assert(muxer_comp);
fed72692
PP
1428 BT_LOGD("Port connected: "
1429 "comp-addr=%p, muxer-comp-addr=%p, "
1430 "port-addr=%p, port-name=\"%s\", "
1431 "other-port-addr=%p, other-port-name=\"%s\"",
1432 priv_comp, muxer_comp, self_port, bt_port_get_name(self_port),
1433 other_port, bt_port_get_name(other_port));
958f7d11 1434
06a2cb0d
PP
1435 if (bt_port_get_type(self_port) == BT_PORT_TYPE_OUTPUT) {
1436 goto end;
958f7d11
PP
1437 }
1438
1439 for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
1440 struct muxer_notif_iter *muxer_notif_iter =
1441 g_ptr_array_index(muxer_comp->muxer_notif_iters, i);
1442
1443 /*
ab11110e
PP
1444 * Add this port to the list of newly connected ports
1445 * for this muxer notification iterator. We append at
1446 * the end of this list while
1447 * muxer_notif_iter_handle_newly_connected_ports()
1448 * removes the nodes from the beginning.
958f7d11 1449 */
ab11110e
PP
1450 muxer_notif_iter->newly_connected_priv_ports =
1451 g_list_append(
1452 muxer_notif_iter->newly_connected_priv_ports,
06a2cb0d 1453 self_private_port);
ab11110e 1454 if (!muxer_notif_iter->newly_connected_priv_ports) {
fed72692
PP
1455 BT_LOGE("Cannot append port to muxer's notification iterator list of newly connected input ports: "
1456 "port-addr=%p, port-name=\"%s\", "
1457 "muxer-notif-iter-addr=%p", self_port,
1458 bt_port_get_name(self_port), muxer_notif_iter);
958f7d11
PP
1459 muxer_comp->error = true;
1460 goto end;
1461 }
fed72692
PP
1462
1463 BT_LOGD("Appended port to muxer's notification iterator list of newly connected input ports: "
1464 "port-addr=%p, port-name=\"%s\", "
1465 "muxer-notif-iter-addr=%p", self_port,
1466 bt_port_get_name(self_port), muxer_notif_iter);
958f7d11
PP
1467 }
1468
06a2cb0d
PP
1469 /* One less available input port */
1470 muxer_comp->available_input_ports--;
1471 ret = ensure_available_input_port(priv_comp);
1472 if (ret) {
1473 /*
1474 * Only way to report an error later since this
1475 * method does not return anything.
1476 */
fed72692
PP
1477 BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
1478 "muxer-comp-addr=%p, status=%s",
1479 muxer_comp, bt_component_status_string(ret));
06a2cb0d
PP
1480 muxer_comp->error = true;
1481 goto end;
1482 }
1483
958f7d11
PP
1484end:
1485 bt_put(self_port);
1486}
1487
1488BT_HIDDEN
1489void muxer_port_disconnected(struct bt_private_component *priv_comp,
1490 struct bt_private_port *priv_port)
1491{
1492 struct bt_port *port = bt_port_from_private_port(priv_port);
1493 struct muxer_comp *muxer_comp =
1494 bt_private_component_get_user_data(priv_comp);
1495
1496 assert(port);
1497 assert(muxer_comp);
fed72692
PP
1498 BT_LOGD("Port disconnected: "
1499 "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, "
1500 "port-name=\"%s\"", priv_comp, muxer_comp,
1501 port, bt_port_get_name(port));
958f7d11 1502
ab11110e
PP
1503 /*
1504 * There's nothing special to do when a port is disconnected
1505 * because this component deals with upstream notification
1506 * iterators which were already created thanks to connected
1507 * ports. The fact that the port is disconnected does not cancel
1508 * the upstream notification iterators created using its
089717de
PP
1509 * connection: they still exist, even if the connection is dead.
1510 * The only way to remove an upstream notification iterator is
1511 * for its "next" operation to return
1512 * BT_NOTIFICATION_ITERATOR_STATUS_END.
ab11110e 1513 */
958f7d11
PP
1514 if (bt_port_get_type(port) == BT_PORT_TYPE_INPUT) {
1515 /* One more available input port */
1516 muxer_comp->available_input_ports++;
fed72692
PP
1517 BT_LOGD("Leaving disconnected input port available for future connections: "
1518 "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, "
1519 "port-name=\"%s\", avail-input-port-count=%zu",
1520 priv_comp, muxer_comp, port, bt_port_get_name(port),
1521 muxer_comp->available_input_ports);
958f7d11
PP
1522 }
1523
1524 bt_put(port);
1525}
This page took 0.101416 seconds and 4 git commands to generate.