Move to kernel style SPDX license identifiers
[babeltrace.git] / src / plugins / utils / muxer / muxer.c
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
5 */
6
7 #define BT_COMP_LOG_SELF_COMP (muxer_comp->self_comp)
8 #define BT_LOG_OUTPUT_LEVEL (muxer_comp->log_level)
9 #define BT_LOG_TAG "PLUGIN/FLT.UTILS.MUXER"
10 #include "logging/comp-logging.h"
11
12 #include "common/macros.h"
13 #include "common/uuid.h"
14 #include <babeltrace2/babeltrace.h>
15 #include <glib.h>
16 #include <stdbool.h>
17 #include <inttypes.h>
18 #include "common/assert.h"
19 #include "common/common.h"
20 #include <stdlib.h>
21 #include <string.h>
22
23 #include "plugins/common/muxing/muxing.h"
24 #include "plugins/common/param-validation/param-validation.h"
25
26 #include "muxer.h"
27
/*
 * Private data of one muxer filter component: muxer_init() allocates
 * it and destroy_muxer_comp() frees it.
 */
struct muxer_comp {
	/* Weak refs */
	bt_self_component_filter *self_comp_flt;
	bt_self_component *self_comp;

	/* Number appended to "in" to name the next input port to add */
	unsigned int next_port_num;

	/* Incremented each time add_available_input_port() adds a port */
	size_t available_input_ports;
	bool initializing_muxer_msg_iter;

	/* Logging level read by the logging macros (BT_LOG_OUTPUT_LEVEL) */
	bt_logging_level log_level;
};
38
/*
 * Wrapper around one upstream message iterator and the messages
 * obtained from it which were not returned yet.
 */
struct muxer_upstream_msg_iter {
	/* Muxer component, used by the logging macros */
	struct muxer_comp *muxer_comp;

	/* Owned by this, NULL if ended */
	bt_message_iterator *msg_iter;

	/*
	 * Contains `const bt_message *`, owned by this.
	 *
	 * Messages are pushed to the tail and read from the head.
	 */
	GQueue *msgs;
};
48
/*
 * What a muxer message iterator expects of the clock classes it
 * encounters; see validate_clock_class() and
 * validate_new_stream_clock_class().
 */
enum muxer_msg_iter_clock_class_expectation {
	/* Nothing encountered yet: the first case seen sets the expectation */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,

	/* Stream classes must have no default clock class */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE,

	/* Clock classes must have the Unix epoch as their origin */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE,

	/* Non-Unix-epoch origin, with the same UUID as the first one seen */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID,

	/* Non-Unix-epoch origin, without any UUID */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID,
};
56
/* Private data of one muxer message iterator */
struct muxer_msg_iter {
	/* Muxer component, used by the logging macros */
	struct muxer_comp *muxer_comp;

	/* Weak */
	bt_self_message_iterator *self_msg_iter;

	/*
	 * Array of struct muxer_upstream_msg_iter * (owned by this).
	 *
	 * NOTE: This array is searched in linearly to find the youngest
	 * current message. Keep this until benchmarks confirm that
	 * another data structure is faster than this for our typical
	 * use cases.
	 */
	GPtrArray *active_muxer_upstream_msg_iters;

	/*
	 * Array of struct muxer_upstream_msg_iter * (owned by this).
	 *
	 * We move ended message iterators from
	 * `active_muxer_upstream_msg_iters` to this array so as to be
	 * able to restore them when seeking.
	 */
	GPtrArray *ended_muxer_upstream_msg_iters;

	/*
	 * Last time returned in a message (nanoseconds from origin).
	 *
	 * get_msg_ts_ns() receives this value and reports it as the
	 * time of a message which has no clock snapshot.
	 */
	int64_t last_returned_ts_ns;

	/* Clock class expectation state */
	enum muxer_msg_iter_clock_class_expectation clock_class_expectation;

	/*
	 * Expected clock class UUID, only valid when
	 * clock_class_expectation is
	 * MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID.
	 */
	bt_uuid_t expected_clock_class_uuid;

	/*
	 * Saved error. If we hit an error in the _next method, but have some
	 * messages ready to return, we save the error here and return it on
	 * the next _next call.
	 */
	bt_message_iterator_class_next_method_status next_saved_status;
	const struct bt_error *next_saved_error;
};
103
104 static
105 void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
106 {
107 const bt_message *msg;
108
109 while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) {
110 bt_message_put_ref(msg);
111 }
112 }
113
114 static
115 void destroy_muxer_upstream_msg_iter(
116 struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
117 {
118 struct muxer_comp *muxer_comp;
119
120 if (!muxer_upstream_msg_iter) {
121 return;
122 }
123
124 muxer_comp = muxer_upstream_msg_iter->muxer_comp;
125 BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: "
126 "addr=%p, msg-iter-addr=%p, queue-len=%u",
127 muxer_upstream_msg_iter,
128 muxer_upstream_msg_iter->msg_iter,
129 muxer_upstream_msg_iter->msgs->length);
130 bt_message_iterator_put_ref(
131 muxer_upstream_msg_iter->msg_iter);
132
133 if (muxer_upstream_msg_iter->msgs) {
134 empty_message_queue(muxer_upstream_msg_iter);
135 g_queue_free(muxer_upstream_msg_iter->msgs);
136 }
137
138 g_free(muxer_upstream_msg_iter);
139 }
140
141 static
142 int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter,
143 bt_message_iterator *self_msg_iter)
144 {
145 int ret = 0;
146 struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
147 g_new0(struct muxer_upstream_msg_iter, 1);
148 struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp;
149
150 if (!muxer_upstream_msg_iter) {
151 BT_COMP_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper.");
152 goto error;
153 }
154
155 muxer_upstream_msg_iter->muxer_comp = muxer_comp;
156 muxer_upstream_msg_iter->msg_iter = self_msg_iter;
157 bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter);
158 muxer_upstream_msg_iter->msgs = g_queue_new();
159 if (!muxer_upstream_msg_iter->msgs) {
160 BT_COMP_LOGE_STR("Failed to allocate a GQueue.");
161 goto error;
162 }
163
164 g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
165 muxer_upstream_msg_iter);
166 BT_COMP_LOGD("Added muxer's upstream message iterator wrapper: "
167 "addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p",
168 muxer_upstream_msg_iter, muxer_msg_iter,
169 self_msg_iter);
170
171 goto end;
172
173 error:
174 g_free(muxer_upstream_msg_iter);
175 ret = -1;
176
177 end:
178 return ret;
179 }
180
181 static
182 bt_self_component_add_port_status add_available_input_port(
183 bt_self_component_filter *self_comp)
184 {
185 struct muxer_comp *muxer_comp = bt_self_component_get_data(
186 bt_self_component_filter_as_self_component(self_comp));
187 bt_self_component_add_port_status status =
188 BT_SELF_COMPONENT_ADD_PORT_STATUS_OK;
189 GString *port_name = NULL;
190
191 BT_ASSERT(muxer_comp);
192 port_name = g_string_new("in");
193 if (!port_name) {
194 BT_COMP_LOGE_STR("Failed to allocate a GString.");
195 status = BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR;
196 goto end;
197 }
198
199 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
200 status = bt_self_component_filter_add_input_port(
201 self_comp, port_name->str, NULL, NULL);
202 if (status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
203 BT_COMP_LOGE("Cannot add input port to muxer component: "
204 "port-name=\"%s\", comp-addr=%p, status=%s",
205 port_name->str, self_comp,
206 bt_common_func_status_string(status));
207 goto end;
208 }
209
210 muxer_comp->available_input_ports++;
211 muxer_comp->next_port_num++;
212 BT_COMP_LOGI("Added one input port to muxer component: "
213 "port-name=\"%s\", comp-addr=%p",
214 port_name->str, self_comp);
215
216 end:
217 if (port_name) {
218 g_string_free(port_name, TRUE);
219 }
220
221 return status;
222 }
223
224 static
225 bt_self_component_add_port_status create_output_port(
226 bt_self_component_filter *self_comp)
227 {
228 return bt_self_component_filter_add_output_port(
229 self_comp, "out", NULL, NULL);
230 }
231
/* Frees the muxer component's private data (`muxer_comp` can be NULL). */
static
void destroy_muxer_comp(struct muxer_comp *muxer_comp)
{
	/* g_free() does nothing when given NULL */
	g_free(muxer_comp);
}
241
/*
 * Parameter descriptors which muxer_init() passes to
 * bt_param_validation_validate(): the table only holds the end
 * marker.
 *
 * NOTE(review): presumably this means that the component accepts no
 * parameter at all -- confirm against the parameter validation
 * library.
 */
