Move to kernel style SPDX license identifiers
[babeltrace.git] / src / plugins / utils / muxer / muxer.c
CommitLineData
958f7d11 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
958f7d11 3 *
0235b0db 4 * Copyright 2017 Philippe Proulx <pproulx@efficios.com>
958f7d11
PP
5 */
6
5b6473ec 7#define BT_COMP_LOG_SELF_COMP (muxer_comp->self_comp)
87ec3926 8#define BT_LOG_OUTPUT_LEVEL (muxer_comp->log_level)
350ad6c1 9#define BT_LOG_TAG "PLUGIN/FLT.UTILS.MUXER"
d9c39b0a 10#include "logging/comp-logging.h"
fed72692 11
91d81473 12#include "common/macros.h"
6162e6b7 13#include "common/uuid.h"
3fadfbc0 14#include <babeltrace2/babeltrace.h>
958f7d11 15#include <glib.h>
c55a9f58 16#include <stdbool.h>
fed72692 17#include <inttypes.h>
578e048b
MJ
18#include "common/assert.h"
19#include "common/common.h"
0fbb9a9f 20#include <stdlib.h>
282c8cd0 21#include <string.h>
958f7d11 22
1aca5200 23#include "plugins/common/muxing/muxing.h"
006c5ffb 24#include "plugins/common/param-validation/param-validation.h"
1aca5200 25
fdf0b89e
FD
26#include "muxer.h"
27
958f7d11 28struct muxer_comp {
5b6473ec
PP
29 /* Weak refs */
30 bt_self_component_filter *self_comp_flt;
31 bt_self_component *self_comp;
d94d92ac 32
958f7d11
PP
33 unsigned int next_port_num;
34 size_t available_input_ports;
d6e69534 35 bool initializing_muxer_msg_iter;
87ec3926 36 bt_logging_level log_level;
958f7d11
PP
37};
38
d6e69534 39struct muxer_upstream_msg_iter {
87ec3926
PP
40 struct muxer_comp *muxer_comp;
41
ab11110e 42 /* Owned by this, NULL if ended */
9a2c8b8e 43 bt_message_iterator *msg_iter;
958f7d11 44
d6e69534
PP
45 /* Contains `const bt_message *`, owned by this */
46 GQueue *msgs;
958f7d11
PP
47};
48
d6e69534
PP
/*
 * What a muxer message iterator expects of the clock classes it
 * encounters.
 *
 * The state starts at `ANY` (the zero value) and is fixed by the first
 * stream or clock class seen; everything seen afterwards is validated
 * against that state.
 */
enum muxer_msg_iter_clock_class_expectation {
	/* Nothing seen yet: anything is accepted */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,

	/* Stream classes must have no default clock class */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE = 1,

	/* Clock classes must have the Unix epoch as their origin */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE = 2,

	/* Clock classes must be non-absolute and share one specific UUID */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID = 3,

