Move to kernel style SPDX license identifiers
[babeltrace.git] / src / lib / graph / iterator.c
CommitLineData
47e5a032 1/*
0235b0db
MJ
2 * SPDX-License-Identifier: MIT
3 *
e2f7325d 4 * Copyright 2017-2018 Philippe Proulx <pproulx@efficios.com>
47e5a032 5 * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
47e5a032
JG
6 */
7
350ad6c1 8#define BT_LOG_TAG "LIB/MSG-ITER"
c2d9d9cf 9#include "lib/logging.h"
5af447e5 10
578e048b 11#include "compat/compiler.h"
e74dbb33 12#include "compat/glib.h"
578e048b
MJ
13#include "lib/trace-ir/clock-class.h"
14#include "lib/trace-ir/clock-snapshot.h"
3fadfbc0 15#include <babeltrace2/trace-ir/field.h>
43c59509 16#include <babeltrace2/trace-ir/event.h>
578e048b 17#include "lib/trace-ir/event.h"
43c59509 18#include <babeltrace2/trace-ir/packet.h>
578e048b
MJ
19#include "lib/trace-ir/packet.h"
20#include "lib/trace-ir/stream.h"
43c59509
PP
21#include <babeltrace2/trace-ir/clock-class.h>
22#include <babeltrace2/trace-ir/stream-class.h>
23#include <babeltrace2/trace-ir/stream.h>
24#include <babeltrace2/graph/connection.h>
25#include <babeltrace2/graph/component.h>
26#include <babeltrace2/graph/message.h>
27#include <babeltrace2/graph/self-component.h>
28#include <babeltrace2/graph/port.h>
3fadfbc0 29#include <babeltrace2/graph/graph.h>
43c59509 30#include <babeltrace2/graph/message-iterator.h>
3fadfbc0 31#include <babeltrace2/types.h>
578e048b
MJ
32#include "common/assert.h"
33#include "lib/assert-pre.h"
f6f301d7 34#include "lib/assert-post.h"
fa054faf 35#include <stdint.h>
2ec84d26 36#include <inttypes.h>
c4f23e30 37#include <stdbool.h>
0fbb9a9f 38#include <stdlib.h>
3230ee6b 39
578e048b 40#include "component-class.h"
578e048b
MJ
41#include "component.h"
42#include "component-sink.h"
43#include "component-source.h"
44#include "connection.h"
45#include "graph.h"
a3f0c7db 46#include "message-iterator-class.h"
578e048b
MJ
47#include "message/discarded-items.h"
48#include "message/event.h"
49#include "message/iterator.h"
50#include "message/message.h"
51#include "message/message-iterator-inactivity.h"
52#include "message/stream.h"
53#include "message/packet.h"
d24d5663 54#include "lib/func-status.h"
578e048b 55
d4393e08
PP
56/*
57 * TODO: Use graph's state (number of active iterators, etc.) and
58 * possibly system specifications to make a better guess than this.
59 */
d6e69534 60#define MSG_BATCH_SIZE 15
d4393e08 61
7474e7d3 62#define BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(_iter) \
9a2c8b8e
PP
63 BT_ASSERT_PRE((_iter)->state == BT_MESSAGE_ITERATOR_STATE_ACTIVE || \
64 (_iter)->state == BT_MESSAGE_ITERATOR_STATE_ENDED || \
65 (_iter)->state == BT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_AGAIN || \
66 (_iter)->state == BT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR, \
7474e7d3 67 "Message iterator is in the wrong state: %!+i", _iter)
47e5a032 68
d0fea130 69static inline
9a2c8b8e
PP
70void set_msg_iterator_state(struct bt_message_iterator *iterator,
71 enum bt_message_iterator_state state)
d0fea130 72{
98b15851 73 BT_ASSERT_DBG(iterator);
7474e7d3 74 BT_LIB_LOGD("Updating message iterator's state: new-state=%s",
9a2c8b8e 75 bt_message_iterator_state_string(state));
d0fea130
PP
76 iterator->state = state;
77}
78
47e5a032 79static
9a2c8b8e 80void bt_message_iterator_destroy(struct bt_object *obj)
47e5a032 81{
9a2c8b8e 82 struct bt_message_iterator *iterator;
8738a040 83
f6ccaed9 84 BT_ASSERT(obj);
d3eb6e8f 85
bd14d768 86 /*
d6e69534 87 * The message iterator's reference count is 0 if we're
bd14d768
PP
88 * here. Increment it to avoid a double-destroy (possibly
89 * infinitely recursive). This could happen for example if the
d6e69534 90 * message iterator's finalization function does
d94d92ac
PP
91 * bt_object_get_ref() (or anything that causes
92 * bt_object_get_ref() to be called) on itself (ref. count goes
93 * from 0 to 1), and then bt_object_put_ref(): the reference
94 * count would go from 1 to 0 again and this function would be
95 * called again.
bd14d768 96 */
3fea54f6 97 obj->ref_count++;
07245ac2 98 iterator = (void *) obj;
3f7d4d90 99 BT_LIB_LOGI("Destroying self component input port message iterator object: "
d94d92ac 100 "%!+i", iterator);
9a2c8b8e 101 bt_message_iterator_try_finalize(iterator);
d3eb6e8f 102
bd14d768
PP
103 if (iterator->connection) {
104 /*
105 * Remove ourself from the originating connection so
106 * that it does not try to finalize a dangling pointer
107 * later.
108 */
109 bt_connection_remove_iterator(iterator->connection, iterator);
d94d92ac 110 iterator->connection = NULL;
bd14d768
PP
111 }
112
da9c4c52
SM
113 if (iterator->auto_seek.msgs) {
114 while (!g_queue_is_empty(iterator->auto_seek.msgs)) {
6871026b 115 bt_object_put_ref_no_null_check(
da9c4c52 116 g_queue_pop_tail(iterator->auto_seek.msgs));
7474e7d3
PP
117 }
118
da9c4c52
SM
119 g_queue_free(iterator->auto_seek.msgs);
120 iterator->auto_seek.msgs = NULL;
7474e7d3
PP
121 }
122
ca02df0a
PP
123 if (iterator->upstream_msg_iters) {
124 /*
125 * At this point the message iterator is finalized, so
126 * it's detached from any upstream message iterator.
127 */
128 BT_ASSERT(iterator->upstream_msg_iters->len == 0);
129 g_ptr_array_free(iterator->upstream_msg_iters, TRUE);
130 iterator->upstream_msg_iters = NULL;
131 }
132
6c373cc9
PP
133 if (iterator->msgs) {
134 g_ptr_array_free(iterator->msgs, TRUE);
135 iterator->msgs = NULL;
136 }
137
138 g_free(iterator);
47e5a032
JG
139}
140
bd14d768 141BT_HIDDEN
9a2c8b8e
PP
142void bt_message_iterator_try_finalize(
143 struct bt_message_iterator *iterator)
bd14d768 144{
ca02df0a 145 uint64_t i;
fca28f75 146 bool call_user_finalize = true;
bd14d768 147
f6ccaed9 148 BT_ASSERT(iterator);
bd14d768
PP
149
150 switch (iterator->state) {
9a2c8b8e 151 case BT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED:
fca28f75
SM
152 /*
153 * If this function is called while the iterator is in the
154 * NON_INITIALIZED state, it means the user initialization
155 * method has either not been called, or has failed. We
156 * therefore don't want to call the user finalization method.
157 * However, the initialization method might have created some
158 * upstream message iterators before failing, so we want to
159 * execute the rest of this function, which unlinks the related
160 * iterators.
161 */
162 call_user_finalize = false;
163 break;
9a2c8b8e 164 case BT_MESSAGE_ITERATOR_STATE_FINALIZED:
bd14d768 165 /* Already finalized */
d6e69534 166 BT_LIB_LOGD("Not finalizing message iterator: already finalized: "
d94d92ac 167 "%!+i", iterator);
d0fea130 168 goto end;
9a2c8b8e 169 case BT_MESSAGE_ITERATOR_STATE_FINALIZING:
870631a2 170 /* Finalizing */
d0fea130
PP
171 BT_LIB_LOGF("Message iterator is already being finalized: "
172 "%!+i", iterator);
498e7994 173 bt_common_abort();
bd14d768
PP
174 default:
175 break;
176 }
177
d6e69534 178 BT_LIB_LOGD("Finalizing message iterator: %!+i", iterator);
9a2c8b8e
PP
179 set_msg_iterator_state(iterator,
180 BT_MESSAGE_ITERATOR_STATE_FINALIZING);
f6ccaed9 181 BT_ASSERT(iterator->upstream_component);
bd14d768
PP
182
183 /* Call user-defined destroy method */
fca28f75 184 if (call_user_finalize) {
a3f0c7db 185 typedef void (*method_t)(void *);
41a3efcd 186 method_t method;
fca28f75
SM
187 struct bt_component_class *comp_class =
188 iterator->upstream_component->class;
41a3efcd 189 struct bt_component_class_with_iterator_class *class_with_iter_class;
fca28f75 190
41a3efcd
SM
191 BT_ASSERT(bt_component_class_has_message_iterator_class(comp_class));
192 class_with_iter_class = container_of(comp_class,
193 struct bt_component_class_with_iterator_class, parent);
194 method = (method_t) class_with_iter_class->msg_iter_cls->methods.finalize;
bd14d768 195
fca28f75
SM
196 if (method) {
197 const bt_error *saved_error;
42a63165 198
fca28f75 199 saved_error = bt_current_thread_take_error();
42a63165 200
fca28f75
SM
201 BT_LIB_LOGD("Calling user's finalization method: %!+i",
202 iterator);
203 method(iterator);
42a63165 204
fca28f75
SM
205 if (saved_error) {
206 BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(saved_error);
207 }
42a63165 208 }
bd14d768
PP
209 }
210
ca02df0a
PP
211 /* Detach upstream message iterators */
212 for (i = 0; i < iterator->upstream_msg_iters->len; i++) {
9a2c8b8e 213 struct bt_message_iterator *upstream_msg_iter =
ca02df0a
PP
214 iterator->upstream_msg_iters->pdata[i];
215
216 upstream_msg_iter->downstream_msg_iter = NULL;
217 }
218
219 g_ptr_array_set_size(iterator->upstream_msg_iters, 0);
220
221 /* Detach downstream message iterator */
222 if (iterator->downstream_msg_iter) {
223 gboolean existed;
224
225 BT_ASSERT(iterator->downstream_msg_iter->upstream_msg_iters);
226 existed = g_ptr_array_remove_fast(
227 iterator->downstream_msg_iter->upstream_msg_iters,
228 iterator);
229 BT_ASSERT(existed);
230 }
231
bd14d768
PP
232 iterator->upstream_component = NULL;
233 iterator->upstream_port = NULL;
9a2c8b8e
PP
234 set_msg_iterator_state(iterator,
235 BT_MESSAGE_ITERATOR_STATE_FINALIZED);
d6e69534 236 BT_LIB_LOGD("Finalized message iterator: %!+i", iterator);
d0fea130
PP
237
238end:
239 return;
bd14d768
PP
240}
241
242BT_HIDDEN
9a2c8b8e
PP
243void bt_message_iterator_set_connection(
244 struct bt_message_iterator *iterator,
bd14d768
PP
245 struct bt_connection *connection)
246{
f6ccaed9 247 BT_ASSERT(iterator);
bd14d768 248 iterator->connection = connection;
3f7d4d90 249 BT_LIB_LOGI("Set message iterator's connection: "
d94d92ac 250 "%![iter-]+i, %![conn-]+x", iterator, connection);
bd14d768
PP
251}
252
7474e7d3 253static
f2fb1b32 254enum bt_message_iterator_can_seek_beginning_status can_seek_ns_from_origin_true(
9a2c8b8e 255 struct bt_message_iterator *iterator,
f2fb1b32 256 int64_t ns_from_origin, bt_bool *can_seek)
7474e7d3 257{
f2fb1b32
SM
258 *can_seek = BT_TRUE;
259
260 return BT_FUNC_STATUS_OK;
7474e7d3
PP
261}
262
263static
f2fb1b32 264enum bt_message_iterator_can_seek_beginning_status can_seek_beginning_true(
9a2c8b8e 265 struct bt_message_iterator *iterator,
f2fb1b32 266 bt_bool *can_seek)
7474e7d3 267{
f2fb1b32
SM
268 *can_seek = BT_TRUE;
269
270 return BT_FUNC_STATUS_OK;
7474e7d3
PP
271}
272
d94d92ac 273static
e803df70 274int create_self_component_input_port_message_iterator(
ca02df0a 275 struct bt_self_message_iterator *self_downstream_msg_iter,
e803df70 276 struct bt_self_component_port_input *self_port,
9a2c8b8e 277 struct bt_message_iterator **message_iterator)
47e5a032 278{
a3f0c7db 279 bt_message_iterator_class_initialize_method init_method = NULL;
9a2c8b8e 280 struct bt_message_iterator *iterator =
ca02df0a 281 NULL;
9a2c8b8e 282 struct bt_message_iterator *downstream_msg_iter =
ca02df0a
PP
283 (void *) self_downstream_msg_iter;
284 struct bt_port *port = (void *) self_port;
285 struct bt_port *upstream_port;
286 struct bt_component *comp;
287 struct bt_component *upstream_comp;
288 struct bt_component_class *upstream_comp_cls;
41a3efcd 289 struct bt_component_class_with_iterator_class *upstream_comp_cls_with_iter_cls;
e803df70 290 int status;
47e5a032 291
e803df70 292 BT_ASSERT_PRE_NON_NULL(message_iterator, "Created message iterator");
ca02df0a
PP
293 BT_ASSERT_PRE_NON_NULL(port, "Input port");
294 comp = bt_port_borrow_component_inline(port);
295 BT_ASSERT_PRE(bt_port_is_connected(port),
296 "Input port is not connected: %![port-]+p", port);
297 BT_ASSERT_PRE(comp, "Input port is not part of a component: %![port-]+p",
298 port);
ca02df0a
PP
299 BT_ASSERT(port->connection);
300 upstream_port = port->connection->upstream_port;
f6ccaed9 301 BT_ASSERT(upstream_port);
ca02df0a
PP
302 upstream_comp = bt_port_borrow_component_inline(upstream_port);
303 BT_ASSERT(upstream_comp);
304 BT_ASSERT_PRE(
9b4f9b42
PP
305 bt_component_borrow_graph(upstream_comp)->config_state ==
306 BT_GRAPH_CONFIGURATION_STATE_PARTIALLY_CONFIGURED ||
307 bt_component_borrow_graph(upstream_comp)->config_state ==
308 BT_GRAPH_CONFIGURATION_STATE_CONFIGURED,
ca02df0a
PP
309 "Graph is not configured: %!+g",
310 bt_component_borrow_graph(upstream_comp));
311 upstream_comp_cls = upstream_comp->class;
312 BT_ASSERT(upstream_comp->class->type ==
d94d92ac 313 BT_COMPONENT_CLASS_TYPE_SOURCE ||
ca02df0a 314 upstream_comp->class->type ==
d94d92ac 315 BT_COMPONENT_CLASS_TYPE_FILTER);
ca02df0a
PP
316 BT_LIB_LOGI("Creating message iterator on self component input port: "
317 "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port);
d94d92ac 318 iterator = g_new0(
9a2c8b8e 319 struct bt_message_iterator, 1);
47e5a032 320 if (!iterator) {
870631a2
PP
321 BT_LIB_LOGE_APPEND_CAUSE(
322 "Failed to allocate one self component input port "
d6e69534 323 "message iterator.");
e803df70 324 status = BT_FUNC_STATUS_MEMORY_ERROR;
870631a2 325 goto error;
47e5a032
JG
326 }
327
6c373cc9 328 bt_object_init_shared(&iterator->base,
9a2c8b8e 329 bt_message_iterator_destroy);
6c373cc9
PP
330 iterator->msgs = g_ptr_array_new();
331 if (!iterator->msgs) {
332 BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray.");
e803df70 333 status = BT_FUNC_STATUS_MEMORY_ERROR;
870631a2 334 goto error;
d4393e08 335 }
3230ee6b 336
6c373cc9 337 g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE);
54b135a0 338 iterator->last_ns_from_origin = INT64_MIN;
da9c4c52
SM
339 iterator->auto_seek.msgs = g_queue_new();
340 if (!iterator->auto_seek.msgs) {
870631a2 341 BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GQueue.");
e803df70 342 status = BT_FUNC_STATUS_MEMORY_ERROR;
ca02df0a
PP
343 goto error;
344 }
345
346 iterator->upstream_msg_iters = g_ptr_array_new();
347 if (!iterator->upstream_msg_iters) {
348 BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray.");
e803df70 349 status = BT_FUNC_STATUS_MEMORY_ERROR;
ca02df0a 350 goto error;
3230ee6b
PP
351 }
352
bd14d768
PP
353 iterator->upstream_component = upstream_comp;
354 iterator->upstream_port = upstream_port;
d94d92ac 355 iterator->connection = iterator->upstream_port->connection;
5c563278 356 iterator->graph = bt_component_borrow_graph(upstream_comp);
9a2c8b8e
PP
357 set_msg_iterator_state(iterator,
358 BT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED);
7474e7d3 359
41a3efcd
SM
360 /* Copy methods from the message iterator class to the message iterator. */
361 BT_ASSERT(bt_component_class_has_message_iterator_class(upstream_comp_cls));
362 upstream_comp_cls_with_iter_cls = container_of(upstream_comp_cls,
363 struct bt_component_class_with_iterator_class, parent);
364
365 iterator->methods.next =
9a2c8b8e 366 (bt_message_iterator_next_method)
41a3efcd
SM
367 upstream_comp_cls_with_iter_cls->msg_iter_cls->methods.next;
368 iterator->methods.seek_ns_from_origin =
9a2c8b8e 369 (bt_message_iterator_seek_ns_from_origin_method)
41a3efcd
SM
370 upstream_comp_cls_with_iter_cls->msg_iter_cls->methods.seek_ns_from_origin;
371 iterator->methods.seek_beginning =
9a2c8b8e 372 (bt_message_iterator_seek_beginning_method)
41a3efcd
SM
373 upstream_comp_cls_with_iter_cls->msg_iter_cls->methods.seek_beginning;
374 iterator->methods.can_seek_ns_from_origin =
9a2c8b8e 375 (bt_message_iterator_can_seek_ns_from_origin_method)
41a3efcd
SM
376 upstream_comp_cls_with_iter_cls->msg_iter_cls->methods.can_seek_ns_from_origin;
377 iterator->methods.can_seek_beginning =
9a2c8b8e 378 (bt_message_iterator_can_seek_beginning_method)
41a3efcd 379 upstream_comp_cls_with_iter_cls->msg_iter_cls->methods.can_seek_beginning;
7474e7d3
PP
380
381 if (iterator->methods.seek_ns_from_origin &&
382 !iterator->methods.can_seek_ns_from_origin) {
383 iterator->methods.can_seek_ns_from_origin =
9a2c8b8e 384 (bt_message_iterator_can_seek_ns_from_origin_method)
7474e7d3
PP
385 can_seek_ns_from_origin_true;
386 }
387
388 if (iterator->methods.seek_beginning &&
389 !iterator->methods.can_seek_beginning) {
390 iterator->methods.can_seek_beginning =
9a2c8b8e 391 (bt_message_iterator_can_seek_beginning_method)
7474e7d3
PP
392 can_seek_beginning_true;
393 }
394
41a3efcd
SM
395 /* Call iterator's init method. */
396 init_method = upstream_comp_cls_with_iter_cls->msg_iter_cls->methods.initialize;
d94d92ac
PP
397
398 if (init_method) {
a3f0c7db 399 enum bt_message_iterator_class_initialize_method_status iter_status;
d94d92ac
PP
400
401 BT_LIB_LOGD("Calling user's initialization method: %!+i", iterator);
a3f0c7db
SM
402 iter_status = init_method(
403 (struct bt_self_message_iterator *) iterator,
404 &iterator->config,
a3f0c7db 405 (struct bt_self_component_port_output *) upstream_port);
d94d92ac 406 BT_LOGD("User method returned: status=%s",
d24d5663 407 bt_common_func_status_string(iter_status));
6ecdcca3 408 BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(iter_status);
d24d5663 409 if (iter_status != BT_FUNC_STATUS_OK) {
870631a2
PP
410 BT_LIB_LOGW_APPEND_CAUSE(
411 "Component input port message iterator initialization method failed: "
412 "%![iter-]+i, status=%s",
413 iterator,
414 bt_common_func_status_string(iter_status));
e803df70 415 status = iter_status;
870631a2 416 goto error;
d94d92ac 417 }
8d8b141d
SM
418
419 iterator->config.frozen = true;
d94d92ac
PP
420 }
421
ca02df0a
PP
422 if (downstream_msg_iter) {
423 /* Set this message iterator's downstream message iterator */
424 iterator->downstream_msg_iter = downstream_msg_iter;
425
426 /*
427 * Add this message iterator to the downstream message
428 * iterator's array of upstream message iterators.
429 */
430 g_ptr_array_add(downstream_msg_iter->upstream_msg_iters,
431 iterator);
432 }
433
9a2c8b8e
PP
434 set_msg_iterator_state(iterator,
435 BT_MESSAGE_ITERATOR_STATE_ACTIVE);
d94d92ac 436 g_ptr_array_add(port->connection->iterators, iterator);
3f7d4d90 437 BT_LIB_LOGI("Created message iterator on self component input port: "
d94d92ac
PP
438 "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
439 upstream_port, upstream_comp, iterator);
e803df70
SM
440
441 *message_iterator = iterator;
442 status = BT_FUNC_STATUS_OK;
870631a2
PP
443 goto end;
444
445error:
446 BT_OBJECT_PUT_REF_AND_RESET(iterator);
d94d92ac
PP
447
448end:
e803df70 449 return status;
ea8d3e58
JG
450}
451
9a2c8b8e
PP
452bt_message_iterator_create_from_message_iterator_status
453bt_message_iterator_create_from_message_iterator(
ca02df0a 454 struct bt_self_message_iterator *self_msg_iter,
e803df70 455 struct bt_self_component_port_input *input_port,
9a2c8b8e 456 struct bt_message_iterator **message_iterator)
ca02df0a 457{
17f3083a 458 BT_ASSERT_PRE_NO_ERROR();
ca02df0a
PP
459 BT_ASSERT_PRE_NON_NULL(self_msg_iter, "Message iterator");
460 return create_self_component_input_port_message_iterator(self_msg_iter,
e803df70 461 input_port, message_iterator);
ca02df0a
PP
462}
463
9a2c8b8e
PP
464bt_message_iterator_create_from_sink_component_status
465bt_message_iterator_create_from_sink_component(
ca02df0a 466 struct bt_self_component_sink *self_comp,
e803df70 467 struct bt_self_component_port_input *input_port,
9a2c8b8e 468 struct bt_message_iterator **message_iterator)
ca02df0a 469{
17f3083a 470 BT_ASSERT_PRE_NO_ERROR();
ca02df0a
PP
471 BT_ASSERT_PRE_NON_NULL(self_comp, "Sink component");
472 return create_self_component_input_port_message_iterator(NULL,
e803df70 473 input_port, message_iterator);
ca02df0a
PP
474}
475
d6e69534
PP
476void *bt_self_message_iterator_get_data(
477 const struct bt_self_message_iterator *self_iterator)
ea8d3e58 478{
9a2c8b8e 479 struct bt_message_iterator *iterator =
d94d92ac 480 (void *) self_iterator;
ea8d3e58 481
bdb288b3 482 BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator");
d94d92ac 483 return iterator->user_data;
8738a040 484}
413bc2c4 485
d6e69534
PP
486void bt_self_message_iterator_set_data(
487 struct bt_self_message_iterator *self_iterator, void *data)
5c563278 488{
9a2c8b8e 489 struct bt_message_iterator *iterator =
d94d92ac 490 (void *) self_iterator;
5c563278 491
bdb288b3 492 BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator");
d94d92ac 493 iterator->user_data = data;
3f7d4d90 494 BT_LIB_LOGD("Set message iterator's user data: "
d94d92ac 495 "%!+i, user-data-addr=%p", iterator, data);
5c563278
PP
496}
497
8d8b141d
SM
498void bt_self_message_iterator_configuration_set_can_seek_forward(
499 bt_self_message_iterator_configuration *config,
500 bt_bool can_seek_forward)
501{
502 BT_ASSERT_PRE_NON_NULL(config, "Message iterator configuration");
503 BT_ASSERT_PRE_DEV_HOT(config, "Message iterator configuration", "");
504
505 config->can_seek_forward = can_seek_forward;
506}
507
54b135a0
SM
508/*
509 * Validate that the default clock snapshot in `msg` doesn't make us go back in
510 * time.
511 */
512
bdb288b3 513BT_ASSERT_POST_DEV_FUNC
54b135a0
SM
514static
515bool clock_snapshots_are_monotonic_one(
9a2c8b8e 516 struct bt_message_iterator *iterator,
54b135a0
SM
517 const bt_message *msg)
518{
519 const struct bt_clock_snapshot *clock_snapshot = NULL;
520 bt_message_type message_type = bt_message_get_type(msg);
521 int64_t ns_from_origin;
d24d5663 522 enum bt_clock_snapshot_get_ns_from_origin_status clock_snapshot_status;
54b135a0
SM
523
524 /*
525 * The default is true: if we can't figure out the clock snapshot
526 * (or there is none), assume it is fine.
527 */
528 bool result = true;
529
530 switch (message_type) {
531 case BT_MESSAGE_TYPE_EVENT:
532 {
533 struct bt_message_event *event_msg = (struct bt_message_event *) msg;
534 clock_snapshot = event_msg->default_cs;
535 break;
536 }
537 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
538 {
539 struct bt_message_message_iterator_inactivity *inactivity_msg =
540 (struct bt_message_message_iterator_inactivity *) msg;
60d02328 541 clock_snapshot = inactivity_msg->cs;
54b135a0
SM
542 break;
543 }
544 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
545 case BT_MESSAGE_TYPE_PACKET_END:
546 {
547 struct bt_message_packet *packet_msg = (struct bt_message_packet *) msg;
548 clock_snapshot = packet_msg->default_cs;
549 break;
550 }
188edac1
SM
551 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
552 case BT_MESSAGE_TYPE_STREAM_END:
54b135a0 553 {
188edac1
SM
554 struct bt_message_stream *stream_msg = (struct bt_message_stream *) msg;
555 if (stream_msg->default_cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
556 goto end;
54b135a0 557 }
188edac1
SM
558
559 clock_snapshot = stream_msg->default_cs;
54b135a0
SM
560 break;
561 }
54b135a0
SM
562 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
563 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
564 {
565 struct bt_message_discarded_items *discarded_msg =
566 (struct bt_message_discarded_items *) msg;
567
568 clock_snapshot = discarded_msg->default_begin_cs;
569 break;
570 }
571 }
572
573 if (!clock_snapshot) {
574 goto end;
575 }
576
d747e85f
SM
577 clock_snapshot_status = bt_clock_snapshot_get_ns_from_origin(
578 clock_snapshot, &ns_from_origin);
d24d5663 579 if (clock_snapshot_status != BT_FUNC_STATUS_OK) {
d747e85f
SM
580 /*
581 * bt_clock_snapshot_get_ns_from_origin can return
582 * OVERFLOW_ERROR. We don't really want to report an error to
583 * our caller, so just clear it.
584 */
585 bt_current_thread_clear_error();
54b135a0
SM
586 goto end;
587 }
588
589 result = ns_from_origin >= iterator->last_ns_from_origin;
590 iterator->last_ns_from_origin = ns_from_origin;
591end:
592 return result;
593}
594
bdb288b3 595BT_ASSERT_POST_DEV_FUNC
54b135a0
SM
596static
597bool clock_snapshots_are_monotonic(
9a2c8b8e 598 struct bt_message_iterator *iterator,
54b135a0
SM
599 bt_message_array_const msgs, uint64_t msg_count)
600{
601 uint64_t i;
602 bool result;
603
604 for (i = 0; i < msg_count; i++) {
605 if (!clock_snapshots_are_monotonic_one(iterator, msgs[i])) {
606 result = false;
607 goto end;
608 }
609 }
610
611 result = true;
612
613end:
614 return result;
615}
616
617/*
618 * When a new stream begins, verify that the clock class tied to this
619 * stream is compatible with what we've seen before.
620 */
621
bdb288b3 622BT_ASSERT_POST_DEV_FUNC
54b135a0 623static
9a2c8b8e 624bool clock_classes_are_compatible_one(struct bt_message_iterator *iterator,
54b135a0
SM
625 const struct bt_message *msg)
626{
627 enum bt_message_type message_type = bt_message_get_type(msg);
628 bool result;
629
630 if (message_type == BT_MESSAGE_TYPE_STREAM_BEGINNING) {
631 const struct bt_message_stream *stream_msg = (struct bt_message_stream *) msg;
632 const struct bt_clock_class *clock_class = stream_msg->stream->class->default_clock_class;
633 bt_uuid clock_class_uuid = NULL;
634
635 if (clock_class) {
636 clock_class_uuid = bt_clock_class_get_uuid(clock_class);
637 }
638
639 switch (iterator->clock_expectation.type) {
640 case CLOCK_EXPECTATION_UNSET:
641 /*
642 * This is the first time we see a message with a clock
643 * snapshot: record the properties of that clock, against
644 * which we'll compare the clock properties of the following
645 * messages.
646 */
647
648 if (!clock_class) {
649 iterator->clock_expectation.type = CLOCK_EXPECTATION_NONE;
650 } else if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
651 iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_UNIX;
652 } else if (clock_class_uuid) {
653 iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_OTHER_UUID;
6162e6b7 654 bt_uuid_copy(iterator->clock_expectation.uuid, clock_class_uuid);
54b135a0
SM
655 } else {
656 iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_OTHER_NO_UUID;
657 }
658 break;
659
660 case CLOCK_EXPECTATION_NONE:
661 if (clock_class) {
bdb288b3
PP
662 BT_ASSERT_POST_DEV_MSG(
663 "Expecting no clock class, got one: %![cc-]+K",
54b135a0
SM
664 clock_class);
665 result = false;
666 goto end;
667 }
668
669 break;
670
671 case CLOCK_EXPECTATION_ORIGIN_UNIX:
672 if (!clock_class) {
bdb288b3
PP
673 BT_ASSERT_POST_DEV_MSG(
674 "Expecting a clock class, got none.");
54b135a0
SM
675 result = false;
676 goto end;
677 }
678
679 if (!bt_clock_class_origin_is_unix_epoch(clock_class)) {
bdb288b3
PP
680 BT_ASSERT_POST_DEV_MSG(
681 "Expecting a clock class with Unix epoch origin: %![cc-]+K",
54b135a0
SM
682 clock_class);
683 result = false;
684 goto end;
685 }
686 break;
687
688 case CLOCK_EXPECTATION_ORIGIN_OTHER_UUID:
689 if (!clock_class) {
bdb288b3
PP
690 BT_ASSERT_POST_DEV_MSG(
691 "Expecting a clock class, got none.");
54b135a0
SM
692 result = false;
693 goto end;
694 }
695
696 if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
bdb288b3
PP
697 BT_ASSERT_POST_DEV_MSG(
698 "Expecting a clock class without Unix epoch origin: %![cc-]+K",
54b135a0
SM
699 clock_class);
700 result = false;
701 goto end;
702 }
703
704 if (!clock_class_uuid) {
bdb288b3
PP
705 BT_ASSERT_POST_DEV_MSG(
706 "Expecting a clock class with UUID: %![cc-]+K",
54b135a0
SM
707 clock_class);
708 result = false;
709 goto end;
710 }
711
712 if (bt_uuid_compare(iterator->clock_expectation.uuid, clock_class_uuid)) {
bdb288b3
PP
713 BT_ASSERT_POST_DEV_MSG(
714 "Expecting a clock class with UUID, got one "
54b135a0
SM
715 "with a different UUID: %![cc-]+K, expected-uuid=%!u",
716 clock_class, iterator->clock_expectation.uuid);
717 result = false;
718 goto end;
719 }
720 break;
721
722 case CLOCK_EXPECTATION_ORIGIN_OTHER_NO_UUID:
723 if (!clock_class) {
bdb288b3
PP
724 BT_ASSERT_POST_DEV_MSG(
725 "Expecting a clock class, got none.");
54b135a0
SM
726 result = false;
727 goto end;
728 }
729
730 if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
bdb288b3
PP
731 BT_ASSERT_POST_DEV_MSG(
732 "Expecting a clock class without Unix epoch origin: %![cc-]+K",
54b135a0
SM
733 clock_class);
734 result = false;
735 goto end;
736 }
737
738 if (clock_class_uuid) {
bdb288b3
PP
739 BT_ASSERT_POST_DEV_MSG(
740 "Expecting a clock class without UUID: %![cc-]+K",
54b135a0
SM
741 clock_class);
742 result = false;
743 goto end;
744 }
745 break;
746 }
747 }
748
749 result = true;
750
751end:
752 return result;
753}
754
bdb288b3 755BT_ASSERT_POST_DEV_FUNC
54b135a0
SM
756static
757bool clock_classes_are_compatible(
9a2c8b8e 758 struct bt_message_iterator *iterator,
54b135a0
SM
759 bt_message_array_const msgs, uint64_t msg_count)
760{
761 uint64_t i;
762 bool result;
763
764 for (i = 0; i < msg_count; i++) {
765 if (!clock_classes_are_compatible_one(iterator, msgs[i])) {
766 result = false;
767 goto end;
768 }
769 }
770
771 result = true;
772
773end:
774 return result;
775}
776
777/*
778 * Call the `next` method of the iterator. Do some validation on the returned
779 * messages.
780 */
781
782static
a3f0c7db 783enum bt_message_iterator_class_next_method_status
d24d5663 784call_iterator_next_method(
9a2c8b8e 785 struct bt_message_iterator *iterator,
54b135a0
SM
786 bt_message_array_const msgs, uint64_t capacity, uint64_t *user_count)
787{
a3f0c7db 788 enum bt_message_iterator_class_next_method_status status;
54b135a0 789
98b15851 790 BT_ASSERT_DBG(iterator->methods.next);
54b135a0 791 BT_LOGD_STR("Calling user's \"next\" method.");
54b135a0 792 status = iterator->methods.next(iterator, msgs, capacity, user_count);
f6f301d7 793 BT_LOGD("User method returned: status=%s, msg-count=%" PRIu64,
d24d5663 794 bt_common_func_status_string(status), *user_count);
54b135a0 795
d24d5663 796 if (status == BT_FUNC_STATUS_OK) {
bdb288b3 797 BT_ASSERT_POST_DEV(clock_classes_are_compatible(iterator, msgs, *user_count),
54b135a0 798 "Clocks are not compatible");
bdb288b3 799 BT_ASSERT_POST_DEV(clock_snapshots_are_monotonic(iterator, msgs, *user_count),
54b135a0
SM
800 "Clock snapshots are not monotonic");
801 }
802
6ecdcca3
SM
803 BT_ASSERT_POST_DEV_NO_ERROR_IF_NO_ERROR_STATUS(status);
804
54b135a0
SM
805 return status;
806}
807
d24d5663 808enum bt_message_iterator_next_status
9a2c8b8e
PP
809bt_message_iterator_next(
810 struct bt_message_iterator *iterator,
d6e69534 811 bt_message_array_const *msgs, uint64_t *user_count)
3230ee6b 812{
d24d5663 813 enum bt_message_iterator_next_status status = BT_FUNC_STATUS_OK;
d94d92ac 814
17f3083a 815 BT_ASSERT_PRE_DEV_NO_ERROR();
bdb288b3
PP
816 BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator");
817 BT_ASSERT_PRE_DEV_NON_NULL(msgs, "Message array (output)");
818 BT_ASSERT_PRE_DEV_NON_NULL(user_count, "Message count (output)");
819 BT_ASSERT_PRE_DEV(iterator->state ==
9a2c8b8e 820 BT_MESSAGE_ITERATOR_STATE_ACTIVE,
d6e69534 821 "Message iterator's \"next\" called, but "
7474e7d3 822 "message iterator is in the wrong state: %!+i", iterator);
98b15851
PP
823 BT_ASSERT_DBG(iterator->upstream_component);
824 BT_ASSERT_DBG(iterator->upstream_component->class);
bdb288b3 825 BT_ASSERT_PRE_DEV(
5badd463
PP
826 bt_component_borrow_graph(iterator->upstream_component)->config_state !=
827 BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
4725a201
PP
828 "Graph is not configured: %!+g",
829 bt_component_borrow_graph(iterator->upstream_component));
d94d92ac 830 BT_LIB_LOGD("Getting next self component input port "
3f7d4d90
PP
831 "message iterator's messages: %!+i, batch-size=%u",
832 iterator, MSG_BATCH_SIZE);
d3eb6e8f 833
3230ee6b 834 /*
d6e69534 835 * Call the user's "next" method to get the next messages
fa054faf 836 * and status.
3230ee6b 837 */
3f7d4d90 838 *user_count = 0;
d24d5663 839 status = (int) call_iterator_next_method(iterator,
6c373cc9 840 (void *) iterator->msgs->pdata, MSG_BATCH_SIZE,
7474e7d3 841 user_count);
870631a2
PP
842 BT_LOGD("User method returned: status=%s, msg-count=%" PRIu64,
843 bt_common_func_status_string(status), *user_count);
d4393e08 844 if (status < 0) {
870631a2
PP
845 BT_LIB_LOGW_APPEND_CAUSE(
846 "Component input port message iterator's \"next\" method failed: "
847 "%![iter-]+i, status=%s",
848 iterator, bt_common_func_status_string(status));
f42867e2
PP
849 goto end;
850 }
3230ee6b 851
d0fea130
PP
852 /*
853 * There is no way that this iterator could have been finalized
854 * during its "next" method, as the only way to do this is to
855 * put the last iterator's reference, and this can only be done
856 * by its downstream owner.
7474e7d3
PP
857 *
858 * For the same reason, there is no way that this iterator could
859 * have seeked (cannot seek a self message iterator).
d0fea130 860 */
98b15851 861 BT_ASSERT_DBG(iterator->state ==
9a2c8b8e 862 BT_MESSAGE_ITERATOR_STATE_ACTIVE);
8cf27cc5 863
d4393e08 864 switch (status) {
d24d5663 865 case BT_FUNC_STATUS_OK:
bdb288b3 866 BT_ASSERT_POST_DEV(*user_count <= MSG_BATCH_SIZE,
7474e7d3
PP
867 "Invalid returned message count: greater than "
868 "batch size: count=%" PRIu64 ", batch-size=%u",
869 *user_count, MSG_BATCH_SIZE);
6c373cc9 870 *msgs = (void *) iterator->msgs->pdata;
d4393e08 871 break;
d24d5663 872 case BT_FUNC_STATUS_AGAIN:
d4393e08 873 goto end;
d24d5663 874 case BT_FUNC_STATUS_END:
9a2c8b8e
PP
875 set_msg_iterator_state(iterator,
876 BT_MESSAGE_ITERATOR_STATE_ENDED);
f42867e2 877 goto end;
f42867e2
PP
878 default:
879 /* Unknown non-error status */
498e7994 880 bt_common_abort();
41a2b7ae
PP
881 }
882
883end:
3230ee6b
PP
884 return status;
885}
886
7474e7d3 887struct bt_component *
9a2c8b8e
PP
888bt_message_iterator_borrow_component(
889 struct bt_message_iterator *iterator)
d94d92ac 890{
bdb288b3 891 BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator");
d94d92ac
PP
892 return iterator->upstream_component;
893}
894
d6e69534
PP
895struct bt_self_component *bt_self_message_iterator_borrow_component(
896 struct bt_self_message_iterator *self_iterator)
413bc2c4 897{
9a2c8b8e 898 struct bt_message_iterator *iterator =
d94d92ac 899 (void *) self_iterator;
90157d89 900
bdb288b3 901 BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator");
d94d92ac 902 return (void *) iterator->upstream_component;
413bc2c4
JG
903}
904
50e763f6 905struct bt_self_component_port_output *bt_self_message_iterator_borrow_port(
d6e69534 906 struct bt_self_message_iterator *self_iterator)
91457551 907{
9a2c8b8e 908 struct bt_message_iterator *iterator =
d94d92ac
PP
909 (void *) self_iterator;
910
bdb288b3 911 BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator");
d94d92ac 912 return (void *) iterator->upstream_port;
91457551 913}
8ed535b5 914
f2fb1b32 915enum bt_message_iterator_can_seek_ns_from_origin_status
9a2c8b8e
PP
916bt_message_iterator_can_seek_ns_from_origin(
917 struct bt_message_iterator *iterator,
f2fb1b32 918 int64_t ns_from_origin, bt_bool *can_seek)
7474e7d3 919{
f2fb1b32 920 enum bt_message_iterator_can_seek_ns_from_origin_status status;
7474e7d3 921
17f3083a 922 BT_ASSERT_PRE_NO_ERROR();
7474e7d3 923 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
f2fb1b32 924 BT_ASSERT_PRE_NON_NULL(can_seek, "Result (output)");
7474e7d3 925 BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
5badd463
PP
926 BT_ASSERT_PRE(
927 bt_component_borrow_graph(iterator->upstream_component)->config_state !=
928 BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
7474e7d3
PP
929 "Graph is not configured: %!+g",
930 bt_component_borrow_graph(iterator->upstream_component));
931
932 if (iterator->methods.can_seek_ns_from_origin) {
f2fb1b32
SM
933 /*
934 * Initialize to an invalid value, so we can post-assert that
935 * the method returned a valid value.
936 */
937 *can_seek = -1;
938
c0e46a7c
SM
939 BT_LIB_LOGD("Calling user's \"can seek nanoseconds from origin\" method: %!+i",
940 iterator);
941
f2fb1b32
SM
942 status = (int) iterator->methods.can_seek_ns_from_origin(iterator,
943 ns_from_origin, can_seek);
944
6ecdcca3
SM
945 BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(status);
946
c0e46a7c
SM
947 if (status != BT_FUNC_STATUS_OK) {
948 BT_LIB_LOGW_APPEND_CAUSE(
949 "Component input port message iterator's \"can seek nanoseconds from origin\" method failed: "
950 "%![iter-]+i, status=%s",
951 iterator, bt_common_func_status_string(status));
952 goto end;
953 }
954
955 BT_ASSERT_POST(*can_seek == BT_TRUE || *can_seek == BT_FALSE,
f2fb1b32
SM
956 "Unexpected boolean value returned from user's \"can seek ns from origin\" method: val=%d, %![iter-]+i",
957 *can_seek, iterator);
958
c0e46a7c
SM
959 BT_LIB_LOGD(
960 "User's \"can seek nanoseconds from origin\" returned successfully: "
961 "%![iter-]+i, can-seek=%d",
962 iterator, *can_seek);
963
964 if (*can_seek) {
965 goto end;
966 }
7474e7d3
PP
967 }
968
969 /*
c0e46a7c
SM
970 * Automatic seeking fall back: if we can seek to the beginning and the
971 * iterator supports forward seeking then we can automatically seek to
972 * any timestamp.
7474e7d3 973 */
9a2c8b8e 974 status = (int) bt_message_iterator_can_seek_beginning(
f2fb1b32 975 iterator, can_seek);
c0e46a7c
SM
976 if (status != BT_FUNC_STATUS_OK) {
977 goto end;
978 }
979
980 *can_seek = *can_seek && iterator->config.can_seek_forward;
7474e7d3
PP
981
982end:
f2fb1b32 983 return status;
7474e7d3
PP
984}
985
f2fb1b32 986enum bt_message_iterator_can_seek_beginning_status
9a2c8b8e
PP
987bt_message_iterator_can_seek_beginning(
988 struct bt_message_iterator *iterator,
f2fb1b32 989 bt_bool *can_seek)
7474e7d3 990{
f2fb1b32 991 enum bt_message_iterator_can_seek_beginning_status status;
7474e7d3 992
17f3083a 993 BT_ASSERT_PRE_NO_ERROR();
7474e7d3 994 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
f2fb1b32 995 BT_ASSERT_PRE_NON_NULL(can_seek, "Result (output)");
7474e7d3 996 BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
5badd463
PP
997 BT_ASSERT_PRE(
998 bt_component_borrow_graph(iterator->upstream_component)->config_state !=
999 BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
7474e7d3
PP
1000 "Graph is not configured: %!+g",
1001 bt_component_borrow_graph(iterator->upstream_component));
1002
1003 if (iterator->methods.can_seek_beginning) {
f2fb1b32
SM
1004 /*
1005 * Initialize to an invalid value, so we can post-assert that
1006 * the method returned a valid value.
1007 */
1008 *can_seek = -1;
1009
1010 status = (int) iterator->methods.can_seek_beginning(iterator, can_seek);
1011
1012 BT_ASSERT_POST(
1013 status != BT_FUNC_STATUS_OK ||
1014 *can_seek == BT_TRUE ||
1015 *can_seek == BT_FALSE,
1016 "Unexpected boolean value returned from user's \"can seek beginning\" method: val=%d, %![iter-]+i",
1017 *can_seek, iterator);
6ecdcca3 1018 BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(status);
f2fb1b32
SM
1019 } else {
1020 *can_seek = BT_FALSE;
1021 status = BT_FUNC_STATUS_OK;
7474e7d3
PP
1022 }
1023
f2fb1b32 1024 return status;
7474e7d3
PP
1025}
1026
1027static inline
003e713f 1028void set_iterator_state_after_seeking(
9a2c8b8e 1029 struct bt_message_iterator *iterator,
d24d5663 1030 int status)
7474e7d3 1031{
9a2c8b8e 1032 enum bt_message_iterator_state new_state = 0;
7474e7d3
PP
1033
1034 /* Set iterator's state depending on seeking status */
1035 switch (status) {
d24d5663 1036 case BT_FUNC_STATUS_OK:
9a2c8b8e 1037 new_state = BT_MESSAGE_ITERATOR_STATE_ACTIVE;
7474e7d3 1038 break;
d24d5663 1039 case BT_FUNC_STATUS_AGAIN:
9a2c8b8e 1040 new_state = BT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_AGAIN;
7474e7d3 1041 break;
d24d5663
PP
1042 case BT_FUNC_STATUS_ERROR:
1043 case BT_FUNC_STATUS_MEMORY_ERROR:
9a2c8b8e 1044 new_state = BT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR;
7474e7d3 1045 break;
d24d5663 1046 case BT_FUNC_STATUS_END:
9a2c8b8e 1047 new_state = BT_MESSAGE_ITERATOR_STATE_ENDED;
7474e7d3
PP
1048 break;
1049 default:
498e7994 1050 bt_common_abort();
7474e7d3
PP
1051 }
1052
9a2c8b8e 1053 set_msg_iterator_state(iterator, new_state);
7474e7d3
PP
1054}
1055
54b135a0
SM
1056static
1057void reset_iterator_expectations(
9a2c8b8e 1058 struct bt_message_iterator *iterator)
54b135a0
SM
1059{
1060 iterator->last_ns_from_origin = INT64_MIN;
1061 iterator->clock_expectation.type = CLOCK_EXPECTATION_UNSET;
1062}
1063
f2fb1b32
SM
1064static
1065bool message_iterator_can_seek_beginning(
9a2c8b8e 1066 struct bt_message_iterator *iterator)
f2fb1b32
SM
1067{
1068 enum bt_message_iterator_can_seek_beginning_status status;
1069 bt_bool can_seek;
1070
9a2c8b8e 1071 status = bt_message_iterator_can_seek_beginning(
f2fb1b32
SM
1072 iterator, &can_seek);
1073 if (status != BT_FUNC_STATUS_OK) {
1074 can_seek = BT_FALSE;
1075 }
1076
1077 return can_seek;
1078}
1079
d24d5663 1080enum bt_message_iterator_seek_beginning_status
9a2c8b8e
PP
1081bt_message_iterator_seek_beginning(
1082 struct bt_message_iterator *iterator)
7474e7d3 1083{
9275bef4 1084 int status;
7474e7d3 1085
17f3083a 1086 BT_ASSERT_PRE_NO_ERROR();
7474e7d3
PP
1087 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
1088 BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
5badd463
PP
1089 BT_ASSERT_PRE(
1090 bt_component_borrow_graph(iterator->upstream_component)->config_state !=
1091 BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
7474e7d3
PP
1092 "Graph is not configured: %!+g",
1093 bt_component_borrow_graph(iterator->upstream_component));
f2fb1b32 1094 BT_ASSERT_PRE(message_iterator_can_seek_beginning(iterator),
7474e7d3 1095 "Message iterator cannot seek beginning: %!+i", iterator);
54b135a0
SM
1096
1097 /*
1098 * We are seeking, reset our expectations about how the following
1099 * messages should look like.
1100 */
1101 reset_iterator_expectations(iterator);
1102
7474e7d3 1103 BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i", iterator);
9a2c8b8e
PP
1104 set_msg_iterator_state(iterator,
1105 BT_MESSAGE_ITERATOR_STATE_SEEKING);
7474e7d3
PP
1106 status = iterator->methods.seek_beginning(iterator);
1107 BT_LOGD("User method returned: status=%s",
d24d5663
PP
1108 bt_common_func_status_string(status));
1109 BT_ASSERT_POST(status == BT_FUNC_STATUS_OK ||
1110 status == BT_FUNC_STATUS_ERROR ||
1111 status == BT_FUNC_STATUS_MEMORY_ERROR ||
1112 status == BT_FUNC_STATUS_AGAIN,
7474e7d3 1113 "Unexpected status: %![iter-]+i, status=%s",
d24d5663 1114 iterator, bt_common_func_status_string(status));
6ecdcca3 1115 BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(status);
870631a2
PP
1116 if (status < 0) {
1117 BT_LIB_LOGW_APPEND_CAUSE(
1118 "Component input port message iterator's \"seek beginning\" method failed: "
1119 "%![iter-]+i, status=%s",
1120 iterator, bt_common_func_status_string(status));
1121 }
1122
7474e7d3
PP
1123 set_iterator_state_after_seeking(iterator, status);
1124 return status;
1125}
1126
8d8b141d 1127bt_bool
9a2c8b8e
PP
1128bt_message_iterator_can_seek_forward(
1129 bt_message_iterator *iterator)
8d8b141d
SM
1130{
1131 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
1132
1133 return iterator->config.can_seek_forward;
1134}
1135
5b7b55be
SM
1136/*
1137 * Structure used to record the state of a given stream during the fast-forward
1138 * phase of an auto-seek.
1139 */
1140struct auto_seek_stream_state {
1141 /*
1142 * Value representing which step of this timeline we are at.
1143 *
1144 * time --->
188edac1 1145 * [SB] 1 [PB] 2 [PE] 1 [SE]
5b7b55be
SM
1146 *
1147 * At each point in the timeline, the messages we need to replicate are:
1148 *
1149 * 1: Stream beginning
188edac1 1150 * 2: Stream beginning, packet beginning
5b7b55be
SM
1151 *
1152 * Before "Stream beginning" and after "Stream end", we don't need to
1153 * replicate anything as the stream doesn't exist.
1154 */
1155 enum {
1156 AUTO_SEEK_STREAM_STATE_STREAM_BEGAN,
5b7b55be
SM
1157 AUTO_SEEK_STREAM_STATE_PACKET_BEGAN,
1158 } state;
1159
1160 /*
1161 * If `state` is AUTO_SEEK_STREAM_STATE_PACKET_BEGAN, the packet we are
1162 * in. This is a weak reference, since the packet will always be
1163 * alive by the time we use it.
1164 */
1165 struct bt_packet *packet;
188edac1
SM
1166
1167 /* Have we see a message with a clock snapshot yet? */
1168 bool seen_clock_snapshot;
5b7b55be
SM
1169};
1170
1171static
1172struct auto_seek_stream_state *create_auto_seek_stream_state(void)
1173{
1174 return g_new0(struct auto_seek_stream_state, 1);
1175}
1176
1177static
1178void destroy_auto_seek_stream_state(void *ptr)
1179{
1180 g_free(ptr);
1181}
1182
1183static
1184GHashTable *create_auto_seek_stream_states(void)
1185{
1186 return g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
1187 destroy_auto_seek_stream_state);
1188}
1189
1190static
1191void destroy_auto_seek_stream_states(GHashTable *stream_states)
1192{
1193 g_hash_table_destroy(stream_states);
1194}
1195
1196/*
1197 * Handle one message while we are in the fast-forward phase of an auto-seek.
1198 *
1199 * Sets `*got_first` to true if the message's timestamp is greater or equal to
1200 * `ns_from_origin`. In other words, if this is the first message after our
1201 * seek point.
1202 *
1203 * `stream_states` is an hash table of `bt_stream *` (weak reference) to
1204 * `struct auto_seek_stream_state` used to keep the state of each stream
1205 * during the fast-forward.
1206 */
1207
7474e7d3 1208static inline
d24d5663 1209int auto_seek_handle_message(
9a2c8b8e 1210 struct bt_message_iterator *iterator,
5b9e151d 1211 int64_t ns_from_origin, const struct bt_message *msg,
5b7b55be 1212 bool *got_first, GHashTable *stream_states)
7474e7d3 1213{
d24d5663 1214 int status = BT_FUNC_STATUS_OK;
5b9e151d 1215 int64_t msg_ns_from_origin;
7474e7d3 1216 const struct bt_clock_snapshot *clk_snapshot = NULL;
5b9e151d
PP
1217 int ret;
1218
98b15851
PP
1219 BT_ASSERT_DBG(msg);
1220 BT_ASSERT_DBG(got_first);
7474e7d3
PP
1221
1222 switch (msg->type) {
1223 case BT_MESSAGE_TYPE_EVENT:
1224 {
1225 const struct bt_message_event *event_msg =
1226 (const void *) msg;
1227
2c091c04 1228 clk_snapshot = event_msg->default_cs;
bdb288b3 1229 BT_ASSERT_POST_DEV(clk_snapshot,
c7072d5a
PP
1230 "Event message has no default clock snapshot: %!+n",
1231 event_msg);
7474e7d3
PP
1232 break;
1233 }
b9fd9cbb 1234 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
7474e7d3 1235 {
b9fd9cbb 1236 const struct bt_message_message_iterator_inactivity *inactivity_msg =
7474e7d3
PP
1237 (const void *) msg;
1238
60d02328 1239 clk_snapshot = inactivity_msg->cs;
98b15851 1240 BT_ASSERT_DBG(clk_snapshot);
7474e7d3
PP
1241 break;
1242 }
16663a5e
PP
1243 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1244 case BT_MESSAGE_TYPE_PACKET_END:
c7072d5a
PP
1245 {
1246 const struct bt_message_packet *packet_msg =
1247 (const void *) msg;
1248
1249 clk_snapshot = packet_msg->default_cs;
bdb288b3 1250 BT_ASSERT_POST_DEV(clk_snapshot,
c7072d5a
PP
1251 "Packet message has no default clock snapshot: %!+n",
1252 packet_msg);
1253 break;
1254 }
16663a5e
PP
1255 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1256 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
7474e7d3 1257 {
5b9e151d
PP
1258 struct bt_message_discarded_items *msg_disc_items =
1259 (void *) msg;
1260
bdb288b3 1261 BT_ASSERT_POST_DEV(msg_disc_items->default_begin_cs &&
5b9e151d
PP
1262 msg_disc_items->default_end_cs,
1263 "Discarded events/packets message has no default clock snapshots: %!+n",
1264 msg_disc_items);
1265 ret = bt_clock_snapshot_get_ns_from_origin(
1266 msg_disc_items->default_begin_cs,
1267 &msg_ns_from_origin);
1268 if (ret) {
d24d5663 1269 status = BT_FUNC_STATUS_ERROR;
5b9e151d
PP
1270 goto end;
1271 }
7474e7d3 1272
5b9e151d
PP
1273 if (msg_ns_from_origin >= ns_from_origin) {
1274 *got_first = true;
1275 goto push_msg;
1276 }
1277
1278 ret = bt_clock_snapshot_get_ns_from_origin(
1279 msg_disc_items->default_end_cs,
1280 &msg_ns_from_origin);
1281 if (ret) {
d24d5663 1282 status = BT_FUNC_STATUS_ERROR;
5b9e151d
PP
1283 goto end;
1284 }
1285
1286 if (msg_ns_from_origin >= ns_from_origin) {
1287 /*
1288 * The discarded items message's beginning time
1289 * is before the requested seeking time, but its
1290 * end time is after. Modify the message so as
1291 * to set its beginning time to the requested
1292 * seeking time, and make its item count unknown
1293 * as we don't know if items were really
1294 * discarded within the new time range.
1295 */
4af85094 1296 uint64_t new_begin_raw_value = 0;
5b9e151d
PP
1297
1298 ret = bt_clock_class_clock_value_from_ns_from_origin(
1299 msg_disc_items->default_end_cs->clock_class,
1300 ns_from_origin, &new_begin_raw_value);
1301 if (ret) {
d24d5663 1302 status = BT_FUNC_STATUS_ERROR;
5b9e151d
PP
1303 goto end;
1304 }
1305
1306 bt_clock_snapshot_set_raw_value(
1307 msg_disc_items->default_begin_cs,
1308 new_begin_raw_value);
1309 msg_disc_items->count.base.avail =
1310 BT_PROPERTY_AVAILABILITY_NOT_AVAILABLE;
1311
1312 /*
1313 * It is safe to push it because its beginning
1314 * time is exactly the requested seeking time.
1315 */
1316 goto push_msg;
1317 } else {
1318 goto skip_msg;
1319 }
7474e7d3 1320 }
188edac1
SM
1321 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1322 case BT_MESSAGE_TYPE_STREAM_END:
7474e7d3 1323 {
188edac1
SM
1324 struct bt_message_stream *stream_msg =
1325 (struct bt_message_stream *) msg;
7474e7d3 1326
188edac1
SM
1327 if (stream_msg->default_cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
1328 /* Ignore */
5b9e151d 1329 goto skip_msg;
16663a5e
PP
1330 }
1331
188edac1 1332 clk_snapshot = stream_msg->default_cs;
7474e7d3
PP
1333 break;
1334 }
1335 default:
498e7994 1336 bt_common_abort();
7474e7d3
PP
1337 }
1338
98b15851 1339 BT_ASSERT_DBG(clk_snapshot);
5b9e151d
PP
1340 ret = bt_clock_snapshot_get_ns_from_origin(clk_snapshot,
1341 &msg_ns_from_origin);
1342 if (ret) {
d24d5663 1343 status = BT_FUNC_STATUS_ERROR;
7474e7d3
PP
1344 goto end;
1345 }
1346
5b9e151d
PP
1347 if (msg_ns_from_origin >= ns_from_origin) {
1348 *got_first = true;
1349 goto push_msg;
1350 }
1351
1352skip_msg:
5b7b55be
SM
1353 /* This message won't be sent downstream. */
1354 switch (msg->type) {
1355 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1356 {
1357 const struct bt_message_stream *stream_msg = (const void *) msg;
1358 struct auto_seek_stream_state *stream_state;
5b7b55be
SM
1359
1360 /* Update stream's state: stream began. */
1361 stream_state = create_auto_seek_stream_state();
1362 if (!stream_state) {
d24d5663 1363 status = BT_FUNC_STATUS_MEMORY_ERROR;
5b7b55be
SM
1364 goto end;
1365 }
1366
1367 stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN;
e74dbb33 1368
188edac1
SM
1369 if (stream_msg->default_cs_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
1370 stream_state->seen_clock_snapshot = true;
1371 }
1372
98b15851 1373 BT_ASSERT_DBG(!bt_g_hash_table_contains(stream_states, stream_msg->stream));
e74dbb33 1374 g_hash_table_insert(stream_states, stream_msg->stream, stream_state);
5b7b55be
SM
1375 break;
1376 }
188edac1 1377 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
5b7b55be 1378 {
188edac1 1379 const struct bt_message_packet *packet_msg =
5b7b55be
SM
1380 (const void *) msg;
1381 struct auto_seek_stream_state *stream_state;
1382
188edac1
SM
1383 /* Update stream's state: packet began. */
1384 stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream);
98b15851
PP
1385 BT_ASSERT_DBG(stream_state);
1386 BT_ASSERT_DBG(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN);
188edac1 1387 stream_state->state = AUTO_SEEK_STREAM_STATE_PACKET_BEGAN;
98b15851 1388 BT_ASSERT_DBG(!stream_state->packet);
188edac1
SM
1389 stream_state->packet = packet_msg->packet;
1390
1391 if (packet_msg->packet->stream->class->packets_have_beginning_default_clock_snapshot) {
1392 stream_state->seen_clock_snapshot = true;
1393 }
1394
5b7b55be
SM
1395 break;
1396 }
188edac1 1397 case BT_MESSAGE_TYPE_EVENT:
5b7b55be 1398 {
188edac1 1399 const struct bt_message_event *event_msg = (const void *) msg;
5b7b55be
SM
1400 struct auto_seek_stream_state *stream_state;
1401
188edac1 1402 stream_state = g_hash_table_lookup(stream_states,
03cc4222 1403 event_msg->event->stream);
98b15851 1404 BT_ASSERT_DBG(stream_state);
5b7b55be 1405
188edac1
SM
1406 // HELPME: are we sure that event messages have clock snapshots at this point?
1407 stream_state->seen_clock_snapshot = true;
1408
5b7b55be
SM
1409 break;
1410 }
1411 case BT_MESSAGE_TYPE_PACKET_END:
1412 {
1413 const struct bt_message_packet *packet_msg =
1414 (const void *) msg;
1415 struct auto_seek_stream_state *stream_state;
1416
1417 /* Update stream's state: packet ended. */
1418 stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream);
98b15851
PP
1419 BT_ASSERT_DBG(stream_state);
1420 BT_ASSERT_DBG(stream_state->state == AUTO_SEEK_STREAM_STATE_PACKET_BEGAN);
188edac1 1421 stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN;
98b15851 1422 BT_ASSERT_DBG(stream_state->packet);
5b7b55be 1423 stream_state->packet = NULL;
5b7b55be 1424
188edac1
SM
1425 if (packet_msg->packet->stream->class->packets_have_end_default_clock_snapshot) {
1426 stream_state->seen_clock_snapshot = true;
1427 }
5b7b55be 1428
5b7b55be
SM
1429 break;
1430 }
1431 case BT_MESSAGE_TYPE_STREAM_END:
1432 {
1433 const struct bt_message_stream *stream_msg = (const void *) msg;
1434 struct auto_seek_stream_state *stream_state;
1435
1436 stream_state = g_hash_table_lookup(stream_states, stream_msg->stream);
98b15851
PP
1437 BT_ASSERT_DBG(stream_state);
1438 BT_ASSERT_DBG(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN);
1439 BT_ASSERT_DBG(!stream_state->packet);
5b7b55be
SM
1440
1441 /* Update stream's state: this stream doesn't exist anymore. */
1442 g_hash_table_remove(stream_states, stream_msg->stream);
1443 break;
1444 }
188edac1
SM
1445 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1446 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1447 {
1448 const struct bt_message_discarded_items *discarded_msg =
1449 (const void *) msg;
1450 struct auto_seek_stream_state *stream_state;
1451
1452 stream_state = g_hash_table_lookup(stream_states, discarded_msg->stream);
98b15851 1453 BT_ASSERT_DBG(stream_state);
188edac1
SM
1454
1455 if ((msg->type == BT_MESSAGE_TYPE_DISCARDED_EVENTS && discarded_msg->stream->class->discarded_events_have_default_clock_snapshots) ||
1456 (msg->type == BT_MESSAGE_TYPE_DISCARDED_PACKETS && discarded_msg->stream->class->discarded_packets_have_default_clock_snapshots)) {
1457 stream_state->seen_clock_snapshot = true;
1458 }
1459
1460 break;
1461 }
5b7b55be
SM
1462 default:
1463 break;
1464 }
1465
6871026b 1466 bt_object_put_ref_no_null_check(msg);
04c0cec6 1467 msg = NULL;
5b9e151d
PP
1468 goto end;
1469
1470push_msg:
da9c4c52 1471 g_queue_push_tail(iterator->auto_seek.msgs, (void *) msg);
5b9e151d 1472 msg = NULL;
7474e7d3
PP
1473
1474end:
98b15851 1475 BT_ASSERT_DBG(!msg || status != BT_FUNC_STATUS_OK);
5b9e151d 1476 return status;
7474e7d3
PP
1477}
1478
1479static
d24d5663 1480int find_message_ge_ns_from_origin(
9a2c8b8e 1481 struct bt_message_iterator *iterator,
5b7b55be 1482 int64_t ns_from_origin, GHashTable *stream_states)
7474e7d3 1483{
c70a96a6 1484 int status = BT_FUNC_STATUS_OK;
9a2c8b8e 1485 enum bt_message_iterator_state init_state =
7474e7d3
PP
1486 iterator->state;
1487 const struct bt_message *messages[MSG_BATCH_SIZE];
1488 uint64_t user_count = 0;
1489 uint64_t i;
5b9e151d 1490 bool got_first = false;
7474e7d3 1491
98b15851 1492 BT_ASSERT_DBG(iterator);
7474e7d3
PP
1493 memset(&messages[0], 0, sizeof(messages[0]) * MSG_BATCH_SIZE);
1494
1495 /*
1496 * Make this iterator temporarily active (not seeking) to call
1497 * the "next" method.
1498 */
9a2c8b8e
PP
1499 set_msg_iterator_state(iterator,
1500 BT_MESSAGE_ITERATOR_STATE_ACTIVE);
7474e7d3 1501
98b15851 1502 BT_ASSERT_DBG(iterator->methods.next);
7474e7d3 1503
e0dade92 1504 while (!got_first) {
7474e7d3
PP
1505 /*
1506 * Call the user's "next" method to get the next
1507 * messages and status.
1508 */
54b135a0 1509 status = call_iterator_next_method(iterator,
7474e7d3 1510 &messages[0], MSG_BATCH_SIZE, &user_count);
870631a2
PP
1511 BT_LOGD("User method returned: status=%s",
1512 bt_common_func_status_string(status));
1513 if (status < 0) {
1514 BT_LIB_LOGW_APPEND_CAUSE(
1515 "Component input port message iterator's \"next\" method failed: "
1516 "%![iter-]+i, status=%s",
1517 iterator, bt_common_func_status_string(status));
1518 }
7474e7d3 1519
7474e7d3
PP
1520 /*
1521 * The user's "next" method must not do any action which
1522 * would change the iterator's state.
1523 */
98b15851 1524 BT_ASSERT_DBG(iterator->state ==
9a2c8b8e 1525 BT_MESSAGE_ITERATOR_STATE_ACTIVE);
7474e7d3
PP
1526
1527 switch (status) {
d24d5663 1528 case BT_FUNC_STATUS_OK:
bdb288b3 1529 BT_ASSERT_POST_DEV(user_count <= MSG_BATCH_SIZE,
7474e7d3
PP
1530 "Invalid returned message count: greater than "
1531 "batch size: count=%" PRIu64 ", batch-size=%u",
1532 user_count, MSG_BATCH_SIZE);
1533 break;
d24d5663
PP
1534 case BT_FUNC_STATUS_AGAIN:
1535 case BT_FUNC_STATUS_ERROR:
1536 case BT_FUNC_STATUS_MEMORY_ERROR:
1537 case BT_FUNC_STATUS_END:
7474e7d3
PP
1538 goto end;
1539 default:
498e7994 1540 bt_common_abort();
7474e7d3
PP
1541 }
1542
7474e7d3 1543 for (i = 0; i < user_count; i++) {
5b9e151d 1544 if (got_first) {
da9c4c52 1545 g_queue_push_tail(iterator->auto_seek.msgs,
5b9e151d
PP
1546 (void *) messages[i]);
1547 messages[i] = NULL;
7474e7d3
PP
1548 continue;
1549 }
1550
5b9e151d 1551 status = auto_seek_handle_message(iterator,
5b7b55be
SM
1552 ns_from_origin, messages[i], &got_first,
1553 stream_states);
d24d5663 1554 if (status == BT_FUNC_STATUS_OK) {
e0dade92 1555 /* Message was either pushed or moved */
5b9e151d
PP
1556 messages[i] = NULL;
1557 } else {
7474e7d3
PP
1558 goto end;
1559 }
7474e7d3
PP
1560 }
1561 }
1562
1563end:
1564 for (i = 0; i < user_count; i++) {
1565 if (messages[i]) {
6871026b 1566 bt_object_put_ref_no_null_check(messages[i]);
7474e7d3
PP
1567 }
1568 }
1569
9a2c8b8e 1570 set_msg_iterator_state(iterator, init_state);
7474e7d3
PP
1571 return status;
1572}
1573
5b7b55be
SM
1574/*
1575 * This function is installed as the iterator's next callback after we have
1576 * auto-seeked (seeked to the beginning and fast-forwarded) to send the
1577 * messages saved in iterator->auto_seek.msgs. Once this is done, the original
1578 * next callback is put back.
1579 */
1580
7474e7d3 1581static
a3f0c7db 1582enum bt_message_iterator_class_next_method_status post_auto_seek_next(
9a2c8b8e 1583 struct bt_message_iterator *iterator,
7474e7d3
PP
1584 bt_message_array_const msgs, uint64_t capacity,
1585 uint64_t *count)
1586{
da9c4c52 1587 BT_ASSERT(!g_queue_is_empty(iterator->auto_seek.msgs));
5b9e151d 1588 *count = 0;
7474e7d3
PP
1589
1590 /*
1591 * Move auto-seek messages to the output array (which is this
5b9e151d 1592 * iterator's base message array).
7474e7d3 1593 */
da9c4c52
SM
1594 while (capacity > 0 && !g_queue_is_empty(iterator->auto_seek.msgs)) {
1595 msgs[*count] = g_queue_pop_head(iterator->auto_seek.msgs);
5b9e151d
PP
1596 capacity--;
1597 (*count)++;
7474e7d3 1598 }
7474e7d3 1599
5b9e151d
PP
1600 BT_ASSERT(*count > 0);
1601
da9c4c52 1602 if (g_queue_is_empty(iterator->auto_seek.msgs)) {
572075a8
SM
1603 /* No more auto-seek messages, restore user's next callback. */
1604 BT_ASSERT(iterator->auto_seek.original_next_callback);
1605 iterator->methods.next = iterator->auto_seek.original_next_callback;
1606 iterator->auto_seek.original_next_callback = NULL;
7474e7d3
PP
1607 }
1608
d24d5663 1609 return BT_FUNC_STATUS_OK;
7474e7d3
PP
1610}
1611
5b7b55be
SM
1612static inline
1613int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
1614 int64_t ns_from_origin, uint64_t *raw_value)
1615{
1616
1617 int64_t cc_offset_s = clock_class->offset_seconds;
1618 uint64_t cc_offset_cycles = clock_class->offset_cycles;
1619 uint64_t cc_freq = clock_class->frequency;
1620
1621 return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
1622 cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
1623}
1624
f2fb1b32
SM
1625static
1626bool message_iterator_can_seek_ns_from_origin(
9a2c8b8e 1627 struct bt_message_iterator *iterator,
f2fb1b32
SM
1628 int64_t ns_from_origin)
1629{
1630 enum bt_message_iterator_can_seek_ns_from_origin_status status;
1631 bt_bool can_seek;
1632
9a2c8b8e 1633 status = bt_message_iterator_can_seek_ns_from_origin(
f2fb1b32
SM
1634 iterator, ns_from_origin, &can_seek);
1635 if (status != BT_FUNC_STATUS_OK) {
1636 can_seek = BT_FALSE;
1637 }
1638
1639 return can_seek;
1640}
5b7b55be 1641
d24d5663 1642enum bt_message_iterator_seek_ns_from_origin_status
9a2c8b8e
PP
1643bt_message_iterator_seek_ns_from_origin(
1644 struct bt_message_iterator *iterator,
7474e7d3
PP
1645 int64_t ns_from_origin)
1646{
1647 int status;
5b7b55be 1648 GHashTable *stream_states = NULL;
c0e46a7c 1649 bt_bool can_seek_by_itself;
7474e7d3 1650
17f3083a 1651 BT_ASSERT_PRE_NO_ERROR();
7474e7d3
PP
1652 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
1653 BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
5badd463
PP
1654 BT_ASSERT_PRE(
1655 bt_component_borrow_graph(iterator->upstream_component)->config_state !=
1656 BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
7474e7d3
PP
1657 "Graph is not configured: %!+g",
1658 bt_component_borrow_graph(iterator->upstream_component));
c0e46a7c 1659 /* The iterator must be able to seek ns from origin one way or another. */
7474e7d3 1660 BT_ASSERT_PRE(
f2fb1b32 1661 message_iterator_can_seek_ns_from_origin(iterator, ns_from_origin),
7474e7d3
PP
1662 "Message iterator cannot seek nanoseconds from origin: %!+i, "
1663 "ns-from-origin=%" PRId64, iterator, ns_from_origin);
9a2c8b8e
PP
1664 set_msg_iterator_state(iterator,
1665 BT_MESSAGE_ITERATOR_STATE_SEEKING);
7474e7d3 1666
54b135a0
SM
1667 /*
1668 * We are seeking, reset our expectations about how the following
1669 * messages should look like.
1670 */
1671 reset_iterator_expectations(iterator);
1672
c0e46a7c
SM
1673 /* Check if the iterator can seek by itself. If not we'll use autoseek. */
1674 if (iterator->methods.can_seek_ns_from_origin) {
a3f0c7db 1675 bt_message_iterator_class_can_seek_ns_from_origin_method_status
c0e46a7c
SM
1676 can_seek_status;
1677
1678 can_seek_status =
1679 iterator->methods.can_seek_ns_from_origin(
1680 iterator, ns_from_origin, &can_seek_by_itself);
1681 if (can_seek_status != BT_FUNC_STATUS_OK) {
1682 status = can_seek_status;
1683 goto end;
1684 }
1685 } else {
1686 can_seek_by_itself = false;
1687 }
1688
1689 if (can_seek_by_itself) {
5b7b55be 1690 /* The iterator knows how to seek to a particular time, let it handle this. */
2e1b5615 1691 BT_ASSERT(iterator->methods.seek_ns_from_origin);
7474e7d3
PP
1692 BT_LIB_LOGD("Calling user's \"seek nanoseconds from origin\" method: "
1693 "%![iter-]+i, ns=%" PRId64, iterator, ns_from_origin);
1694 status = iterator->methods.seek_ns_from_origin(iterator,
1695 ns_from_origin);
1696 BT_LOGD("User method returned: status=%s",
d24d5663
PP
1697 bt_common_func_status_string(status));
1698 BT_ASSERT_POST(status == BT_FUNC_STATUS_OK ||
1699 status == BT_FUNC_STATUS_ERROR ||
1700 status == BT_FUNC_STATUS_MEMORY_ERROR ||
1701 status == BT_FUNC_STATUS_AGAIN,
7474e7d3 1702 "Unexpected status: %![iter-]+i, status=%s",
d24d5663 1703 iterator, bt_common_func_status_string(status));
6ecdcca3 1704 BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(status);
870631a2
PP
1705 if (status < 0) {
1706 BT_LIB_LOGW_APPEND_CAUSE(
1707 "Component input port message iterator's \"seek nanoseconds from origin\" method failed: "
1708 "%![iter-]+i, status=%s",
1709 iterator, bt_common_func_status_string(status));
1710 }
7474e7d3 1711 } else {
5b7b55be 1712 /*
c0e46a7c
SM
1713 * The iterator doesn't know how to seek by itself to a
1714 * particular time. We will seek to the beginning and fast
1715 * forward to the right place.
5b7b55be 1716 */
a3f0c7db 1717 enum bt_message_iterator_class_can_seek_beginning_method_status can_seek_status;
f2fb1b32
SM
1718 bt_bool can_seek_beginning;
1719
1720 can_seek_status = iterator->methods.can_seek_beginning(iterator,
1721 &can_seek_beginning);
1722 BT_ASSERT(can_seek_status == BT_FUNC_STATUS_OK);
1723 BT_ASSERT(can_seek_beginning);
7474e7d3
PP
1724 BT_ASSERT(iterator->methods.seek_beginning);
1725 BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i",
1726 iterator);
1727 status = iterator->methods.seek_beginning(iterator);
1728 BT_LOGD("User method returned: status=%s",
d24d5663
PP
1729 bt_common_func_status_string(status));
1730 BT_ASSERT_POST(status == BT_FUNC_STATUS_OK ||
1731 status == BT_FUNC_STATUS_ERROR ||
1732 status == BT_FUNC_STATUS_MEMORY_ERROR ||
1733 status == BT_FUNC_STATUS_AGAIN,
7474e7d3 1734 "Unexpected status: %![iter-]+i, status=%s",
d24d5663 1735 iterator, bt_common_func_status_string(status));
870631a2
PP
1736 if (status < 0) {
1737 BT_LIB_LOGW_APPEND_CAUSE(
1738 "Component input port message iterator's \"seek beginning\" method failed: "
1739 "%![iter-]+i, status=%s",
1740 iterator, bt_common_func_status_string(status));
1741 }
1742
7474e7d3 1743 switch (status) {
d24d5663 1744 case BT_FUNC_STATUS_OK:
7474e7d3 1745 break;
d24d5663
PP
1746 case BT_FUNC_STATUS_ERROR:
1747 case BT_FUNC_STATUS_MEMORY_ERROR:
1748 case BT_FUNC_STATUS_AGAIN:
7474e7d3
PP
1749 goto end;
1750 default:
498e7994 1751 bt_common_abort();
7474e7d3
PP
1752 }
1753
1754 /*
1755 * Find the first message which has a default clock
1756 * snapshot greater than or equal to the requested
5b9e151d
PP
1757 * seeking time, and move the received messages from
1758 * this point in the batch to this iterator's auto-seek
1759 * message queue.
7474e7d3 1760 */
da9c4c52 1761 while (!g_queue_is_empty(iterator->auto_seek.msgs)) {
6871026b 1762 bt_object_put_ref_no_null_check(
da9c4c52 1763 g_queue_pop_tail(iterator->auto_seek.msgs));
5b9e151d
PP
1764 }
1765
5b7b55be
SM
1766 stream_states = create_auto_seek_stream_states();
1767 if (!stream_states) {
870631a2
PP
1768 BT_LIB_LOGE_APPEND_CAUSE(
1769 "Failed to allocate one GHashTable.");
d24d5663 1770 status = BT_FUNC_STATUS_MEMORY_ERROR;
5b7b55be
SM
1771 goto end;
1772 }
1773
7474e7d3 1774 status = find_message_ge_ns_from_origin(iterator,
5b7b55be 1775 ns_from_origin, stream_states);
7474e7d3 1776 switch (status) {
d24d5663
PP
1777 case BT_FUNC_STATUS_OK:
1778 case BT_FUNC_STATUS_END:
5b7b55be
SM
1779 {
1780 GHashTableIter iter;
1781 gpointer key, value;
1782
1783 /*
1784 * If some streams exist at the seek time, prepend the
1785 * required messages to put those streams in the right
1786 * state.
1787 */
1788 g_hash_table_iter_init(&iter, stream_states);
1789 while (g_hash_table_iter_next (&iter, &key, &value)) {
1790 const bt_stream *stream = key;
1791 struct auto_seek_stream_state *stream_state =
1792 (struct auto_seek_stream_state *) value;
1793 bt_message *msg;
1794 const bt_clock_class *clock_class = bt_stream_class_borrow_default_clock_class_const(
1795 bt_stream_borrow_class_const(stream));
188edac1
SM
1796 /* Initialize to silence maybe-uninitialized warning. */
1797 uint64_t raw_value = 0;
1798
1799 /*
1800 * If we haven't seen a message with a clock snapshot, we don't know if our seek time is within
1801 * the clock's range, so it wouldn't be safe to try to convert ns_from_origin to a clock value.
1802 *
1803 * Also, it would be a bit of a lie to generate a stream begin message with the seek time as its
1804 * clock snapshot, because we don't really know if the stream existed at that time. If we have
1805 * seen a message with a clock snapshot in our seeking, then we are sure that the
1806 * seek time is not below the clock range, and we know the stream was active at that
1807 * time (and that we cut it short).
1808 */
1809 if (stream_state->seen_clock_snapshot) {
1810 if (clock_raw_value_from_ns_from_origin(clock_class, ns_from_origin, &raw_value) != 0) {
1811 BT_LIB_LOGW("Could not convert nanoseconds from origin to clock value: ns-from-origin=%" PRId64 ", %![cc-]+K",
1812 ns_from_origin, clock_class);
1813 status = BT_FUNC_STATUS_ERROR;
1814 goto end;
1815 }
5b7b55be
SM
1816 }
1817
1818 switch (stream_state->state) {
1819 case AUTO_SEEK_STREAM_STATE_PACKET_BEGAN:
1820 BT_ASSERT(stream_state->packet);
1821 BT_LIB_LOGD("Creating packet message: %![packet-]+a", stream_state->packet);
188edac1
SM
1822
1823 if (stream->class->packets_have_beginning_default_clock_snapshot) {
1824 /*
1825 * If we are in the PACKET_BEGAN state, it means we have seen a "packet beginning"
1826 * message. If "packet beginning" packets have clock snapshots, then we must have
1827 * seen a clock snapshot.
1828 */
1829 BT_ASSERT(stream_state->seen_clock_snapshot);
1830
1831 msg = bt_message_packet_beginning_create_with_default_clock_snapshot(
1832 (bt_self_message_iterator *) iterator, stream_state->packet, raw_value);
1833 } else {
1834 msg = bt_message_packet_beginning_create((bt_self_message_iterator *) iterator,
1835 stream_state->packet);
5b7b55be
SM
1836 }
1837
5b7b55be 1838 if (!msg) {
d24d5663 1839 status = BT_FUNC_STATUS_MEMORY_ERROR;
5b7b55be
SM
1840 goto end;
1841 }
1842
5b7b55be
SM
1843 g_queue_push_head(iterator->auto_seek.msgs, msg);
1844 msg = NULL;
1845 /* fall-thru */
188edac1 1846
5b7b55be
SM
1847 case AUTO_SEEK_STREAM_STATE_STREAM_BEGAN:
1848 msg = bt_message_stream_beginning_create(
1849 (bt_self_message_iterator *) iterator, stream);
1850 if (!msg) {
d24d5663 1851 status = BT_FUNC_STATUS_MEMORY_ERROR;
5b7b55be
SM
1852 goto end;
1853 }
1854
188edac1
SM
1855 if (stream_state->seen_clock_snapshot) {
1856 bt_message_stream_beginning_set_default_clock_snapshot(msg, raw_value);
1857 }
1858
5b7b55be
SM
1859 g_queue_push_head(iterator->auto_seek.msgs, msg);
1860 msg = NULL;
1861 break;
1862 }
1863 }
1864
7474e7d3 1865 /*
5b9e151d
PP
1866 * If there are messages in the auto-seek
1867 * message queue, replace the user's "next"
1868 * method with a custom, temporary "next" method
1869 * which returns them.
7474e7d3 1870 */
da9c4c52 1871 if (!g_queue_is_empty(iterator->auto_seek.msgs)) {
572075a8
SM
1872 BT_ASSERT(!iterator->auto_seek.original_next_callback);
1873 iterator->auto_seek.original_next_callback = iterator->methods.next;
1874
5b9e151d 1875 iterator->methods.next =
9a2c8b8e 1876 (bt_message_iterator_next_method)
5b9e151d
PP
1877 post_auto_seek_next;
1878 }
1879
1880 /*
d24d5663
PP
1881 * `BT_FUNC_STATUS_END` becomes
1882 * `BT_FUNC_STATUS_OK`: the next
5b9e151d
PP
1883 * time this iterator's "next" method is called,
1884 * it will return
d24d5663 1885 * `BT_FUNC_STATUS_END`.
5b9e151d 1886 */
d24d5663 1887 status = BT_FUNC_STATUS_OK;
7474e7d3 1888 break;
5b7b55be 1889 }
d24d5663
PP
1890 case BT_FUNC_STATUS_ERROR:
1891 case BT_FUNC_STATUS_MEMORY_ERROR:
1892 case BT_FUNC_STATUS_AGAIN:
7474e7d3 1893 goto end;
7474e7d3 1894 default:
498e7994 1895 bt_common_abort();
7474e7d3
PP
1896 }
1897 }
1898
54b135a0
SM
1899 /*
1900 * The following messages returned by the next method (including
1901 * post_auto_seek_next) must be after (or at) `ns_from_origin`.
1902 */
1903 iterator->last_ns_from_origin = ns_from_origin;
1904
7474e7d3 1905end:
5b7b55be
SM
1906 if (stream_states) {
1907 destroy_auto_seek_stream_states(stream_states);
1908 stream_states = NULL;
1909 }
870631a2 1910
7474e7d3 1911 set_iterator_state_after_seeking(iterator, status);
7474e7d3
PP
1912 return status;
1913}
1914
9b4f9b42
PP
1915bt_bool bt_self_message_iterator_is_interrupted(
1916 const struct bt_self_message_iterator *self_msg_iter)
1917{
9a2c8b8e 1918 const struct bt_message_iterator *iterator =
9b4f9b42
PP
1919 (const void *) self_msg_iter;
1920
1921 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
1922 return (bt_bool) bt_graph_is_interrupted(iterator->graph);
1923}
1924
9a2c8b8e
PP
1925void bt_message_iterator_get_ref(
1926 const struct bt_message_iterator *iterator)
c5b9b441
PP
1927{
1928 bt_object_get_ref(iterator);
1929}
1930
9a2c8b8e
PP
1931void bt_message_iterator_put_ref(
1932 const struct bt_message_iterator *iterator)
c5b9b441
PP
1933{
1934 bt_object_put_ref(iterator);
1935}
This page took 0.197152 seconds and 4 git commands to generate.