static
struct bt_param_validation_map_value_entry_descr muxer_params[] = {
	BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
};
246
247 BT_HIDDEN
248 bt_component_class_initialize_method_status muxer_init(
249 bt_self_component_filter *self_comp_flt,
250 bt_self_component_filter_configuration *config,
251 const bt_value *params, void *init_data)
252 {
253 bt_component_class_initialize_method_status status;
254 bt_self_component_add_port_status add_port_status;
255 bt_self_component *self_comp =
256 bt_self_component_filter_as_self_component(self_comp_flt);
257 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
258 bt_logging_level log_level = bt_component_get_logging_level(
259 bt_self_component_as_component(self_comp));
260 enum bt_param_validation_status validation_status;
261 gchar *validate_error = NULL;
262
263 BT_COMP_LOG_CUR_LVL(BT_LOG_INFO, log_level, self_comp,
264 "Initializing muxer component: "
265 "comp-addr=%p, params-addr=%p", self_comp, params);
266
267 if (!muxer_comp) {
268 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
269 "Failed to allocate one muxer component.");
270 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
271 goto error;
272 }
273
274 muxer_comp->log_level = log_level;
275 muxer_comp->self_comp = self_comp;
276 muxer_comp->self_comp_flt = self_comp_flt;
277
278 validation_status = bt_param_validation_validate(params,
279 muxer_params, &validate_error);
280 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
281 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
282 goto error;
283 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
284 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
285 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validate_error);
286 goto error;
287 }
288
289 bt_self_component_set_data(self_comp, muxer_comp);
290 add_port_status = add_available_input_port(self_comp_flt);
291 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
292 BT_COMP_LOGE("Cannot ensure that at least one muxer component's input port is available: "
293 "muxer-comp-addr=%p, status=%s",
294 muxer_comp,
295 bt_common_func_status_string(add_port_status));
296 status = (int) add_port_status;
297 goto error;
298 }
299
300 add_port_status = create_output_port(self_comp_flt);
301 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
302 BT_COMP_LOGE("Cannot create muxer component's output port: "
303 "muxer-comp-addr=%p, status=%s",
304 muxer_comp,
305 bt_common_func_status_string(add_port_status));
306 status = (int) add_port_status;
307 goto error;
308 }
309
310 BT_COMP_LOGI("Initialized muxer component: "
311 "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p",
312 self_comp, params, muxer_comp);
313
314 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
315 goto end;
316
317 error:
318 destroy_muxer_comp(muxer_comp);
319 bt_self_component_set_data(self_comp, NULL);
320
321 end:
322 g_free(validate_error);
323 return status;
324 }
325
326 BT_HIDDEN
327 void muxer_finalize(bt_self_component_filter *self_comp)
328 {
329 struct muxer_comp *muxer_comp = bt_self_component_get_data(
330 bt_self_component_filter_as_self_component(self_comp));
331
332 BT_COMP_LOGI("Finalizing muxer component: comp-addr=%p",
333 self_comp);
334 destroy_muxer_comp(muxer_comp);
335 }
336
337 static
338 bt_message_iterator_create_from_message_iterator_status
339 create_msg_iter_on_input_port(struct muxer_comp *muxer_comp,
340 struct muxer_msg_iter *muxer_msg_iter,
341 bt_self_component_port_input *self_port,
342 bt_message_iterator **msg_iter)
343 {
344 const bt_port *port = bt_self_component_port_as_port(
345 bt_self_component_port_input_as_self_component_port(
346 self_port));
347 bt_message_iterator_create_from_message_iterator_status
348 status;
349
350 BT_ASSERT(port);
351 BT_ASSERT(bt_port_is_connected(port));
352
353 // TODO: Advance the iterator to >= the time of the latest
354 // returned message by the muxer message
355 // iterator which creates it.
356 status = bt_message_iterator_create_from_message_iterator(
357 muxer_msg_iter->self_msg_iter, self_port, msg_iter);
358 if (status != BT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) {
359 BT_COMP_LOGE("Cannot create upstream message iterator on input port: "
360 "port-addr=%p, port-name=\"%s\"",
361 port, bt_port_get_name(port));
362 goto end;
363 }
364
365 BT_COMP_LOGI("Created upstream message iterator on input port: "
366 "port-addr=%p, port-name=\"%s\", msg-iter-addr=%p",
367 port, bt_port_get_name(port), msg_iter);
368
369 end:
370 return status;
371 }
372
373 static
374 bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next(
375 struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
376 bool *is_ended)
377 {
378 struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp;
379 bt_message_iterator_class_next_method_status status;
380 bt_message_iterator_next_status input_port_iter_status;
381 bt_message_array_const msgs;
382 uint64_t i;
383 uint64_t count;
384
385 BT_COMP_LOGD("Calling upstream message iterator's \"next\" method: "
386 "muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p",
387 muxer_upstream_msg_iter,
388 muxer_upstream_msg_iter->msg_iter);
389 input_port_iter_status = bt_message_iterator_next(
390 muxer_upstream_msg_iter->msg_iter, &msgs, &count);
391 BT_COMP_LOGD("Upstream message iterator's \"next\" method returned: "
392 "status=%s",
393 bt_common_func_status_string(input_port_iter_status));
394
395 switch (input_port_iter_status) {
396 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
397 /*
398 * Message iterator's current message is
399 * valid: it must be considered for muxing operations.
400 */
401 BT_COMP_LOGD_STR("Validated upstream message iterator wrapper.");
402 BT_ASSERT_DBG(count > 0);
403
404 /* Move messages to our queue */
405 for (i = 0; i < count; i++) {
406 /*
407 * Push to tail in order; other side
408 * (muxer_msg_iter_do_next_one()) consumes
409 * from the head first.
410 */
411 g_queue_push_tail(muxer_upstream_msg_iter->msgs,
412 (void *) msgs[i]);
413 }
414 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
415 break;
416 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
417 /*
418 * Message iterator's current message is not
419 * valid anymore. Return
420 * BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN immediately.
421 */
422 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
423 break;
424 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END: /* Fall-through. */
425 /*
426 * Message iterator reached the end: release it. It
427 * won't be considered again to find the youngest
428 * message.
429 */
430 *is_ended = true;
431 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
432 break;
433 case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR:
434 case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR:
435 /* Error status code */
436 BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
437 "Upstream iterator's next method returned an error: status=%s",
438 bt_common_func_status_string(input_port_iter_status));
439 status = (int) input_port_iter_status;
440 break;
441 default:
442 /* Unsupported status code */
443 BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
444 "Unsupported status code: status=%s",
445 bt_common_func_status_string(input_port_iter_status));
446 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
447 break;
448 }
449
450 return status;
451 }
452
453 static
454 int get_msg_ts_ns(struct muxer_comp *muxer_comp,
455 struct muxer_msg_iter *muxer_msg_iter,
456 const bt_message *msg, int64_t last_returned_ts_ns,
457 int64_t *ts_ns)
458 {
459 const bt_clock_snapshot *clock_snapshot = NULL;
460 int ret = 0;
461 const bt_stream_class *stream_class = NULL;
462 bt_message_type msg_type;
463
464 BT_ASSERT_DBG(msg);
465 BT_ASSERT_DBG(ts_ns);
466 BT_COMP_LOGD("Getting message's timestamp: "
467 "muxer-msg-iter-addr=%p, msg-addr=%p, "
468 "last-returned-ts=%" PRId64,
469 muxer_msg_iter, msg, last_returned_ts_ns);
470
471 if (G_UNLIKELY(muxer_msg_iter->clock_class_expectation ==
472 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) {
473 *ts_ns = last_returned_ts_ns;
474 goto end;
475 }
476
477 msg_type = bt_message_get_type(msg);
478
479 if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_PACKET_BEGINNING)) {
480 stream_class = bt_stream_borrow_class_const(
481 bt_packet_borrow_stream_const(
482 bt_message_packet_beginning_borrow_packet_const(
483 msg)));
484 } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_PACKET_END)) {
485 stream_class = bt_stream_borrow_class_const(
486 bt_packet_borrow_stream_const(
487 bt_message_packet_end_borrow_packet_const(
488 msg)));
489 } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS)) {
490 stream_class = bt_stream_borrow_class_const(
491 bt_message_discarded_events_borrow_stream_const(msg));
492 } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS)) {
493 stream_class = bt_stream_borrow_class_const(
494 bt_message_discarded_packets_borrow_stream_const(msg));
495 }
496
497 switch (msg_type) {
498 case BT_MESSAGE_TYPE_EVENT:
499 BT_ASSERT_DBG(bt_message_event_borrow_stream_class_default_clock_class_const(
500 msg));
501 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
502 msg);
503 break;
504 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
505 if (bt_stream_class_packets_have_beginning_default_clock_snapshot(
506 stream_class)) {
507 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
508 msg);
509 } else {
510 goto no_clock_snapshot;
511 }
512
513 break;
514 case BT_MESSAGE_TYPE_PACKET_END:
515 if (bt_stream_class_packets_have_end_default_clock_snapshot(
516 stream_class)) {
517 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
518 msg);
519 } else {
520 goto no_clock_snapshot;
521 }
522
523 break;
524 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
525 {
526 enum bt_message_stream_clock_snapshot_state snapshot_state =
527 bt_message_stream_beginning_borrow_default_clock_snapshot_const(
528 msg, &clock_snapshot);
529 if (snapshot_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
530 goto no_clock_snapshot;
531 }
532
533 break;
534 }
535 case BT_MESSAGE_TYPE_STREAM_END:
536 {
537 enum bt_message_stream_clock_snapshot_state snapshot_state =
538 bt_message_stream_end_borrow_default_clock_snapshot_const(
539 msg, &clock_snapshot);
540 if (snapshot_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
541 goto no_clock_snapshot;
542 }
543
544 break;
545 }
546 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
547 if (bt_stream_class_discarded_events_have_default_clock_snapshots(
548 stream_class)) {
549 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
550 msg);
551 } else {
552 goto no_clock_snapshot;
553 }
554
555 break;
556 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
557 if (bt_stream_class_discarded_packets_have_default_clock_snapshots(
558 stream_class)) {
559 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
560 msg);
561 } else {
562 goto no_clock_snapshot;
563 }
564
565 break;
566 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
567 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(
568 msg);
569 break;
570 default:
571 /* All the other messages have a higher priority */
572 BT_COMP_LOGD_STR("Message has no timestamp: using the last returned timestamp.");
573 *ts_ns = last_returned_ts_ns;
574 goto end;
575 }
576
577 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
578 if (ret) {
579 BT_COMP_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
580 "clock-snapshot-addr=%p", clock_snapshot);
581 goto error;
582 }
583
584 goto end;
585
586 no_clock_snapshot:
587 BT_COMP_LOGD_STR("Message's default clock snapshot is missing: "
588 "using the last returned timestamp.");
589 *ts_ns = last_returned_ts_ns;
590 goto end;
591
592 error:
593 ret = -1;
594
595 end:
596 if (ret == 0) {
597 BT_COMP_LOGD("Found message's timestamp: "
598 "muxer-msg-iter-addr=%p, msg-addr=%p, "
599 "last-returned-ts=%" PRId64 ", ts=%" PRId64,
600 muxer_msg_iter, msg, last_returned_ts_ns,
601 *ts_ns);
602 }
603
604 return ret;
605 }
606
607 static inline
608 int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter,
609 struct muxer_comp *muxer_comp,
610 const bt_clock_class *clock_class)
611 {
612 int ret = 0;
613 const uint8_t *cc_uuid;
614 const char *cc_name;
615
616 BT_ASSERT_DBG(clock_class);
617 cc_uuid = bt_clock_class_get_uuid(clock_class);
618 cc_name = bt_clock_class_get_name(clock_class);
619
620 if (muxer_msg_iter->clock_class_expectation ==
621 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
622 /*
623 * This is the first clock class that this muxer message
624 * iterator encounters. Its properties determine what to expect
625 * for the whole lifetime of the iterator.
626 */
627 if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
628 /* Expect absolute clock classes */
629 muxer_msg_iter->clock_class_expectation =
630 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE;
631 } else {
632 if (cc_uuid) {
633 /*
634 * Expect non-absolute clock classes
635 * with a specific UUID.
636 */
637 muxer_msg_iter->clock_class_expectation =
638 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID;
639 bt_uuid_copy(muxer_msg_iter->expected_clock_class_uuid, cc_uuid);
640 } else {
641 /*
642 * Expect non-absolute clock classes
643 * with no UUID.
644 */
645 muxer_msg_iter->clock_class_expectation =
646 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID;
647 }
648 }
649 }
650
651 switch (muxer_msg_iter->clock_class_expectation) {
652 case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE:
653 if (!bt_clock_class_origin_is_unix_epoch(clock_class)) {
654 BT_COMP_LOGE("Expecting an absolute clock class, "
655 "but got a non-absolute one: "
656 "clock-class-addr=%p, clock-class-name=\"%s\"",
657 clock_class, cc_name);
658 goto error;
659 }
660 break;
661 case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID:
662 if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
663 BT_COMP_LOGE("Expecting a non-absolute clock class with no UUID, "
664 "but got an absolute one: "
665 "clock-class-addr=%p, clock-class-name=\"%s\"",
666 clock_class, cc_name);
667 goto error;
668 }
669
670 if (cc_uuid) {
671 BT_COMP_LOGE("Expecting a non-absolute clock class with no UUID, "
672 "but got one with a UUID: "
673 "clock-class-addr=%p, clock-class-name=\"%s\", "
674 "uuid=\"" BT_UUID_FMT "\"",
675 clock_class, cc_name, BT_UUID_FMT_VALUES(cc_uuid));
676 goto error;
677 }
678 break;
679 case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID:
680 if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
681 BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, "
682 "but got an absolute one: "
683 "clock-class-addr=%p, clock-class-name=\"%s\"",
684 clock_class, cc_name);
685 goto error;
686 }
687
688 if (!cc_uuid) {
689 BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, "
690 "but got one with no UUID: "
691 "clock-class-addr=%p, clock-class-name=\"%s\"",
692 clock_class, cc_name);
693 goto error;
694 }
695
696 if (bt_uuid_compare(muxer_msg_iter->expected_clock_class_uuid, cc_uuid) != 0) {
697 BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, "
698 "but got one with different UUID: "
699 "clock-class-addr=%p, clock-class-name=\"%s\", "
700 "expected-uuid=\"" BT_UUID_FMT "\", "
701 "uuid=\"" BT_UUID_FMT "\"",
702 clock_class, cc_name,
703 BT_UUID_FMT_VALUES(muxer_msg_iter->expected_clock_class_uuid),
704 BT_UUID_FMT_VALUES(cc_uuid));
705 goto error;
706 }
707 break;
708 case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE:
709 BT_COMP_LOGE("Expecting no clock class, but got one: "
710 "clock-class-addr=%p, clock-class-name=\"%s\"",
711 clock_class, cc_name);
712 goto error;
713 default:
714 /* Unexpected */
715 BT_COMP_LOGF("Unexpected clock class expectation: "
716 "expectation-code=%d",
717 muxer_msg_iter->clock_class_expectation);
718 bt_common_abort();
719 }
720
721 goto end;
722
723 error:
724 ret = -1;
725
726 end:
727 return ret;
728 }
729
730 static inline
731 int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter,
732 struct muxer_comp *muxer_comp, const bt_stream *stream)
733 {
734 int ret = 0;
735 const bt_stream_class *stream_class =
736 bt_stream_borrow_class_const(stream);
737 const bt_clock_class *clock_class =
738 bt_stream_class_borrow_default_clock_class_const(stream_class);
739
740 if (!clock_class) {
741 if (muxer_msg_iter->clock_class_expectation ==
742 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
743 /* Expect no clock class */
744 muxer_msg_iter->clock_class_expectation =
745 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE;
746 } else if (muxer_msg_iter->clock_class_expectation !=
747 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE) {
748 BT_COMP_LOGE("Expecting stream class without a default clock class: "
749 "stream-class-addr=%p, stream-class-name=\"%s\", "
750 "stream-class-id=%" PRIu64,
751 stream_class, bt_stream_class_get_name(stream_class),
752 bt_stream_class_get_id(stream_class));
753 ret = -1;
754 }
755
756 goto end;
757 }
758
759 ret = validate_clock_class(muxer_msg_iter, muxer_comp, clock_class);
760
761 end:
762 return ret;
763 }
764
765 /*
766 * This function finds the youngest available message amongst the
767 * non-ended upstream message iterators and returns the upstream
768 * message iterator which has it, or
769 * BT_MESSAGE_ITERATOR_STATUS_END if there's no available
770 * message.
771 *
772 * This function does NOT:
773 *
774 * * Update any upstream message iterator.
775 * * Check the upstream message iterators to retry.
776 *
777 * On sucess, this function sets *muxer_upstream_msg_iter to the
778 * upstream message iterator of which the current message is
779 * the youngest, and sets *ts_ns to its time.
780 */
781 static
782 bt_message_iterator_class_next_method_status
783 muxer_msg_iter_youngest_upstream_msg_iter(
784 struct muxer_comp *muxer_comp,
785 struct muxer_msg_iter *muxer_msg_iter,
786 struct muxer_upstream_msg_iter **muxer_upstream_msg_iter,
787 int64_t *ts_ns)
788 {
789 size_t i;
790 int ret;
791 int64_t youngest_ts_ns = INT64_MAX;
792 bt_message_iterator_class_next_method_status status =
793 BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
794
795 BT_ASSERT_DBG(muxer_comp);
796 BT_ASSERT_DBG(muxer_msg_iter);
797 BT_ASSERT_DBG(muxer_upstream_msg_iter);
798 *muxer_upstream_msg_iter = NULL;
799
800 for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
801 i++) {
802 const bt_message *msg;
803 struct muxer_upstream_msg_iter *cur_muxer_upstream_msg_iter =
804 g_ptr_array_index(
805 muxer_msg_iter->active_muxer_upstream_msg_iters,
806 i);
807 int64_t msg_ts_ns;
808
809 if (!cur_muxer_upstream_msg_iter->msg_iter) {
810 /* This upstream message iterator is ended */
811 BT_COMP_LOGT("Skipping ended upstream message iterator: "
812 "muxer-upstream-msg-iter-wrap-addr=%p",
813 cur_muxer_upstream_msg_iter);
814 continue;
815 }
816
817 BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->msgs->length > 0);
818 msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs);
819 BT_ASSERT_DBG(msg);
820
821 if (G_UNLIKELY(bt_message_get_type(msg) ==
822 BT_MESSAGE_TYPE_STREAM_BEGINNING)) {
823 ret = validate_new_stream_clock_class(
824 muxer_msg_iter, muxer_comp,
825 bt_message_stream_beginning_borrow_stream_const(
826 msg));
827 if (ret) {
828 /*
829 * validate_new_stream_clock_class() logs
830 * errors.
831 */
832 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
833 goto end;
834 }
835 } else if (G_UNLIKELY(bt_message_get_type(msg) ==
836 BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) {
837 const bt_clock_snapshot *cs;
838
839 cs = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(
840 msg);
841 ret = validate_clock_class(muxer_msg_iter, muxer_comp,
842 bt_clock_snapshot_borrow_clock_class_const(cs));
843 if (ret) {
844 /* validate_clock_class() logs errors */
845 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
846 goto end;
847 }
848 }
849
850 ret = get_msg_ts_ns(muxer_comp, muxer_msg_iter, msg,
851 muxer_msg_iter->last_returned_ts_ns, &msg_ts_ns);
852 if (ret) {
853 /* get_msg_ts_ns() logs errors */
854 *muxer_upstream_msg_iter = NULL;
855 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
856 goto end;
857 }
858
859 /*
860 * Update the current message iterator if it has not been set
861 * yet, or if its current message has a timestamp smaller than
862 * the previously selected youngest message.
863 */
864 if (G_UNLIKELY(*muxer_upstream_msg_iter == NULL) ||
865 msg_ts_ns < youngest_ts_ns) {
866 *muxer_upstream_msg_iter =
867 cur_muxer_upstream_msg_iter;
868 youngest_ts_ns = msg_ts_ns;
869 *ts_ns = youngest_ts_ns;
870 } else if (msg_ts_ns == youngest_ts_ns) {
871 /*
872 * The currently selected message to be sent downstream
873 * next has the exact same timestamp that of the
874 * current candidate message. We must break the tie
875 * in a predictable manner.
876 */
877 const bt_message *selected_msg = g_queue_peek_head(
878 (*muxer_upstream_msg_iter)->msgs);
879 BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
880
881 /*
882 * Order the messages in an arbitrary but determinitic
883 * way.
884 */
885 ret = common_muxing_compare_messages(msg, selected_msg);
886 if (ret < 0) {
887 /*
888 * The `msg` should go first. Update the next
889 * iterator and the current timestamp.
890 */
891 *muxer_upstream_msg_iter =
892 cur_muxer_upstream_msg_iter;
893 youngest_ts_ns = msg_ts_ns;
894 *ts_ns = youngest_ts_ns;
895 } else if (ret == 0) {
896 /* Unable to pick which one should go first. */
897 BT_COMP_LOGW("Cannot deterministically pick next upstream message iterator because they have identical next messages: "
898 "muxer-upstream-msg-iter-wrap-addr=%p"
899 "cur-muxer-upstream-msg-iter-wrap-addr=%p",
900 *muxer_upstream_msg_iter,
901 cur_muxer_upstream_msg_iter);
902 }
903 }
904 }
905
906 if (!*muxer_upstream_msg_iter) {
907 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
908 *ts_ns = INT64_MIN;
909 }
910
911 end:
912 return status;
913 }
914
915 static
916 bt_message_iterator_class_next_method_status
917 validate_muxer_upstream_msg_iter(
918 struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
919 bool *is_ended)
920 {
921 struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp;
922 bt_message_iterator_class_next_method_status status;
923
924 BT_COMP_LOGD("Validating muxer's upstream message iterator wrapper: "
925 "muxer-upstream-msg-iter-wrap-addr=%p",
926 muxer_upstream_msg_iter);
927
928 if (muxer_upstream_msg_iter->msgs->length > 0 ||
929 !muxer_upstream_msg_iter->msg_iter) {
930 BT_COMP_LOGD("Already valid or not considered: "
931 "queue-len=%u, upstream-msg-iter-addr=%p",
932 muxer_upstream_msg_iter->msgs->length,
933 muxer_upstream_msg_iter->msg_iter);
934 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
935 goto end;
936 }
937
938 /* muxer_upstream_msg_iter_next() logs details/errors */
939 status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter,
940 is_ended);
941
942 end:
943 return status;
944 }
945
946 static
947 bt_message_iterator_class_next_method_status
948 validate_muxer_upstream_msg_iters(
949 struct muxer_msg_iter *muxer_msg_iter)
950 {
951 struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp;
952 bt_message_iterator_class_next_method_status status;
953 size_t i;
954
955 BT_COMP_LOGD("Validating muxer's upstream message iterator wrappers: "
956 "muxer-msg-iter-addr=%p", muxer_msg_iter);
957
958 for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
959 i++) {
960 bool is_ended = false;
961 struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
962 g_ptr_array_index(
963 muxer_msg_iter->active_muxer_upstream_msg_iters,
964 i);
965
966 status = validate_muxer_upstream_msg_iter(
967 muxer_upstream_msg_iter, &is_ended);
968 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
969 if (status < 0) {
970 BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
971 "Cannot validate muxer's upstream message iterator wrapper: "
972 "muxer-msg-iter-addr=%p, "
973 "muxer-upstream-msg-iter-wrap-addr=%p",
974 muxer_msg_iter,
975 muxer_upstream_msg_iter);
976 } else {
977 BT_COMP_LOGD("Cannot validate muxer's upstream message iterator wrapper: "
978 "muxer-msg-iter-addr=%p, "
979 "muxer-upstream-msg-iter-wrap-addr=%p",
980 muxer_msg_iter,
981 muxer_upstream_msg_iter);
982 }
983
984 goto end;
985 }
986
987 /*
988 * Move this muxer upstream message iterator to the
989 * array of ended iterators if it's ended.
990 */
991 if (G_UNLIKELY(is_ended)) {
992 BT_COMP_LOGD("Muxer's upstream message iterator wrapper: ended or canceled: "
993 "muxer-msg-iter-addr=%p, "
994 "muxer-upstream-msg-iter-wrap-addr=%p",
995 muxer_msg_iter, muxer_upstream_msg_iter);
996 g_ptr_array_add(
997 muxer_msg_iter->ended_muxer_upstream_msg_iters,
998 muxer_upstream_msg_iter);
999 muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i] = NULL;
1000
1001 /*
1002 * Use g_ptr_array_remove_fast() because the
1003 * order of those elements is not important.
1004 */
1005 g_ptr_array_remove_index_fast(
1006 muxer_msg_iter->active_muxer_upstream_msg_iters,
1007 i);
1008 i--;
1009 }
1010 }
1011
1012 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1013
1014 end:
1015 return status;
1016 }
1017
1018 static inline
1019 bt_message_iterator_class_next_method_status muxer_msg_iter_do_next_one(
1020 struct muxer_comp *muxer_comp,
1021 struct muxer_msg_iter *muxer_msg_iter,
1022 const bt_message **msg)
1023 {
1024 bt_message_iterator_class_next_method_status status;
1025 struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL;
1026 /* Initialize to avoid -Wmaybe-uninitialized warning with gcc 4.8. */
1027 int64_t next_return_ts = 0;
1028
1029 status = validate_muxer_upstream_msg_iters(muxer_msg_iter);
1030 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1031 /* validate_muxer_upstream_msg_iters() logs details */
1032 goto end;
1033 }
1034
1035 /*
1036 * At this point we know that all the existing upstream
1037 * message iterators are valid. We can find the one,
1038 * amongst those, of which the current message is the
1039 * youngest.
1040 */
1041 status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp,
1042 muxer_msg_iter, &muxer_upstream_msg_iter,
1043 &next_return_ts);
1044 if (status < 0 || status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END) {
1045 if (status < 0) {
1046 BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
1047 "Cannot find the youngest upstream message iterator wrapper: "
1048 "status=%s",
1049 bt_common_func_status_string(status));
1050 } else {
1051 BT_COMP_LOGD("Cannot find the youngest upstream message iterator wrapper: "
1052 "status=%s",
1053 bt_common_func_status_string(status));
1054 }
1055
1056 goto end;
1057 }
1058
1059 if (next_return_ts < muxer_msg_iter->last_returned_ts_ns) {
1060 BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
1061 "Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: "
1062 "muxer-msg-iter-addr=%p, ts=%" PRId64 ", "
1063 "last-returned-ts=%" PRId64,
1064 muxer_msg_iter, next_return_ts,
1065 muxer_msg_iter->last_returned_ts_ns);
1066 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1067 goto end;
1068 }
1069
1070 BT_COMP_LOGD("Found youngest upstream message iterator wrapper: "
1071 "muxer-msg-iter-addr=%p, "
1072 "muxer-upstream-msg-iter-wrap-addr=%p, "
1073 "ts=%" PRId64,
1074 muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts);
1075 BT_ASSERT_DBG(status ==
1076 BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK);
1077 BT_ASSERT_DBG(muxer_upstream_msg_iter);
1078
1079 /*
1080 * Consume from the queue's head: other side
1081 * (muxer_upstream_msg_iter_next()) writes to the tail.
1082 */
1083 *msg = g_queue_pop_head(muxer_upstream_msg_iter->msgs);
1084 BT_ASSERT_DBG(*msg);
1085 muxer_msg_iter->last_returned_ts_ns = next_return_ts;
1086
1087 end:
1088 return status;
1089 }
1090
1091 static
1092 bt_message_iterator_class_next_method_status muxer_msg_iter_do_next(
1093 struct muxer_comp *muxer_comp,
1094 struct muxer_msg_iter *muxer_msg_iter,
1095 bt_message_array_const msgs, uint64_t capacity,
1096 uint64_t *count)
1097 {
1098 bt_message_iterator_class_next_method_status status;
1099 uint64_t i = 0;
1100
1101 if (G_UNLIKELY(muxer_msg_iter->next_saved_error)) {
1102 /*
1103 * Last time we were called, we hit an error but had some
1104 * messages to deliver, so we stashed the error here. Return
1105 * it now.
1106 */
1107 BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(muxer_msg_iter->next_saved_error);
1108 status = muxer_msg_iter->next_saved_status;
1109 goto end;
1110 }
1111
1112 do {
1113 status = muxer_msg_iter_do_next_one(muxer_comp,
1114 muxer_msg_iter, &msgs[i]);
1115 if (status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1116 i++;
1117 }
1118 } while (i < capacity && status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK);
1119
1120 if (i > 0) {
1121 /*
1122 * Even if muxer_msg_iter_do_next_one() returned
1123 * something else than
1124 * BT_MESSAGE_ITERATOR_STATUS_OK, we accumulated
1125 * message objects in the output message
1126 * array, so we need to return
1127 * BT_MESSAGE_ITERATOR_STATUS_OK so that they are
1128 * transfered to downstream. This other status occurs
1129 * again the next time muxer_msg_iter_do_next() is
1130 * called, possibly without any accumulated
1131 * message, in which case we'll return it.
1132 */
1133 if (status < 0) {
1134 /*
1135 * Save this error for the next _next call. Assume that
1136 * this component always appends error causes when
1137 * returning an error status code, which will cause the
1138 * current thread error to be non-NULL.
1139 */
1140 muxer_msg_iter->next_saved_error = bt_current_thread_take_error();
1141 BT_ASSERT(muxer_msg_iter->next_saved_error);
1142 muxer_msg_iter->next_saved_status = status;
1143 }
1144
1145 *count = i;
1146 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1147 }
1148
1149 end:
1150 return status;
1151 }
1152
1153 static
1154 void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter)
1155 {
1156 struct muxer_comp *muxer_comp;
1157
1158 if (!muxer_msg_iter) {
1159 return;
1160 }
1161
1162 muxer_comp = muxer_msg_iter->muxer_comp;
1163 BT_COMP_LOGD("Destroying muxer component's message iterator: "
1164 "muxer-msg-iter-addr=%p", muxer_msg_iter);
1165
1166 if (muxer_msg_iter->active_muxer_upstream_msg_iters) {
1167 BT_COMP_LOGD_STR("Destroying muxer's active upstream message iterator wrappers.");
1168 g_ptr_array_free(
1169 muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE);
1170 }
1171
1172 if (muxer_msg_iter->ended_muxer_upstream_msg_iters) {
1173 BT_COMP_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers.");
1174 g_ptr_array_free(
1175 muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE);
1176 }
1177
1178 g_free(muxer_msg_iter);
1179 }
1180
1181 static
1182 bt_message_iterator_class_initialize_method_status
1183 muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp,
1184 struct muxer_msg_iter *muxer_msg_iter,
1185 struct bt_self_message_iterator_configuration *config)
1186 {
1187 int64_t count;
1188 int64_t i;
1189 bt_message_iterator_class_initialize_method_status status;
1190 bool can_seek_forward = true;
1191
1192 count = bt_component_filter_get_input_port_count(
1193 bt_self_component_filter_as_component_filter(
1194 muxer_comp->self_comp_flt));
1195 if (count < 0) {
1196 BT_COMP_LOGD("No input port to initialize for muxer component's message iterator: "
1197 "muxer-comp-addr=%p, muxer-msg-iter-addr=%p",
1198 muxer_comp, muxer_msg_iter);
1199 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1200 goto end;
1201 }
1202
1203 for (i = 0; i < count; i++) {
1204 bt_message_iterator *upstream_msg_iter;
1205 bt_self_component_port_input *self_port =
1206 bt_self_component_filter_borrow_input_port_by_index(
1207 muxer_comp->self_comp_flt, i);
1208 const bt_port *port;
1209 bt_message_iterator_create_from_message_iterator_status
1210 msg_iter_status;
1211 int int_status;
1212
1213 BT_ASSERT(self_port);
1214 port = bt_self_component_port_as_port(
1215 bt_self_component_port_input_as_self_component_port(
1216 self_port));
1217 BT_ASSERT(port);
1218
1219 if (!bt_port_is_connected(port)) {
1220 /* Skip non-connected port */
1221 continue;
1222 }
1223
1224 msg_iter_status = create_msg_iter_on_input_port(muxer_comp,
1225 muxer_msg_iter, self_port, &upstream_msg_iter);
1226 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) {
1227 /* create_msg_iter_on_input_port() logs errors */
1228 status = (int) msg_iter_status;
1229 goto end;
1230 }
1231
1232 int_status = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter,
1233 upstream_msg_iter);
1234 bt_message_iterator_put_ref(
1235 upstream_msg_iter);
1236 if (int_status) {
1237 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1238 /* muxer_msg_iter_add_upstream_msg_iter() logs errors */
1239 goto end;
1240 }
1241
1242 can_seek_forward = can_seek_forward &&
1243 bt_message_iterator_can_seek_forward(
1244 upstream_msg_iter);
1245 }
1246
1247 /*
1248 * This iterator can seek forward if all of its iterators can seek
1249 * forward.
1250 */
1251 bt_self_message_iterator_configuration_set_can_seek_forward(
1252 config, can_seek_forward);
1253
1254 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
1255
1256 end:
1257 return status;
1258 }
1259
1260 BT_HIDDEN
1261 bt_message_iterator_class_initialize_method_status muxer_msg_iter_init(
1262 bt_self_message_iterator *self_msg_iter,
1263 bt_self_message_iterator_configuration *config,
1264 bt_self_component_port_output *port)
1265 {
1266 struct muxer_comp *muxer_comp = NULL;
1267 struct muxer_msg_iter *muxer_msg_iter = NULL;
1268 bt_message_iterator_class_initialize_method_status status;
1269 bt_self_component *self_comp =
1270 bt_self_message_iterator_borrow_component(self_msg_iter);
1271
1272 muxer_comp = bt_self_component_get_data(self_comp);
1273 BT_ASSERT(muxer_comp);
1274 BT_COMP_LOGD("Initializing muxer component's message iterator: "
1275 "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p",
1276 self_comp, muxer_comp, self_msg_iter);
1277
1278 if (muxer_comp->initializing_muxer_msg_iter) {
1279 /*
1280 * Weird, unhandled situation detected: downstream
1281 * creates a muxer message iterator while creating
1282 * another muxer message iterator (same component).
1283 */
1284 BT_COMP_LOGE("Recursive initialization of muxer component's message iterator: "
1285 "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p",
1286 self_comp, muxer_comp, self_msg_iter);
1287 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
1288 goto error;
1289 }
1290
1291 muxer_comp->initializing_muxer_msg_iter = true;
1292 muxer_msg_iter = g_new0(struct muxer_msg_iter, 1);
1293 if (!muxer_msg_iter) {
1294 BT_COMP_LOGE_STR("Failed to allocate one muxer component's message iterator.");
1295 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1296 goto error;
1297 }
1298
1299 muxer_msg_iter->muxer_comp = muxer_comp;
1300 muxer_msg_iter->self_msg_iter = self_msg_iter;
1301 muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
1302 muxer_msg_iter->active_muxer_upstream_msg_iters =
1303 g_ptr_array_new_with_free_func(
1304 (GDestroyNotify) destroy_muxer_upstream_msg_iter);
1305 if (!muxer_msg_iter->active_muxer_upstream_msg_iters) {
1306 BT_COMP_LOGE_STR("Failed to allocate a GPtrArray.");
1307 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1308 goto error;
1309 }
1310
1311 muxer_msg_iter->ended_muxer_upstream_msg_iters =
1312 g_ptr_array_new_with_free_func(
1313 (GDestroyNotify) destroy_muxer_upstream_msg_iter);
1314 if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) {
1315 BT_COMP_LOGE_STR("Failed to allocate a GPtrArray.");
1316 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
1317 goto error;
1318 }
1319
1320 status = muxer_msg_iter_init_upstream_iterators(muxer_comp,
1321 muxer_msg_iter, config);
1322 if (status) {
1323 BT_COMP_LOGE("Cannot initialize connected input ports for muxer component's message iterator: "
1324 "comp-addr=%p, muxer-comp-addr=%p, "
1325 "muxer-msg-iter-addr=%p, msg-iter-addr=%p, ret=%d",
1326 self_comp, muxer_comp, muxer_msg_iter,
1327 self_msg_iter, status);
1328 goto error;
1329 }
1330
1331 bt_self_message_iterator_set_data(self_msg_iter, muxer_msg_iter);
1332 BT_COMP_LOGD("Initialized muxer component's message iterator: "
1333 "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
1334 "msg-iter-addr=%p",
1335 self_comp, muxer_comp, muxer_msg_iter, self_msg_iter);
1336 goto end;
1337
1338 error:
1339 destroy_muxer_msg_iter(muxer_msg_iter);
1340 bt_self_message_iterator_set_data(self_msg_iter, NULL);
1341
1342 end:
1343 muxer_comp->initializing_muxer_msg_iter = false;
1344 return status;
1345 }
1346
1347 BT_HIDDEN
1348 void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
1349 {
1350 struct muxer_msg_iter *muxer_msg_iter =
1351 bt_self_message_iterator_get_data(self_msg_iter);
1352 bt_self_component *self_comp = NULL;
1353 struct muxer_comp *muxer_comp = NULL;
1354
1355 self_comp = bt_self_message_iterator_borrow_component(
1356 self_msg_iter);
1357 BT_ASSERT(self_comp);
1358 muxer_comp = bt_self_component_get_data(self_comp);
1359 BT_COMP_LOGD("Finalizing muxer component's message iterator: "
1360 "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
1361 "msg-iter-addr=%p",
1362 self_comp, muxer_comp, muxer_msg_iter, self_msg_iter);
1363
1364 if (muxer_msg_iter) {
1365 destroy_muxer_msg_iter(muxer_msg_iter);
1366 }
1367 }
1368
1369 BT_HIDDEN
1370 bt_message_iterator_class_next_method_status muxer_msg_iter_next(
1371 bt_self_message_iterator *self_msg_iter,
1372 bt_message_array_const msgs, uint64_t capacity,
1373 uint64_t *count)
1374 {
1375 bt_message_iterator_class_next_method_status status;
1376 struct muxer_msg_iter *muxer_msg_iter =
1377 bt_self_message_iterator_get_data(self_msg_iter);
1378 bt_self_component *self_comp = NULL;
1379 struct muxer_comp *muxer_comp = NULL;
1380
1381 BT_ASSERT_DBG(muxer_msg_iter);
1382 self_comp = bt_self_message_iterator_borrow_component(
1383 self_msg_iter);
1384 BT_ASSERT_DBG(self_comp);
1385 muxer_comp = bt_self_component_get_data(self_comp);
1386 BT_ASSERT_DBG(muxer_comp);
1387 BT_COMP_LOGT("Muxer component's message iterator's \"next\" method called: "
1388 "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
1389 "msg-iter-addr=%p",
1390 self_comp, muxer_comp, muxer_msg_iter, self_msg_iter);
1391
1392 status = muxer_msg_iter_do_next(muxer_comp, muxer_msg_iter,
1393 msgs, capacity, count);
1394 if (status < 0) {
1395 BT_COMP_LOGE("Cannot get next message: "
1396 "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
1397 "msg-iter-addr=%p, status=%s",
1398 self_comp, muxer_comp, muxer_msg_iter, self_msg_iter,
1399 bt_common_func_status_string(status));
1400 } else {
1401 BT_COMP_LOGT("Returning from muxer component's message iterator's \"next\" method: "
1402 "status=%s",
1403 bt_common_func_status_string(status));
1404 }
1405
1406 return status;
1407 }
1408
1409 BT_HIDDEN
1410 bt_component_class_port_connected_method_status muxer_input_port_connected(
1411 bt_self_component_filter *self_comp,
1412 bt_self_component_port_input *self_port,
1413 const bt_port_output *other_port)
1414 {
1415 bt_component_class_port_connected_method_status status =
1416 BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_OK;
1417 bt_self_component_add_port_status add_port_status;
1418 struct muxer_comp *muxer_comp = bt_self_component_get_data(
1419 bt_self_component_filter_as_self_component(self_comp));
1420
1421 add_port_status = add_available_input_port(self_comp);
1422 if (add_port_status) {
1423 BT_COMP_LOGE("Cannot add one muxer component's input port: "
1424 "status=%s",
1425 bt_common_func_status_string(status));
1426
1427 if (add_port_status ==
1428 BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) {
1429 status = BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_MEMORY_ERROR;
1430 } else {
1431 status = BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_ERROR;
1432 }
1433
1434 goto end;
1435 }
1436
1437 end:
1438 return status;
1439 }
1440
1441 static inline
1442 bt_message_iterator_class_can_seek_beginning_method_status
1443 muxer_upstream_msg_iters_can_all_seek_beginning(
1444 GPtrArray *muxer_upstream_msg_iters, bt_bool *can_seek)
1445 {
1446 bt_message_iterator_class_can_seek_beginning_method_status status =
1447 BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK;
1448 uint64_t i;
1449
1450 for (i = 0; i < muxer_upstream_msg_iters->len; i++) {
1451 struct muxer_upstream_msg_iter *upstream_msg_iter =
1452 muxer_upstream_msg_iters->pdata[i];
1453 status = (int) bt_message_iterator_can_seek_beginning(
1454 upstream_msg_iter->msg_iter, can_seek);
1455 if (status != BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) {
1456 goto end;
1457 }
1458
1459 if (!*can_seek) {
1460 goto end;
1461 }
1462 }
1463
1464 *can_seek = BT_TRUE;
1465
1466 end:
1467 return status;
1468 }
1469
1470 BT_HIDDEN
1471 bt_message_iterator_class_can_seek_beginning_method_status
1472 muxer_msg_iter_can_seek_beginning(
1473 bt_self_message_iterator *self_msg_iter, bt_bool *can_seek)
1474 {
1475 struct muxer_msg_iter *muxer_msg_iter =
1476 bt_self_message_iterator_get_data(self_msg_iter);
1477 bt_message_iterator_class_can_seek_beginning_method_status status;
1478
1479 status = muxer_upstream_msg_iters_can_all_seek_beginning(
1480 muxer_msg_iter->active_muxer_upstream_msg_iters, can_seek);
1481 if (status != BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) {
1482 goto end;
1483 }
1484
1485 if (!*can_seek) {
1486 goto end;
1487 }
1488
1489 status = muxer_upstream_msg_iters_can_all_seek_beginning(
1490 muxer_msg_iter->ended_muxer_upstream_msg_iters, can_seek);
1491
1492 end:
1493 return status;
1494 }
1495
1496 BT_HIDDEN
1497 bt_message_iterator_class_seek_beginning_method_status muxer_msg_iter_seek_beginning(
1498 bt_self_message_iterator *self_msg_iter)
1499 {
1500 struct muxer_msg_iter *muxer_msg_iter =
1501 bt_self_message_iterator_get_data(self_msg_iter);
1502 bt_message_iterator_class_seek_beginning_method_status status =
1503 BT_MESSAGE_ITERATOR_CLASS_SEEK_BEGINNING_METHOD_STATUS_OK;
1504 bt_message_iterator_seek_beginning_status seek_beg_status;
1505 uint64_t i;
1506
1507 /* Seek all ended upstream iterators first */
1508 for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
1509 i++) {
1510 struct muxer_upstream_msg_iter *upstream_msg_iter =
1511 muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
1512
1513 seek_beg_status = bt_message_iterator_seek_beginning(
1514 upstream_msg_iter->msg_iter);
1515 if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) {
1516 status = (int) seek_beg_status;
1517 goto end;
1518 }
1519
1520 empty_message_queue(upstream_msg_iter);
1521 }
1522
1523 /* Seek all previously active upstream iterators */
1524 for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
1525 i++) {
1526 struct muxer_upstream_msg_iter *upstream_msg_iter =
1527 muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i];
1528
1529 seek_beg_status = bt_message_iterator_seek_beginning(
1530 upstream_msg_iter->msg_iter);
1531 if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) {
1532 status = (int) seek_beg_status;
1533 goto end;
1534 }
1535
1536 empty_message_queue(upstream_msg_iter);
1537 }
1538
1539 /* Make them all active */
1540 for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
1541 i++) {
1542 struct muxer_upstream_msg_iter *upstream_msg_iter =
1543 muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
1544
1545 g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
1546 upstream_msg_iter);
1547 muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL;
1548 }
1549
1550 /*
1551 * GLib < 2.48.0 asserts when g_ptr_array_remove_range() is
1552 * called on an empty array.
1553 */
1554 if (muxer_msg_iter->ended_muxer_upstream_msg_iters->len > 0) {
1555 g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters,
1556 0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len);
1557 }
1558 muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
1559 muxer_msg_iter->clock_class_expectation =
1560 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY;
1561
1562 end:
1563 return status;
1564 }
This page took 0.093081 seconds and 4 git commands to generate.