	/* Clock classes must be non-absolute and have no UUID */
	MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID = 4,
};
56
d6e69534 57struct muxer_msg_iter {
87ec3926
PP
58 struct muxer_comp *muxer_comp;
59
ca02df0a
PP
60 /* Weak */
61 bt_self_message_iterator *self_msg_iter;
62
ab11110e 63 /*
d6e69534 64 * Array of struct muxer_upstream_msg_iter * (owned by this).
ab11110e
PP
65 *
66 * NOTE: This array is searched in linearly to find the youngest
d6e69534 67 * current message. Keep this until benchmarks confirm that
ab11110e
PP
68 * another data structure is faster than this for our typical
69 * use cases.
70 */
54bdc1f7
PP
71 GPtrArray *active_muxer_upstream_msg_iters;
72
73 /*
74 * Array of struct muxer_upstream_msg_iter * (owned by this).
75 *
76 * We move ended message iterators from
77 * `active_muxer_upstream_msg_iters` to this array so as to be
78 * able to restore them when seeking.
79 */
80 GPtrArray *ended_muxer_upstream_msg_iters;
958f7d11 81
d6e69534 82 /* Last time returned in a message */
958f7d11 83 int64_t last_returned_ts_ns;
282c8cd0
PP
84
85 /* Clock class expectation state */
d6e69534 86 enum muxer_msg_iter_clock_class_expectation clock_class_expectation;
282c8cd0
PP
87
88 /*
89 * Expected clock class UUID, only valid when
90 * clock_class_expectation is
d6e69534 91 * MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID.
282c8cd0 92 */
6162e6b7 93 bt_uuid_t expected_clock_class_uuid;
cbca1c06
SM
94
95 /*
96 * Saved error. If we hit an error in the _next method, but have some
97 * messages ready to return, we save the error here and return it on
98 * the next _next call.
99 */
a3f0c7db 100 bt_message_iterator_class_next_method_status next_saved_status;
cbca1c06 101 const struct bt_error *next_saved_error;
958f7d11
PP
102};
103
54bdc1f7
PP
104static
105void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
106{
107 const bt_message *msg;
108
109 while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) {
110 bt_message_put_ref(msg);
111 }
112}
113
ab11110e 114static
d6e69534
PP
115void destroy_muxer_upstream_msg_iter(
116 struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
ab11110e 117{
87ec3926
PP
118 struct muxer_comp *muxer_comp;
119
d6e69534 120 if (!muxer_upstream_msg_iter) {
ab11110e
PP
121 return;
122 }
123
87ec3926 124 muxer_comp = muxer_upstream_msg_iter->muxer_comp;
5b6473ec 125 BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: "
d6e69534
PP
126 "addr=%p, msg-iter-addr=%p, queue-len=%u",
127 muxer_upstream_msg_iter,
128 muxer_upstream_msg_iter->msg_iter,
129 muxer_upstream_msg_iter->msgs->length);
9a2c8b8e 130 bt_message_iterator_put_ref(
54bdc1f7 131 muxer_upstream_msg_iter->msg_iter);
d4393e08 132
d6e69534 133 if (muxer_upstream_msg_iter->msgs) {
54bdc1f7 134 empty_message_queue(muxer_upstream_msg_iter);
d6e69534 135 g_queue_free(muxer_upstream_msg_iter->msgs);
d4393e08
PP
136 }
137
d6e69534 138 g_free(muxer_upstream_msg_iter);
ab11110e
PP
139}
140
958f7d11 141static
c61018b9 142int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter,
9a2c8b8e 143 bt_message_iterator *self_msg_iter)
958f7d11 144{
c61018b9 145 int ret = 0;
d6e69534
PP
146 struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
147 g_new0(struct muxer_upstream_msg_iter, 1);
87ec3926 148 struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp;
958f7d11 149
d6e69534 150 if (!muxer_upstream_msg_iter) {
5b6473ec 151 BT_COMP_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper.");
6c20f4a0 152 goto error;
958f7d11
PP
153 }
154
87ec3926 155 muxer_upstream_msg_iter->muxer_comp = muxer_comp;
d6e69534 156 muxer_upstream_msg_iter->msg_iter = self_msg_iter;
9a2c8b8e 157 bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter);
d6e69534
PP
158 muxer_upstream_msg_iter->msgs = g_queue_new();
159 if (!muxer_upstream_msg_iter->msgs) {
5b6473ec 160 BT_COMP_LOGE_STR("Failed to allocate a GQueue.");
6c20f4a0 161 goto error;
d4393e08
PP
162 }
163
54bdc1f7 164 g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
d6e69534 165 muxer_upstream_msg_iter);
5b6473ec 166 BT_COMP_LOGD("Added muxer's upstream message iterator wrapper: "
d6e69534
PP
167 "addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p",
168 muxer_upstream_msg_iter, muxer_msg_iter,
169 self_msg_iter);
958f7d11 170
6c20f4a0
FD
171 goto end;
172
173error:
174 g_free(muxer_upstream_msg_iter);
c61018b9 175 ret = -1;
6c20f4a0 176
958f7d11 177end:
c61018b9 178 return ret;
958f7d11
PP
179}
180
958f7d11 181static
d24d5663 182bt_self_component_add_port_status add_available_input_port(
b19ff26f 183 bt_self_component_filter *self_comp)
958f7d11 184{
d94d92ac 185 struct muxer_comp *muxer_comp = bt_self_component_get_data(
707b7d35 186 bt_self_component_filter_as_self_component(self_comp));
d24d5663
PP
187 bt_self_component_add_port_status status =
188 BT_SELF_COMPONENT_ADD_PORT_STATUS_OK;
958f7d11 189 GString *port_name = NULL;
958f7d11 190
f6ccaed9 191 BT_ASSERT(muxer_comp);
958f7d11
PP
192 port_name = g_string_new("in");
193 if (!port_name) {
5b6473ec 194 BT_COMP_LOGE_STR("Failed to allocate a GString.");
d24d5663 195 status = BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR;
958f7d11
PP
196 goto end;
197 }
198
199 g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
d94d92ac
PP
200 status = bt_self_component_filter_add_input_port(
201 self_comp, port_name->str, NULL, NULL);
d24d5663 202 if (status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
5b6473ec 203 BT_COMP_LOGE("Cannot add input port to muxer component: "
fed72692 204 "port-name=\"%s\", comp-addr=%p, status=%s",
d94d92ac 205 port_name->str, self_comp,
d24d5663 206 bt_common_func_status_string(status));
958f7d11
PP
207 goto end;
208 }
209
210 muxer_comp->available_input_ports++;
211 muxer_comp->next_port_num++;
5b6473ec 212 BT_COMP_LOGI("Added one input port to muxer component: "
fed72692 213 "port-name=\"%s\", comp-addr=%p",
d94d92ac 214 port_name->str, self_comp);
5badd463 215
958f7d11
PP
216end:
217 if (port_name) {
218 g_string_free(port_name, TRUE);
219 }
220
147337a3 221 return status;
958f7d11
PP
222}
223
958f7d11 224static
d24d5663 225bt_self_component_add_port_status create_output_port(
b19ff26f 226 bt_self_component_filter *self_comp)
958f7d11 227{
d94d92ac
PP
228 return bt_self_component_filter_add_output_port(
229 self_comp, "out", NULL, NULL);
958f7d11
PP
230}
231
/* Frees a muxer component's private data; accepts NULL. */
static
void destroy_muxer_comp(struct muxer_comp *muxer_comp)
{
	if (muxer_comp) {
		g_free(muxer_comp);
	}
}
241
d9120ccb 242static
006c5ffb
SM
243struct bt_param_validation_map_value_entry_descr muxer_params[] = {
244 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
245};
246
958f7d11 247BT_HIDDEN
21a9f056 248bt_component_class_initialize_method_status muxer_init(
5b6473ec 249 bt_self_component_filter *self_comp_flt,
59225a3e 250 bt_self_component_filter_configuration *config,
fdf0b89e 251 const bt_value *params, void *init_data)
958f7d11 252{
006c5ffb 253 bt_component_class_initialize_method_status status;
d24d5663 254 bt_self_component_add_port_status add_port_status;
5b6473ec
PP
255 bt_self_component *self_comp =
256 bt_self_component_filter_as_self_component(self_comp_flt);
958f7d11 257 struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
87ec3926 258 bt_logging_level log_level = bt_component_get_logging_level(
5b6473ec 259 bt_self_component_as_component(self_comp));
006c5ffb
SM
260 enum bt_param_validation_status validation_status;
261 gchar *validate_error = NULL;
958f7d11 262
5b6473ec 263 BT_COMP_LOG_CUR_LVL(BT_LOG_INFO, log_level, self_comp,
87ec3926 264 "Initializing muxer component: "
d94d92ac 265 "comp-addr=%p, params-addr=%p", self_comp, params);
fed72692 266
958f7d11 267 if (!muxer_comp) {
5b6473ec 268 BT_COMP_LOG_CUR_LVL(BT_LOG_ERROR, log_level, self_comp,
87ec3926 269 "Failed to allocate one muxer component.");
006c5ffb 270 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
958f7d11
PP
271 goto error;
272 }
273
87ec3926 274 muxer_comp->log_level = log_level;
5b6473ec
PP
275 muxer_comp->self_comp = self_comp;
276 muxer_comp->self_comp_flt = self_comp_flt;
65ee897d 277
006c5ffb
SM
278 validation_status = bt_param_validation_validate(params,
279 muxer_params, &validate_error);
280 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
281 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
282 goto error;
283 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
284 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
285 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "%s", validate_error);
286 goto error;
287 }
288
5b6473ec 289 bt_self_component_set_data(self_comp, muxer_comp);
d24d5663
PP
290 add_port_status = add_available_input_port(self_comp_flt);
291 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
5b6473ec 292 BT_COMP_LOGE("Cannot ensure that at least one muxer component's input port is available: "
fed72692
PP
293 "muxer-comp-addr=%p, status=%s",
294 muxer_comp,
d24d5663 295 bt_common_func_status_string(add_port_status));
006c5ffb 296 status = (int) add_port_status;
958f7d11
PP
297 goto error;
298 }
299
d24d5663
PP
300 add_port_status = create_output_port(self_comp_flt);
301 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
5b6473ec 302 BT_COMP_LOGE("Cannot create muxer component's output port: "
fed72692
PP
303 "muxer-comp-addr=%p, status=%s",
304 muxer_comp,
d24d5663 305 bt_common_func_status_string(add_port_status));
006c5ffb 306 status = (int) add_port_status;
958f7d11
PP
307 goto error;
308 }
309
5b6473ec 310 BT_COMP_LOGI("Initialized muxer component: "
fed72692 311 "comp-addr=%p, params-addr=%p, muxer-comp-addr=%p",
d94d92ac 312 self_comp, params, muxer_comp);
fed72692 313
006c5ffb 314 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
958f7d11
PP
315 goto end;
316
317error:
318 destroy_muxer_comp(muxer_comp);
5b6473ec 319 bt_self_component_set_data(self_comp, NULL);
147337a3 320
958f7d11 321end:
006c5ffb 322 g_free(validate_error);
958f7d11
PP
323 return status;
324}
325
326BT_HIDDEN
b19ff26f 327void muxer_finalize(bt_self_component_filter *self_comp)
958f7d11 328{
d94d92ac 329 struct muxer_comp *muxer_comp = bt_self_component_get_data(
707b7d35 330 bt_self_component_filter_as_self_component(self_comp));
958f7d11 331
5b6473ec 332 BT_COMP_LOGI("Finalizing muxer component: comp-addr=%p",
d94d92ac 333 self_comp);
958f7d11
PP
334 destroy_muxer_comp(muxer_comp);
335}
336
337static
9a2c8b8e 338bt_message_iterator_create_from_message_iterator_status
87ec3926 339create_msg_iter_on_input_port(struct muxer_comp *muxer_comp,
ca02df0a 340 struct muxer_msg_iter *muxer_msg_iter,
e803df70 341 bt_self_component_port_input *self_port,
9a2c8b8e 342 bt_message_iterator **msg_iter)
958f7d11 343{
b19ff26f 344 const bt_port *port = bt_self_component_port_as_port(
707b7d35 345 bt_self_component_port_input_as_self_component_port(
d94d92ac 346 self_port));
9a2c8b8e 347 bt_message_iterator_create_from_message_iterator_status
e803df70 348 status;
958f7d11 349
f6ccaed9
PP
350 BT_ASSERT(port);
351 BT_ASSERT(bt_port_is_connected(port));
958f7d11 352
ab11110e 353 // TODO: Advance the iterator to >= the time of the latest
d6e69534 354 // returned message by the muxer message
ab11110e 355 // iterator which creates it.
9a2c8b8e 356 status = bt_message_iterator_create_from_message_iterator(
e803df70 357 muxer_msg_iter->self_msg_iter, self_port, msg_iter);
9a2c8b8e 358 if (status != BT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) {
5b6473ec 359 BT_COMP_LOGE("Cannot create upstream message iterator on input port: "
d94d92ac
PP
360 "port-addr=%p, port-name=\"%s\"",
361 port, bt_port_get_name(port));
958f7d11
PP
362 goto end;
363 }
364
5b6473ec 365 BT_COMP_LOGI("Created upstream message iterator on input port: "
d6e69534
PP
366 "port-addr=%p, port-name=\"%s\", msg-iter-addr=%p",
367 port, bt_port_get_name(port), msg_iter);
fed72692 368
958f7d11 369end:
e803df70 370 return status;
958f7d11
PP
371}
372
ab11110e 373static
a3f0c7db 374bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next(
54bdc1f7
PP
375 struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
376 bool *is_ended)
ab11110e 377{
7fb13d3f 378 struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp;
a3f0c7db 379 bt_message_iterator_class_next_method_status status;
d24d5663 380 bt_message_iterator_next_status input_port_iter_status;
d6e69534 381 bt_message_array_const msgs;
d4393e08
PP
382 uint64_t i;
383 uint64_t count;
ab11110e 384
5b6473ec 385 BT_COMP_LOGD("Calling upstream message iterator's \"next\" method: "
d6e69534
PP
386 "muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p",
387 muxer_upstream_msg_iter,
388 muxer_upstream_msg_iter->msg_iter);
9a2c8b8e 389 input_port_iter_status = bt_message_iterator_next(
d6e69534 390 muxer_upstream_msg_iter->msg_iter, &msgs, &count);
5b6473ec 391 BT_COMP_LOGD("Upstream message iterator's \"next\" method returned: "
d24d5663
PP
392 "status=%s",
393 bt_common_func_status_string(input_port_iter_status));
ab11110e 394
fdf0b89e 395 switch (input_port_iter_status) {
d24d5663 396 case BT_MESSAGE_ITERATOR_NEXT_STATUS_OK:
089717de 397 /*
d6e69534 398 * Message iterator's current message is
d4393e08 399 * valid: it must be considered for muxing operations.
089717de 400 */
5b6473ec 401 BT_COMP_LOGD_STR("Validated upstream message iterator wrapper.");
98b15851 402 BT_ASSERT_DBG(count > 0);
d4393e08 403
d6e69534 404 /* Move messages to our queue */
d4393e08
PP
405 for (i = 0; i < count; i++) {
406 /*
407 * Push to tail in order; other side
d6e69534 408 * (muxer_msg_iter_do_next_one()) consumes
d4393e08
PP
409 * from the head first.
410 */
d6e69534
PP
411 g_queue_push_tail(muxer_upstream_msg_iter->msgs,
412 (void *) msgs[i]);
d4393e08 413 }
a3f0c7db 414 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
ab11110e 415 break;
d24d5663 416 case BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN:
089717de 417 /*
d6e69534 418 * Message iterator's current message is not
089717de 419 * valid anymore. Return
d24d5663 420 * BT_MESSAGE_ITERATOR_NEXT_STATUS_AGAIN immediately.
089717de 421 */
a3f0c7db 422 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_AGAIN;
ab11110e 423 break;
d24d5663 424 case BT_MESSAGE_ITERATOR_NEXT_STATUS_END: /* Fall-through. */
ab11110e 425 /*
d6e69534 426 * Message iterator reached the end: release it. It
ab11110e 427 * won't be considered again to find the youngest
d6e69534 428 * message.
ab11110e 429 */
54bdc1f7 430 *is_ended = true;
a3f0c7db 431 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
089717de 432 break;
7fb13d3f
SM
433 case BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR:
434 case BT_MESSAGE_ITERATOR_NEXT_STATUS_MEMORY_ERROR:
435 /* Error status code */
436 BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
437 "Upstream iterator's next method returned an error: status=%s",
438 bt_common_func_status_string(input_port_iter_status));
439 status = (int) input_port_iter_status;
440 break;
ab11110e 441 default:
7fb13d3f
SM
442 /* Unsupported status code */
443 BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
444 "Unsupported status code: status=%s",
445 bt_common_func_status_string(input_port_iter_status));
a3f0c7db 446 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
089717de 447 break;
ab11110e
PP
448 }
449
089717de 450 return status;
ab11110e
PP
451}
452
958f7d11 453static
d6e69534
PP
454int get_msg_ts_ns(struct muxer_comp *muxer_comp,
455 struct muxer_msg_iter *muxer_msg_iter,
456 const bt_message *msg, int64_t last_returned_ts_ns,
958f7d11
PP
457 int64_t *ts_ns)
458{
605e1019 459 const bt_clock_snapshot *clock_snapshot = NULL;
958f7d11 460 int ret = 0;
649934d2
PP
461 const bt_stream_class *stream_class = NULL;
462 bt_message_type msg_type;
958f7d11 463
98b15851
PP
464 BT_ASSERT_DBG(msg);
465 BT_ASSERT_DBG(ts_ns);
5b6473ec 466 BT_COMP_LOGD("Getting message's timestamp: "
d6e69534 467 "muxer-msg-iter-addr=%p, msg-addr=%p, "
fed72692 468 "last-returned-ts=%" PRId64,
d6e69534 469 muxer_msg_iter, msg, last_returned_ts_ns);
fed72692 470
91d81473 471 if (G_UNLIKELY(muxer_msg_iter->clock_class_expectation ==
60f3d027
PP
472 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) {
473 *ts_ns = last_returned_ts_ns;
474 goto end;
475 }
476
649934d2
PP
477 msg_type = bt_message_get_type(msg);
478
91d81473 479 if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_PACKET_BEGINNING)) {
649934d2
PP
480 stream_class = bt_stream_borrow_class_const(
481 bt_packet_borrow_stream_const(
482 bt_message_packet_beginning_borrow_packet_const(
483 msg)));
91d81473 484 } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_PACKET_END)) {
649934d2
PP
485 stream_class = bt_stream_borrow_class_const(
486 bt_packet_borrow_stream_const(
487 bt_message_packet_end_borrow_packet_const(
488 msg)));
91d81473 489 } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS)) {
2e90378a
PP
490 stream_class = bt_stream_borrow_class_const(
491 bt_message_discarded_events_borrow_stream_const(msg));
91d81473 492 } else if (G_UNLIKELY(msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS)) {
2e90378a
PP
493 stream_class = bt_stream_borrow_class_const(
494 bt_message_discarded_packets_borrow_stream_const(msg));
649934d2
PP
495 }
496
497 switch (msg_type) {
d6e69534 498 case BT_MESSAGE_TYPE_EVENT:
98b15851 499 BT_ASSERT_DBG(bt_message_event_borrow_stream_class_default_clock_class_const(
60f3d027 500 msg));
0cbc2c33
PP
501 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
502 msg);
958f7d11 503 break;
5366eb53 504 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
9b24b6aa 505 if (bt_stream_class_packets_have_beginning_default_clock_snapshot(
649934d2
PP
506 stream_class)) {
507 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
508 msg);
509 } else {
510 goto no_clock_snapshot;
511 }
512
5366eb53
PP
513 break;
514 case BT_MESSAGE_TYPE_PACKET_END:
9b24b6aa 515 if (bt_stream_class_packets_have_end_default_clock_snapshot(
649934d2
PP
516 stream_class)) {
517 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
518 msg);
519 } else {
520 goto no_clock_snapshot;
521 }
522
5366eb53 523 break;
c47138bf
FD
524 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
525 {
526 enum bt_message_stream_clock_snapshot_state snapshot_state =
527 bt_message_stream_beginning_borrow_default_clock_snapshot_const(
528 msg, &clock_snapshot);
529 if (snapshot_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
530 goto no_clock_snapshot;
531 }
532
533 break;
534 }
535 case BT_MESSAGE_TYPE_STREAM_END:
536 {
537 enum bt_message_stream_clock_snapshot_state snapshot_state =
538 bt_message_stream_end_borrow_default_clock_snapshot_const(
539 msg, &clock_snapshot);
540 if (snapshot_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
541 goto no_clock_snapshot;
542 }
543
544 break;
545 }
5366eb53 546 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
2e90378a
PP
547 if (bt_stream_class_discarded_events_have_default_clock_snapshots(
548 stream_class)) {
9b24b6aa 549 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
2e90378a
PP
550 msg);
551 } else {
552 goto no_clock_snapshot;
553 }
554
5366eb53
PP
555 break;
556 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
2e90378a
PP
557 if (bt_stream_class_discarded_packets_have_default_clock_snapshots(
558 stream_class)) {
9b24b6aa 559 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
2e90378a
PP
560 msg);
561 } else {
562 goto no_clock_snapshot;
563 }
564
5366eb53 565 break;
b9fd9cbb 566 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
60d02328 567 clock_snapshot = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(
0cbc2c33 568 msg);
958f7d11
PP
569 break;
570 default:
d6e69534 571 /* All the other messages have a higher priority */
5b6473ec 572 BT_COMP_LOGD_STR("Message has no timestamp: using the last returned timestamp.");
958f7d11
PP
573 *ts_ns = last_returned_ts_ns;
574 goto end;
575 }
576
60f3d027
PP
577 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
578 if (ret) {
5b6473ec 579 BT_COMP_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
60f3d027
PP
580 "clock-snapshot-addr=%p", clock_snapshot);
581 goto error;
582 }
583
584 goto end;
585
586no_clock_snapshot:
5b6473ec 587 BT_COMP_LOGD_STR("Message's default clock snapshot is missing: "
60f3d027
PP
588 "using the last returned timestamp.");
589 *ts_ns = last_returned_ts_ns;
590 goto end;
591
592error:
593 ret = -1;
594
595end:
596 if (ret == 0) {
5b6473ec 597 BT_COMP_LOGD("Found message's timestamp: "
60f3d027
PP
598 "muxer-msg-iter-addr=%p, msg-addr=%p, "
599 "last-returned-ts=%" PRId64 ", ts=%" PRId64,
600 muxer_msg_iter, msg, last_returned_ts_ns,
601 *ts_ns);
44c440bc
PP
602 }
603
60f3d027
PP
604 return ret;
605}
606
607static inline
608int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter,
609 struct muxer_comp *muxer_comp,
610 const bt_clock_class *clock_class)
611{
612 int ret = 0;
6162e6b7 613 const uint8_t *cc_uuid;
60f3d027
PP
614 const char *cc_name;
615
98b15851 616 BT_ASSERT_DBG(clock_class);
50842bdc
PP
617 cc_uuid = bt_clock_class_get_uuid(clock_class);
618 cc_name = bt_clock_class_get_name(clock_class);
282c8cd0 619
d6e69534
PP
620 if (muxer_msg_iter->clock_class_expectation ==
621 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
282c8cd0 622 /*
74f4949e
FD
623 * This is the first clock class that this muxer message
624 * iterator encounters. Its properties determine what to expect
625 * for the whole lifetime of the iterator.
282c8cd0 626 */
5552377a 627 if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
282c8cd0 628 /* Expect absolute clock classes */
d6e69534
PP
629 muxer_msg_iter->clock_class_expectation =
630 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE;
282c8cd0
PP
631 } else {
632 if (cc_uuid) {
633 /*
634 * Expect non-absolute clock classes
635 * with a specific UUID.
636 */
d6e69534
PP
637 muxer_msg_iter->clock_class_expectation =
638 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID;
6162e6b7 639 bt_uuid_copy(muxer_msg_iter->expected_clock_class_uuid, cc_uuid);
282c8cd0
PP
640 } else {
641 /*
642 * Expect non-absolute clock classes
643 * with no UUID.
644 */
d6e69534
PP
645 muxer_msg_iter->clock_class_expectation =
646 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID;
282c8cd0
PP
647 }
648 }
649 }
650
74f4949e
FD
651 switch (muxer_msg_iter->clock_class_expectation) {
652 case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE:
653 if (!bt_clock_class_origin_is_unix_epoch(clock_class)) {
654 BT_COMP_LOGE("Expecting an absolute clock class, "
655 "but got a non-absolute one: "
656 "clock-class-addr=%p, clock-class-name=\"%s\"",
657 clock_class, cc_name);
658 goto error;
659 }
660 break;
661 case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID:
662 if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
663 BT_COMP_LOGE("Expecting a non-absolute clock class with no UUID, "
664 "but got an absolute one: "
665 "clock-class-addr=%p, clock-class-name=\"%s\"",
666 clock_class, cc_name);
667 goto error;
668 }
282c8cd0 669
74f4949e
FD
670 if (cc_uuid) {
671 BT_COMP_LOGE("Expecting a non-absolute clock class with no UUID, "
672 "but got one with a UUID: "
673 "clock-class-addr=%p, clock-class-name=\"%s\", "
674 "uuid=\"" BT_UUID_FMT "\"",
675 clock_class, cc_name, BT_UUID_FMT_VALUES(cc_uuid));
676 goto error;
677 }
678 break;
679 case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID:
680 if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
681 BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, "
682 "but got an absolute one: "
683 "clock-class-addr=%p, clock-class-name=\"%s\"",
684 clock_class, cc_name);
685 goto error;
686 }
282c8cd0 687
74f4949e
FD
688 if (!cc_uuid) {
689 BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, "
690 "but got one with no UUID: "
60f3d027
PP
691 "clock-class-addr=%p, clock-class-name=\"%s\"",
692 clock_class, cc_name);
693 goto error;
282c8cd0 694 }
74f4949e
FD
695
696 if (bt_uuid_compare(muxer_msg_iter->expected_clock_class_uuid, cc_uuid) != 0) {
697 BT_COMP_LOGE("Expecting a non-absolute clock class with a specific UUID, "
698 "but got one with different UUID: "
699 "clock-class-addr=%p, clock-class-name=\"%s\", "
700 "expected-uuid=\"" BT_UUID_FMT "\", "
701 "uuid=\"" BT_UUID_FMT "\"",
702 clock_class, cc_name,
703 BT_UUID_FMT_VALUES(muxer_msg_iter->expected_clock_class_uuid),
704 BT_UUID_FMT_VALUES(cc_uuid));
705 goto error;
706 }
707 break;
708 case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE:
709 BT_COMP_LOGE("Expecting no clock class, but got one: "
710 "clock-class-addr=%p, clock-class-name=\"%s\"",
711 clock_class, cc_name);
712 goto error;
713 default:
714 /* Unexpected */
715 BT_COMP_LOGF("Unexpected clock class expectation: "
716 "expectation-code=%d",
717 muxer_msg_iter->clock_class_expectation);
498e7994 718 bt_common_abort();
958f7d11
PP
719 }
720
4c0f1c8c
PP
721 goto end;
722
958f7d11
PP
723error:
724 ret = -1;
725
726end:
60f3d027
PP
727 return ret;
728}
729
730static inline
731int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter,
732 struct muxer_comp *muxer_comp, const bt_stream *stream)
733{
734 int ret = 0;
735 const bt_stream_class *stream_class =
736 bt_stream_borrow_class_const(stream);
737 const bt_clock_class *clock_class =
738 bt_stream_class_borrow_default_clock_class_const(stream_class);
739
740 if (!clock_class) {
741 if (muxer_msg_iter->clock_class_expectation ==
742 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
743 /* Expect no clock class */
744 muxer_msg_iter->clock_class_expectation =
745 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE;
9244b07a
SM
746 } else if (muxer_msg_iter->clock_class_expectation !=
747 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE) {
748 BT_COMP_LOGE("Expecting stream class without a default clock class: "
60f3d027
PP
749 "stream-class-addr=%p, stream-class-name=\"%s\", "
750 "stream-class-id=%" PRIu64,
751 stream_class, bt_stream_class_get_name(stream_class),
752 bt_stream_class_get_id(stream_class));
753 ret = -1;
754 }
755
756 goto end;
fed72692
PP
757 }
758
60f3d027
PP
759 ret = validate_clock_class(muxer_msg_iter, muxer_comp, clock_class);
760
761end:
958f7d11
PP
762 return ret;
763}
764
ab11110e 765/*
d6e69534
PP
766 * This function finds the youngest available message amongst the
767 * non-ended upstream message iterators and returns the upstream
768 * message iterator which has it, or
769 * BT_MESSAGE_ITERATOR_STATUS_END if there's no available
770 * message.
ab11110e
PP
771 *
772 * This function does NOT:
773 *
d6e69534 774 * * Update any upstream message iterator.
d6e69534 775 * * Check the upstream message iterators to retry.
ab11110e 776 *
d6e69534
PP
777 * On sucess, this function sets *muxer_upstream_msg_iter to the
778 * upstream message iterator of which the current message is
ab11110e
PP
779 * the youngest, and sets *ts_ns to its time.
780 */
958f7d11 781static
a3f0c7db 782bt_message_iterator_class_next_method_status
d6e69534 783muxer_msg_iter_youngest_upstream_msg_iter(
958f7d11 784 struct muxer_comp *muxer_comp,
d6e69534
PP
785 struct muxer_msg_iter *muxer_msg_iter,
786 struct muxer_upstream_msg_iter **muxer_upstream_msg_iter,
958f7d11
PP
787 int64_t *ts_ns)
788{
789 size_t i;
790 int ret;
791 int64_t youngest_ts_ns = INT64_MAX;
a3f0c7db
SM
792 bt_message_iterator_class_next_method_status status =
793 BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
958f7d11 794
98b15851
PP
795 BT_ASSERT_DBG(muxer_comp);
796 BT_ASSERT_DBG(muxer_msg_iter);
797 BT_ASSERT_DBG(muxer_upstream_msg_iter);
d6e69534
PP
798 *muxer_upstream_msg_iter = NULL;
799
54bdc1f7
PP
800 for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
801 i++) {
d6e69534
PP
802 const bt_message *msg;
803 struct muxer_upstream_msg_iter *cur_muxer_upstream_msg_iter =
54bdc1f7
PP
804 g_ptr_array_index(
805 muxer_msg_iter->active_muxer_upstream_msg_iters,
806 i);
d6e69534
PP
807 int64_t msg_ts_ns;
808
809 if (!cur_muxer_upstream_msg_iter->msg_iter) {
810 /* This upstream message iterator is ended */
ef267d12 811 BT_COMP_LOGT("Skipping ended upstream message iterator: "
d6e69534
PP
812 "muxer-upstream-msg-iter-wrap-addr=%p",
813 cur_muxer_upstream_msg_iter);
958f7d11
PP
814 continue;
815 }
816
98b15851 817 BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->msgs->length > 0);
d6e69534 818 msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs);
98b15851 819 BT_ASSERT_DBG(msg);
60f3d027 820
91d81473 821 if (G_UNLIKELY(bt_message_get_type(msg) ==
60f3d027
PP
822 BT_MESSAGE_TYPE_STREAM_BEGINNING)) {
823 ret = validate_new_stream_clock_class(
824 muxer_msg_iter, muxer_comp,
825 bt_message_stream_beginning_borrow_stream_const(
826 msg));
827 if (ret) {
828 /*
829 * validate_new_stream_clock_class() logs
830 * errors.
831 */
a3f0c7db 832 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
60f3d027
PP
833 goto end;
834 }
91d81473 835 } else if (G_UNLIKELY(bt_message_get_type(msg) ==
60f3d027
PP
836 BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) {
837 const bt_clock_snapshot *cs;
60f3d027 838
60d02328 839 cs = bt_message_message_iterator_inactivity_borrow_clock_snapshot_const(
0cbc2c33 840 msg);
60f3d027
PP
841 ret = validate_clock_class(muxer_msg_iter, muxer_comp,
842 bt_clock_snapshot_borrow_clock_class_const(cs));
843 if (ret) {
844 /* validate_clock_class() logs errors */
a3f0c7db 845 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
60f3d027
PP
846 goto end;
847 }
848 }
849
d6e69534
PP
850 ret = get_msg_ts_ns(muxer_comp, muxer_msg_iter, msg,
851 muxer_msg_iter->last_returned_ts_ns, &msg_ts_ns);
958f7d11 852 if (ret) {
d6e69534
PP
853 /* get_msg_ts_ns() logs errors */
854 *muxer_upstream_msg_iter = NULL;
a3f0c7db 855 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
958f7d11
PP
856 goto end;
857 }
858
e5b2784a
FD
859 /*
860 * Update the current message iterator if it has not been set
861 * yet, or if its current message has a timestamp smaller than
862 * the previously selected youngest message.
863 */
864 if (G_UNLIKELY(*muxer_upstream_msg_iter == NULL) ||
865 msg_ts_ns < youngest_ts_ns) {
d6e69534
PP
866 *muxer_upstream_msg_iter =
867 cur_muxer_upstream_msg_iter;
868 youngest_ts_ns = msg_ts_ns;
958f7d11 869 *ts_ns = youngest_ts_ns;
6915f47a
FD
870 } else if (msg_ts_ns == youngest_ts_ns) {
871 /*
872 * The currently selected message to be sent downstream
873 * next has the exact same timestamp that of the
874 * current candidate message. We must break the tie
875 * in a predictable manner.
876 */
877 const bt_message *selected_msg = g_queue_peek_head(
878 (*muxer_upstream_msg_iter)->msgs);
879 BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
880
881 /*
882 * Order the messages in an arbitrary but determinitic
883 * way.
884 */
1aca5200 885 ret = common_muxing_compare_messages(msg, selected_msg);
6915f47a
FD
886 if (ret < 0) {
887 /*
888 * The `msg` should go first. Update the next
889 * iterator and the current timestamp.
890 */
891 *muxer_upstream_msg_iter =
892 cur_muxer_upstream_msg_iter;
893 youngest_ts_ns = msg_ts_ns;
894 *ts_ns = youngest_ts_ns;
895 } else if (ret == 0) {
896 /* Unable to pick which one should go first. */
897 BT_COMP_LOGW("Cannot deterministically pick next upstream message iterator because they have identical next messages: "
898 "muxer-upstream-msg-iter-wrap-addr=%p"
899 "cur-muxer-upstream-msg-iter-wrap-addr=%p",
900 *muxer_upstream_msg_iter,
901 cur_muxer_upstream_msg_iter);
902 }
958f7d11
PP
903 }
904 }
905
d6e69534 906 if (!*muxer_upstream_msg_iter) {
a3f0c7db 907 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
958f7d11
PP
908 *ts_ns = INT64_MIN;
909 }
910
911end:
912 return status;
913}
914
915static
a3f0c7db 916bt_message_iterator_class_next_method_status
d24d5663 917validate_muxer_upstream_msg_iter(
54bdc1f7
PP
918 struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
919 bool *is_ended)
958f7d11 920{
7fb13d3f 921 struct muxer_comp *muxer_comp = muxer_upstream_msg_iter->muxer_comp;
a3f0c7db 922 bt_message_iterator_class_next_method_status status;
958f7d11 923
5b6473ec 924 BT_COMP_LOGD("Validating muxer's upstream message iterator wrapper: "
d6e69534
PP
925 "muxer-upstream-msg-iter-wrap-addr=%p",
926 muxer_upstream_msg_iter);
fed72692 927
d6e69534
PP
928 if (muxer_upstream_msg_iter->msgs->length > 0 ||
929 !muxer_upstream_msg_iter->msg_iter) {
5b6473ec 930 BT_COMP_LOGD("Already valid or not considered: "
d6e69534
PP
931 "queue-len=%u, upstream-msg-iter-addr=%p",
932 muxer_upstream_msg_iter->msgs->length,
933 muxer_upstream_msg_iter->msg_iter);
a3f0c7db 934 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
ab11110e
PP
935 goto end;
936 }
937
d6e69534 938 /* muxer_upstream_msg_iter_next() logs details/errors */
54bdc1f7
PP
939 status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter,
940 is_ended);
089717de
PP
941
942end:
943 return status;
944}
945
946static
a3f0c7db 947bt_message_iterator_class_next_method_status
d24d5663 948validate_muxer_upstream_msg_iters(
d6e69534 949 struct muxer_msg_iter *muxer_msg_iter)
089717de 950{
87ec3926 951 struct muxer_comp *muxer_comp = muxer_msg_iter->muxer_comp;
a3f0c7db 952 bt_message_iterator_class_next_method_status status;
089717de
PP
953 size_t i;
954
5b6473ec 955 BT_COMP_LOGD("Validating muxer's upstream message iterator wrappers: "
d6e69534 956 "muxer-msg-iter-addr=%p", muxer_msg_iter);
fed72692 957
54bdc1f7
PP
958 for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
959 i++) {
6bf2abbd 960 bool is_ended = false;
d6e69534 961 struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
089717de 962 g_ptr_array_index(
54bdc1f7 963 muxer_msg_iter->active_muxer_upstream_msg_iters,
089717de
PP
964 i);
965
d6e69534 966 status = validate_muxer_upstream_msg_iter(
54bdc1f7 967 muxer_upstream_msg_iter, &is_ended);
a3f0c7db 968 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
fed72692 969 if (status < 0) {
7fb13d3f
SM
970 BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
971 "Cannot validate muxer's upstream message iterator wrapper: "
d6e69534
PP
972 "muxer-msg-iter-addr=%p, "
973 "muxer-upstream-msg-iter-wrap-addr=%p",
974 muxer_msg_iter,
975 muxer_upstream_msg_iter);
fed72692 976 } else {
5b6473ec 977 BT_COMP_LOGD("Cannot validate muxer's upstream message iterator wrapper: "
d6e69534
PP
978 "muxer-msg-iter-addr=%p, "
979 "muxer-upstream-msg-iter-wrap-addr=%p",
980 muxer_msg_iter,
981 muxer_upstream_msg_iter);
fed72692
PP
982 }
983
089717de
PP
984 goto end;
985 }
744ba28b
PP
986
987 /*
54bdc1f7
PP
988 * Move this muxer upstream message iterator to the
989 * array of ended iterators if it's ended.
744ba28b 990 */
91d81473 991 if (G_UNLIKELY(is_ended)) {
5b6473ec 992 BT_COMP_LOGD("Muxer's upstream message iterator wrapper: ended or canceled: "
54bdc1f7
PP
993 "muxer-msg-iter-addr=%p, "
994 "muxer-upstream-msg-iter-wrap-addr=%p",
995 muxer_msg_iter, muxer_upstream_msg_iter);
996 g_ptr_array_add(
997 muxer_msg_iter->ended_muxer_upstream_msg_iters,
998 muxer_upstream_msg_iter);
999 muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i] = NULL;
1000
744ba28b
PP
1001 /*
1002 * Use g_ptr_array_remove_fast() because the
1003 * order of those elements is not important.
1004 */
1005 g_ptr_array_remove_index_fast(
54bdc1f7 1006 muxer_msg_iter->active_muxer_upstream_msg_iters,
744ba28b
PP
1007 i);
1008 i--;
1009 }
089717de
PP
1010 }
1011
a3f0c7db 1012 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
7fb13d3f 1013
089717de
PP
1014end:
1015 return status;
1016}
1017
d4393e08 1018static inline
a3f0c7db 1019bt_message_iterator_class_next_method_status muxer_msg_iter_do_next_one(
089717de 1020 struct muxer_comp *muxer_comp,
d6e69534
PP
1021 struct muxer_msg_iter *muxer_msg_iter,
1022 const bt_message **msg)
089717de 1023{
a3f0c7db 1024 bt_message_iterator_class_next_method_status status;
d6e69534 1025 struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL;
18961057
SM
1026 /* Initialize to avoid -Wmaybe-uninitialized warning with gcc 4.8. */
1027 int64_t next_return_ts = 0;
089717de 1028
5badd463 1029 status = validate_muxer_upstream_msg_iters(muxer_msg_iter);
a3f0c7db 1030 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
5badd463
PP
1031 /* validate_muxer_upstream_msg_iters() logs details */
1032 goto end;
958f7d11
PP
1033 }
1034
1035 /*
089717de 1036 * At this point we know that all the existing upstream
d6e69534
PP
1037 * message iterators are valid. We can find the one,
1038 * amongst those, of which the current message is the
089717de 1039 * youngest.
958f7d11 1040 */
d6e69534
PP
1041 status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp,
1042 muxer_msg_iter, &muxer_upstream_msg_iter,
089717de 1043 &next_return_ts);
a3f0c7db 1044 if (status < 0 || status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END) {
d4393e08 1045 if (status < 0) {
7fb13d3f
SM
1046 BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
1047 "Cannot find the youngest upstream message iterator wrapper: "
fed72692 1048 "status=%s",
d24d5663 1049 bt_common_func_status_string(status));
fed72692 1050 } else {
5b6473ec 1051 BT_COMP_LOGD("Cannot find the youngest upstream message iterator wrapper: "
fed72692 1052 "status=%s",
d24d5663 1053 bt_common_func_status_string(status));
fed72692
PP
1054 }
1055
958f7d11
PP
1056 goto end;
1057 }
1058
d6e69534 1059 if (next_return_ts < muxer_msg_iter->last_returned_ts_ns) {
7fb13d3f
SM
1060 BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
1061 "Youngest upstream message iterator wrapper's timestamp is less than muxer's message iterator's last returned timestamp: "
d6e69534 1062 "muxer-msg-iter-addr=%p, ts=%" PRId64 ", "
fed72692 1063 "last-returned-ts=%" PRId64,
d6e69534
PP
1064 muxer_msg_iter, next_return_ts,
1065 muxer_msg_iter->last_returned_ts_ns);
a3f0c7db 1066 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
958f7d11
PP
1067 goto end;
1068 }
1069
5b6473ec 1070 BT_COMP_LOGD("Found youngest upstream message iterator wrapper: "
d6e69534
PP
1071 "muxer-msg-iter-addr=%p, "
1072 "muxer-upstream-msg-iter-wrap-addr=%p, "
fed72692 1073 "ts=%" PRId64,
d6e69534 1074 muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts);
98b15851 1075 BT_ASSERT_DBG(status ==
a3f0c7db 1076 BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK);
98b15851 1077 BT_ASSERT_DBG(muxer_upstream_msg_iter);
958f7d11
PP
1078
1079 /*
d4393e08 1080 * Consume from the queue's head: other side
d6e69534 1081 * (muxer_upstream_msg_iter_next()) writes to the tail.
958f7d11 1082 */
d6e69534 1083 *msg = g_queue_pop_head(muxer_upstream_msg_iter->msgs);
98b15851 1084 BT_ASSERT_DBG(*msg);
d6e69534 1085 muxer_msg_iter->last_returned_ts_ns = next_return_ts;
958f7d11
PP
1086
1087end:
d4393e08
PP
1088 return status;
1089}
1090
1091static
a3f0c7db 1092bt_message_iterator_class_next_method_status muxer_msg_iter_do_next(
d4393e08 1093 struct muxer_comp *muxer_comp,
d6e69534
PP
1094 struct muxer_msg_iter *muxer_msg_iter,
1095 bt_message_array_const msgs, uint64_t capacity,
d4393e08
PP
1096 uint64_t *count)
1097{
a3f0c7db 1098 bt_message_iterator_class_next_method_status status;
d4393e08
PP
1099 uint64_t i = 0;
1100
cbca1c06
SM
1101 if (G_UNLIKELY(muxer_msg_iter->next_saved_error)) {
1102 /*
1103 * Last time we were called, we hit an error but had some
1104 * messages to deliver, so we stashed the error here. Return
1105 * it now.
1106 */
1107 BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(muxer_msg_iter->next_saved_error);
1108 status = muxer_msg_iter->next_saved_status;
1109 goto end;
1110 }
1111
1112 do {
d6e69534
PP
1113 status = muxer_msg_iter_do_next_one(muxer_comp,
1114 muxer_msg_iter, &msgs[i]);
a3f0c7db 1115 if (status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
d4393e08
PP
1116 i++;
1117 }
a3f0c7db 1118 } while (i < capacity && status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK);
d4393e08
PP
1119
1120 if (i > 0) {
1121 /*
d6e69534 1122 * Even if muxer_msg_iter_do_next_one() returned
d4393e08 1123 * something else than
d6e69534
PP
1124 * BT_MESSAGE_ITERATOR_STATUS_OK, we accumulated
1125 * message objects in the output message
d4393e08 1126 * array, so we need to return
d6e69534 1127 * BT_MESSAGE_ITERATOR_STATUS_OK so that they are
d4393e08 1128 * transfered to downstream. This other status occurs
d6e69534 1129 * again the next time muxer_msg_iter_do_next() is
d4393e08 1130 * called, possibly without any accumulated
d6e69534 1131 * message, in which case we'll return it.
d4393e08 1132 */
cbca1c06
SM
1133 if (status < 0) {
1134 /*
1135 * Save this error for the next _next call. Assume that
1136 * this component always appends error causes when
1137 * returning an error status code, which will cause the
1138 * current thread error to be non-NULL.
1139 */
1140 muxer_msg_iter->next_saved_error = bt_current_thread_take_error();
1141 BT_ASSERT(muxer_msg_iter->next_saved_error);
1142 muxer_msg_iter->next_saved_status = status;
1143 }
1144
d4393e08 1145 *count = i;
a3f0c7db 1146 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
d4393e08
PP
1147 }
1148
cbca1c06 1149end:
d4393e08 1150 return status;
958f7d11
PP
1151}
1152
1153static
d6e69534 1154void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter)
ab11110e 1155{
87ec3926
PP
1156 struct muxer_comp *muxer_comp;
1157
d6e69534 1158 if (!muxer_msg_iter) {
ab11110e
PP
1159 return;
1160 }
1161
87ec3926 1162 muxer_comp = muxer_msg_iter->muxer_comp;
5b6473ec 1163 BT_COMP_LOGD("Destroying muxer component's message iterator: "
d6e69534 1164 "muxer-msg-iter-addr=%p", muxer_msg_iter);
fed72692 1165
54bdc1f7 1166 if (muxer_msg_iter->active_muxer_upstream_msg_iters) {
5b6473ec 1167 BT_COMP_LOGD_STR("Destroying muxer's active upstream message iterator wrappers.");
54bdc1f7
PP
1168 g_ptr_array_free(
1169 muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE);
1170 }
1171
1172 if (muxer_msg_iter->ended_muxer_upstream_msg_iters) {
5b6473ec 1173 BT_COMP_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers.");
ab11110e 1174 g_ptr_array_free(
54bdc1f7 1175 muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE);
ab11110e
PP
1176 }
1177
d6e69534 1178 g_free(muxer_msg_iter);
ab11110e
PP
1179}
1180
1181static
a3f0c7db 1182bt_message_iterator_class_initialize_method_status
e803df70 1183muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp,
c0e46a7c
SM
1184 struct muxer_msg_iter *muxer_msg_iter,
1185 struct bt_self_message_iterator_configuration *config)
958f7d11 1186{
544d0515
PP
1187 int64_t count;
1188 int64_t i;
a3f0c7db 1189 bt_message_iterator_class_initialize_method_status status;
c0e46a7c 1190 bool can_seek_forward = true;
958f7d11 1191
d94d92ac 1192 count = bt_component_filter_get_input_port_count(
707b7d35 1193 bt_self_component_filter_as_component_filter(
5b6473ec 1194 muxer_comp->self_comp_flt));
544d0515 1195 if (count < 0) {
5b6473ec 1196 BT_COMP_LOGD("No input port to initialize for muxer component's message iterator: "
d6e69534
PP
1197 "muxer-comp-addr=%p, muxer-msg-iter-addr=%p",
1198 muxer_comp, muxer_msg_iter);
a3f0c7db 1199 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
ab11110e
PP
1200 goto end;
1201 }
958f7d11
PP
1202
1203 for (i = 0; i < count; i++) {
9a2c8b8e 1204 bt_message_iterator *upstream_msg_iter;
b19ff26f 1205 bt_self_component_port_input *self_port =
d94d92ac 1206 bt_self_component_filter_borrow_input_port_by_index(
5b6473ec 1207 muxer_comp->self_comp_flt, i);
b19ff26f 1208 const bt_port *port;
9a2c8b8e 1209 bt_message_iterator_create_from_message_iterator_status
e803df70
SM
1210 msg_iter_status;
1211 int int_status;
958f7d11 1212
d94d92ac 1213 BT_ASSERT(self_port);
707b7d35
PP
1214 port = bt_self_component_port_as_port(
1215 bt_self_component_port_input_as_self_component_port(
d94d92ac 1216 self_port));
f6ccaed9 1217 BT_ASSERT(port);
958f7d11
PP
1218
1219 if (!bt_port_is_connected(port)) {
5badd463 1220 /* Skip non-connected port */
958f7d11
PP
1221 continue;
1222 }
1223
e803df70
SM
1224 msg_iter_status = create_msg_iter_on_input_port(muxer_comp,
1225 muxer_msg_iter, self_port, &upstream_msg_iter);
9a2c8b8e 1226 if (msg_iter_status != BT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) {
5badd463 1227 /* create_msg_iter_on_input_port() logs errors */
e803df70 1228 status = (int) msg_iter_status;
ab11110e 1229 goto end;
958f7d11 1230 }
fed72692 1231
e803df70 1232 int_status = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter,
c61018b9 1233 upstream_msg_iter);
9a2c8b8e 1234 bt_message_iterator_put_ref(
5badd463 1235 upstream_msg_iter);
e803df70 1236 if (int_status) {
a3f0c7db 1237 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
5badd463 1238 /* muxer_msg_iter_add_upstream_msg_iter() logs errors */
5badd463
PP
1239 goto end;
1240 }
c0e46a7c
SM
1241
1242 can_seek_forward = can_seek_forward &&
9a2c8b8e 1243 bt_message_iterator_can_seek_forward(
c0e46a7c 1244 upstream_msg_iter);
958f7d11
PP
1245 }
1246
c0e46a7c
SM
1247 /*
1248 * This iterator can seek forward if all of its iterators can seek
1249 * forward.
1250 */
1251 bt_self_message_iterator_configuration_set_can_seek_forward(
1252 config, can_seek_forward);
1253
a3f0c7db 1254 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
e803df70 1255
958f7d11 1256end:
e803df70 1257 return status;
958f7d11
PP
1258}
1259
958f7d11 1260BT_HIDDEN
a3f0c7db 1261bt_message_iterator_class_initialize_method_status muxer_msg_iter_init(
d6e69534 1262 bt_self_message_iterator *self_msg_iter,
8d8b141d 1263 bt_self_message_iterator_configuration *config,
b19ff26f 1264 bt_self_component_port_output *port)
958f7d11
PP
1265{
1266 struct muxer_comp *muxer_comp = NULL;
d6e69534 1267 struct muxer_msg_iter *muxer_msg_iter = NULL;
a3f0c7db 1268 bt_message_iterator_class_initialize_method_status status;
f615b250
PP
1269 bt_self_component *self_comp =
1270 bt_self_message_iterator_borrow_component(self_msg_iter);
958f7d11 1271
a3f0c7db 1272 muxer_comp = bt_self_component_get_data(self_comp);
f6ccaed9 1273 BT_ASSERT(muxer_comp);
5b6473ec 1274 BT_COMP_LOGD("Initializing muxer component's message iterator: "
d6e69534
PP
1275 "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p",
1276 self_comp, muxer_comp, self_msg_iter);
a09c6b95 1277
d6e69534 1278 if (muxer_comp->initializing_muxer_msg_iter) {
a09c6b95 1279 /*
089717de 1280 * Weird, unhandled situation detected: downstream
d6e69534
PP
1281 * creates a muxer message iterator while creating
1282 * another muxer message iterator (same component).
a09c6b95 1283 */
5b6473ec 1284 BT_COMP_LOGE("Recursive initialization of muxer component's message iterator: "
d6e69534
PP
1285 "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p",
1286 self_comp, muxer_comp, self_msg_iter);
a3f0c7db 1287 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
958f7d11
PP
1288 goto error;
1289 }
1290
d6e69534
PP
1291 muxer_comp->initializing_muxer_msg_iter = true;
1292 muxer_msg_iter = g_new0(struct muxer_msg_iter, 1);
1293 if (!muxer_msg_iter) {
5b6473ec 1294 BT_COMP_LOGE_STR("Failed to allocate one muxer component's message iterator.");
a3f0c7db 1295 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
ab11110e
PP
1296 goto error;
1297 }
1298
87ec3926 1299 muxer_msg_iter->muxer_comp = muxer_comp;
ca02df0a 1300 muxer_msg_iter->self_msg_iter = self_msg_iter;
d6e69534 1301 muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
54bdc1f7 1302 muxer_msg_iter->active_muxer_upstream_msg_iters =
958f7d11 1303 g_ptr_array_new_with_free_func(
d6e69534 1304 (GDestroyNotify) destroy_muxer_upstream_msg_iter);
54bdc1f7 1305 if (!muxer_msg_iter->active_muxer_upstream_msg_iters) {
5b6473ec 1306 BT_COMP_LOGE_STR("Failed to allocate a GPtrArray.");
a3f0c7db 1307 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
54bdc1f7
PP
1308 goto error;
1309 }
1310
1311 muxer_msg_iter->ended_muxer_upstream_msg_iters =
1312 g_ptr_array_new_with_free_func(
1313 (GDestroyNotify) destroy_muxer_upstream_msg_iter);
1314 if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) {
5b6473ec 1315 BT_COMP_LOGE_STR("Failed to allocate a GPtrArray.");
a3f0c7db 1316 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
958f7d11
PP
1317 goto error;
1318 }
1319
e803df70 1320 status = muxer_msg_iter_init_upstream_iterators(muxer_comp,
c0e46a7c 1321 muxer_msg_iter, config);
e803df70 1322 if (status) {
5b6473ec 1323 BT_COMP_LOGE("Cannot initialize connected input ports for muxer component's message iterator: "
fed72692 1324 "comp-addr=%p, muxer-comp-addr=%p, "
d6e69534
PP
1325 "muxer-msg-iter-addr=%p, msg-iter-addr=%p, ret=%d",
1326 self_comp, muxer_comp, muxer_msg_iter,
e803df70 1327 self_msg_iter, status);
a09c6b95
PP
1328 goto error;
1329 }
1330
5badd463 1331 bt_self_message_iterator_set_data(self_msg_iter, muxer_msg_iter);
5b6473ec 1332 BT_COMP_LOGD("Initialized muxer component's message iterator: "
d6e69534
PP
1333 "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
1334 "msg-iter-addr=%p",
1335 self_comp, muxer_comp, muxer_msg_iter, self_msg_iter);
958f7d11
PP
1336 goto end;
1337
1338error:
d6e69534 1339 destroy_muxer_msg_iter(muxer_msg_iter);
5badd463 1340 bt_self_message_iterator_set_data(self_msg_iter, NULL);
958f7d11
PP
1341
1342end:
d6e69534 1343 muxer_comp->initializing_muxer_msg_iter = false;
958f7d11
PP
1344 return status;
1345}
1346
1347BT_HIDDEN
54bdc1f7 1348void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
958f7d11 1349{
d6e69534
PP
1350 struct muxer_msg_iter *muxer_msg_iter =
1351 bt_self_message_iterator_get_data(self_msg_iter);
b19ff26f 1352 bt_self_component *self_comp = NULL;
958f7d11
PP
1353 struct muxer_comp *muxer_comp = NULL;
1354
d6e69534
PP
1355 self_comp = bt_self_message_iterator_borrow_component(
1356 self_msg_iter);
d94d92ac
PP
1357 BT_ASSERT(self_comp);
1358 muxer_comp = bt_self_component_get_data(self_comp);
5b6473ec 1359 BT_COMP_LOGD("Finalizing muxer component's message iterator: "
d6e69534
PP
1360 "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
1361 "msg-iter-addr=%p",
1362 self_comp, muxer_comp, muxer_msg_iter, self_msg_iter);
958f7d11 1363
5badd463 1364 if (muxer_msg_iter) {
d6e69534 1365 destroy_muxer_msg_iter(muxer_msg_iter);
958f7d11 1366 }
958f7d11
PP
1367}
1368
1369BT_HIDDEN
a3f0c7db 1370bt_message_iterator_class_next_method_status muxer_msg_iter_next(
d6e69534
PP
1371 bt_self_message_iterator *self_msg_iter,
1372 bt_message_array_const msgs, uint64_t capacity,
d4393e08 1373 uint64_t *count)
958f7d11 1374{
a3f0c7db 1375 bt_message_iterator_class_next_method_status status;
d6e69534
PP
1376 struct muxer_msg_iter *muxer_msg_iter =
1377 bt_self_message_iterator_get_data(self_msg_iter);
b19ff26f 1378 bt_self_component *self_comp = NULL;
958f7d11 1379 struct muxer_comp *muxer_comp = NULL;
958f7d11 1380
98b15851 1381 BT_ASSERT_DBG(muxer_msg_iter);
d6e69534
PP
1382 self_comp = bt_self_message_iterator_borrow_component(
1383 self_msg_iter);
98b15851 1384 BT_ASSERT_DBG(self_comp);
d94d92ac 1385 muxer_comp = bt_self_component_get_data(self_comp);
98b15851 1386 BT_ASSERT_DBG(muxer_comp);
ef267d12 1387 BT_COMP_LOGT("Muxer component's message iterator's \"next\" method called: "
d6e69534
PP
1388 "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
1389 "msg-iter-addr=%p",
1390 self_comp, muxer_comp, muxer_msg_iter, self_msg_iter);
fed72692 1391
d6e69534
PP
1392 status = muxer_msg_iter_do_next(muxer_comp, muxer_msg_iter,
1393 msgs, capacity, count);
d4393e08 1394 if (status < 0) {
5b6473ec 1395 BT_COMP_LOGE("Cannot get next message: "
d6e69534
PP
1396 "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
1397 "msg-iter-addr=%p, status=%s",
1398 self_comp, muxer_comp, muxer_msg_iter, self_msg_iter,
d24d5663 1399 bt_common_func_status_string(status));
fed72692 1400 } else {
ef267d12 1401 BT_COMP_LOGT("Returning from muxer component's message iterator's \"next\" method: "
d4393e08 1402 "status=%s",
d24d5663 1403 bt_common_func_status_string(status));
fed72692 1404 }
958f7d11 1405
d4393e08 1406 return status;
958f7d11
PP
1407}
1408
1409BT_HIDDEN
d24d5663 1410bt_component_class_port_connected_method_status muxer_input_port_connected(
b19ff26f
PP
1411 bt_self_component_filter *self_comp,
1412 bt_self_component_port_input *self_port,
1413 const bt_port_output *other_port)
958f7d11 1414{
d24d5663
PP
1415 bt_component_class_port_connected_method_status status =
1416 BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_OK;
1417 bt_self_component_add_port_status add_port_status;
87ec3926
PP
1418 struct muxer_comp *muxer_comp = bt_self_component_get_data(
1419 bt_self_component_filter_as_self_component(self_comp));
958f7d11 1420
d24d5663
PP
1421 add_port_status = add_available_input_port(self_comp);
1422 if (add_port_status) {
5b6473ec 1423 BT_COMP_LOGE("Cannot add one muxer component's input port: "
5badd463 1424 "status=%s",
d24d5663
PP
1425 bt_common_func_status_string(status));
1426
1427 if (add_port_status ==
1428 BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR) {
1429 status = BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_MEMORY_ERROR;
1430 } else {
1431 status = BT_COMPONENT_CLASS_PORT_CONNECTED_METHOD_STATUS_ERROR;
1432 }
1433
06a2cb0d
PP
1434 goto end;
1435 }
1436
958f7d11 1437end:
bf55043c 1438 return status;
958f7d11 1439}
b5443165 1440
54bdc1f7 1441static inline
a3f0c7db 1442bt_message_iterator_class_can_seek_beginning_method_status
f2fb1b32
SM
1443muxer_upstream_msg_iters_can_all_seek_beginning(
1444 GPtrArray *muxer_upstream_msg_iters, bt_bool *can_seek)
b5443165 1445{
a3f0c7db
SM
1446 bt_message_iterator_class_can_seek_beginning_method_status status =
1447 BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK;
b5443165 1448 uint64_t i;
b5443165 1449
54bdc1f7 1450 for (i = 0; i < muxer_upstream_msg_iters->len; i++) {
b5443165 1451 struct muxer_upstream_msg_iter *upstream_msg_iter =
54bdc1f7 1452 muxer_upstream_msg_iters->pdata[i];
9a2c8b8e 1453 status = (int) bt_message_iterator_can_seek_beginning(
f2fb1b32 1454 upstream_msg_iter->msg_iter, can_seek);
a3f0c7db 1455 if (status != BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) {
f2fb1b32
SM
1456 goto end;
1457 }
b5443165 1458
f2fb1b32 1459 if (!*can_seek) {
b5443165
PP
1460 goto end;
1461 }
1462 }
1463
f2fb1b32
SM
1464 *can_seek = BT_TRUE;
1465
b5443165 1466end:
f2fb1b32 1467 return status;
b5443165
PP
1468}
1469
54bdc1f7 1470BT_HIDDEN
a3f0c7db 1471bt_message_iterator_class_can_seek_beginning_method_status
f2fb1b32
SM
1472muxer_msg_iter_can_seek_beginning(
1473 bt_self_message_iterator *self_msg_iter, bt_bool *can_seek)
54bdc1f7
PP
1474{
1475 struct muxer_msg_iter *muxer_msg_iter =
1476 bt_self_message_iterator_get_data(self_msg_iter);
a3f0c7db 1477 bt_message_iterator_class_can_seek_beginning_method_status status;
54bdc1f7 1478
f2fb1b32
SM
1479 status = muxer_upstream_msg_iters_can_all_seek_beginning(
1480 muxer_msg_iter->active_muxer_upstream_msg_iters, can_seek);
a3f0c7db 1481 if (status != BT_MESSAGE_ITERATOR_CLASS_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) {
54bdc1f7
PP
1482 goto end;
1483 }
1484
f2fb1b32 1485 if (!*can_seek) {
54bdc1f7
PP
1486 goto end;
1487 }
1488
f2fb1b32
SM
1489 status = muxer_upstream_msg_iters_can_all_seek_beginning(
1490 muxer_msg_iter->ended_muxer_upstream_msg_iters, can_seek);
1491
54bdc1f7 1492end:
f2fb1b32 1493 return status;
54bdc1f7
PP
1494}
1495
b5443165 1496BT_HIDDEN
a3f0c7db 1497bt_message_iterator_class_seek_beginning_method_status muxer_msg_iter_seek_beginning(
b5443165
PP
1498 bt_self_message_iterator *self_msg_iter)
1499{
1500 struct muxer_msg_iter *muxer_msg_iter =
1501 bt_self_message_iterator_get_data(self_msg_iter);
a3f0c7db
SM
1502 bt_message_iterator_class_seek_beginning_method_status status =
1503 BT_MESSAGE_ITERATOR_CLASS_SEEK_BEGINNING_METHOD_STATUS_OK;
d24d5663 1504 bt_message_iterator_seek_beginning_status seek_beg_status;
b5443165
PP
1505 uint64_t i;
1506
54bdc1f7
PP
1507 /* Seek all ended upstream iterators first */
1508 for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
1509 i++) {
b5443165 1510 struct muxer_upstream_msg_iter *upstream_msg_iter =
54bdc1f7 1511 muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
b5443165 1512
9a2c8b8e 1513 seek_beg_status = bt_message_iterator_seek_beginning(
b5443165 1514 upstream_msg_iter->msg_iter);
d24d5663
PP
1515 if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) {
1516 status = (int) seek_beg_status;
b5443165
PP
1517 goto end;
1518 }
54bdc1f7
PP
1519
1520 empty_message_queue(upstream_msg_iter);
1521 }
1522
1523 /* Seek all previously active upstream iterators */
1524 for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
1525 i++) {
1526 struct muxer_upstream_msg_iter *upstream_msg_iter =
1527 muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i];
1528
9a2c8b8e 1529 seek_beg_status = bt_message_iterator_seek_beginning(
54bdc1f7 1530 upstream_msg_iter->msg_iter);
d24d5663
PP
1531 if (seek_beg_status != BT_MESSAGE_ITERATOR_SEEK_BEGINNING_STATUS_OK) {
1532 status = (int) seek_beg_status;
54bdc1f7
PP
1533 goto end;
1534 }
1535
1536 empty_message_queue(upstream_msg_iter);
1537 }
1538
1539 /* Make them all active */
1540 for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
1541 i++) {
1542 struct muxer_upstream_msg_iter *upstream_msg_iter =
1543 muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
1544
1545 g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
1546 upstream_msg_iter);
1547 muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL;
b5443165
PP
1548 }
1549
3625612b
MJ
1550 /*
1551 * GLib < 2.48.0 asserts when g_ptr_array_remove_range() is
1552 * called on an empty array.
1553 */
1554 if (muxer_msg_iter->ended_muxer_upstream_msg_iters->len > 0) {
1555 g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters,
1556 0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len);
1557 }
b5443165 1558 muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
54bdc1f7
PP
1559 muxer_msg_iter->clock_class_expectation =
1560 MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY;
b5443165
PP
1561
1562end:
d24d5663 1563 return status;
b5443165 1564}
This page took 0.155092 seconds and 4 git commands to generate.