lib: remove CV snapshot property from stream beginning/end message
[babeltrace.git] / lib / graph / iterator.c
CommitLineData
47e5a032 1/*
e2f7325d 2 * Copyright 2017-2018 Philippe Proulx <pproulx@efficios.com>
47e5a032 3 * Copyright 2015 Jérémie Galarneau <jeremie.galarneau@efficios.com>
47e5a032
JG
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
d6e69534 24#define BT_LOG_TAG "MSG-ITER"
5af447e5
PP
25#include <babeltrace/lib-logging-internal.h>
26
3d9990ac 27#include <babeltrace/compiler-internal.h>
c6bd8523 28#include <babeltrace/trace-ir/field.h>
40f4ba76 29#include <babeltrace/trace-ir/event-const.h>
56e18c4c 30#include <babeltrace/trace-ir/event-internal.h>
40f4ba76 31#include <babeltrace/trace-ir/packet-const.h>
56e18c4c
PP
32#include <babeltrace/trace-ir/packet-internal.h>
33#include <babeltrace/trace-ir/stream-internal.h>
0d72b8c3 34#include <babeltrace/graph/connection-const.h>
bd14d768 35#include <babeltrace/graph/connection-internal.h>
0d72b8c3 36#include <babeltrace/graph/component-const.h>
e5be10ef 37#include <babeltrace/graph/component-internal.h>
b2e0c907
PP
38#include <babeltrace/graph/component-source-internal.h>
39#include <babeltrace/graph/component-class-internal.h>
8ed535b5 40#include <babeltrace/graph/component-class-sink-colander-internal.h>
0d72b8c3 41#include <babeltrace/graph/component-sink-const.h>
7474e7d3 42#include <babeltrace/graph/component-sink-internal.h>
d6e69534 43#include <babeltrace/graph/message-const.h>
7474e7d3 44#include <babeltrace/graph/message-iterator-const.h>
d6e69534
PP
45#include <babeltrace/graph/message-iterator-internal.h>
46#include <babeltrace/graph/self-component-port-input-message-iterator.h>
47#include <babeltrace/graph/port-output-message-iterator.h>
48#include <babeltrace/graph/message-internal.h>
49#include <babeltrace/graph/message-event-const.h>
50#include <babeltrace/graph/message-event-internal.h>
51#include <babeltrace/graph/message-packet-const.h>
52#include <babeltrace/graph/message-packet-internal.h>
53#include <babeltrace/graph/message-stream-const.h>
54#include <babeltrace/graph/message-stream-internal.h>
7474e7d3 55#include <babeltrace/graph/message-inactivity-internal.h>
0d72b8c3
PP
56#include <babeltrace/graph/port-const.h>
57#include <babeltrace/graph/graph.h>
58#include <babeltrace/graph/graph-const.h>
8ed535b5 59#include <babeltrace/graph/graph-internal.h>
c55a9f58 60#include <babeltrace/types.h>
f6ccaed9 61#include <babeltrace/assert-internal.h>
f42867e2 62#include <babeltrace/assert-pre-internal.h>
fa054faf 63#include <stdint.h>
2ec84d26 64#include <inttypes.h>
0fbb9a9f 65#include <stdlib.h>
3230ee6b 66
d4393e08
PP
/*
 * Number of slots in a message iterator's message array; also the
 * capacity given to the user's "next" method.
 *
 * TODO: Use graph's state (number of active iterators, etc.) and
 * possibly system specifications to make a better guess than this.
 */
#define MSG_BATCH_SIZE	15

/*
 * Precondition: `_iter` is in one of the states from which a seeking
 * operation may be started (active, ended, or a previous seeking
 * operation returned "again" or an error).
 */
#define BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(_iter)			\
	BT_ASSERT_PRE((_iter)->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE || \
		(_iter)->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED || \
		(_iter)->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_AGAIN || \
		(_iter)->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR, \
		"Message iterator is in the wrong state: %!+i", _iter)
47e5a032 79
d0fea130
PP
80static inline
81void _set_self_comp_port_input_msg_iterator_state(
82 struct bt_self_component_port_input_message_iterator *iterator,
83 enum bt_self_component_port_input_message_iterator_state state)
84{
85 BT_ASSERT(iterator);
7474e7d3 86 BT_LIB_LOGD("Updating message iterator's state: new-state=%s",
d0fea130
PP
87 bt_self_component_port_input_message_iterator_state_string(state));
88 iterator->state = state;
89}
90
91#ifdef BT_DEV_MODE
92# define set_self_comp_port_input_msg_iterator_state _set_self_comp_port_input_msg_iterator_state
93#else
94# define set_self_comp_port_input_msg_iterator_state(_a, _b)
95#endif
96
8ed535b5 97static
d6e69534 98void destroy_base_message_iterator(struct bt_object *obj)
8ed535b5 99{
d6e69534 100 struct bt_message_iterator *iterator = (void *) obj;
d4393e08
PP
101
102 BT_ASSERT(iterator);
103
d6e69534
PP
104 if (iterator->msgs) {
105 g_ptr_array_free(iterator->msgs, TRUE);
106 iterator->msgs = NULL;
d4393e08
PP
107 }
108
109 g_free(iterator);
8ed535b5
PP
110}
111
47e5a032 112static
d6e69534 113void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj)
47e5a032 114{
d6e69534 115 struct bt_self_component_port_input_message_iterator *iterator;
8738a040 116
f6ccaed9 117 BT_ASSERT(obj);
d3eb6e8f 118
bd14d768 119 /*
d6e69534 120 * The message iterator's reference count is 0 if we're
bd14d768
PP
121 * here. Increment it to avoid a double-destroy (possibly
122 * infinitely recursive). This could happen for example if the
d6e69534 123 * message iterator's finalization function does
d94d92ac
PP
124 * bt_object_get_ref() (or anything that causes
125 * bt_object_get_ref() to be called) on itself (ref. count goes
126 * from 0 to 1), and then bt_object_put_ref(): the reference
127 * count would go from 1 to 0 again and this function would be
128 * called again.
bd14d768 129 */
3fea54f6 130 obj->ref_count++;
07245ac2 131 iterator = (void *) obj;
d6e69534 132 BT_LIB_LOGD("Destroying self component input port message iterator object: "
d94d92ac 133 "%!+i", iterator);
d0fea130 134 bt_self_component_port_input_message_iterator_try_finalize(iterator);
d3eb6e8f 135
bd14d768
PP
136 if (iterator->connection) {
137 /*
138 * Remove ourself from the originating connection so
139 * that it does not try to finalize a dangling pointer
140 * later.
141 */
142 bt_connection_remove_iterator(iterator->connection, iterator);
d94d92ac 143 iterator->connection = NULL;
bd14d768
PP
144 }
145
7474e7d3
PP
146 if (iterator->auto_seek_msgs) {
147 uint64_t i;
148
149 /* Put any remaining message in the auto-seek array */
150 for (i = 0; i < iterator->auto_seek_msgs->len; i++) {
151 if (iterator->auto_seek_msgs->pdata[i]) {
152 bt_object_put_no_null_check(
153 iterator->auto_seek_msgs->pdata[i]);
154 }
155 }
156
157 g_ptr_array_free(iterator->auto_seek_msgs, TRUE);
158 iterator->auto_seek_msgs = NULL;
159 }
160
d6e69534 161 destroy_base_message_iterator(obj);
47e5a032
JG
162}
163
bd14d768 164BT_HIDDEN
d0fea130 165void bt_self_component_port_input_message_iterator_try_finalize(
d6e69534 166 struct bt_self_component_port_input_message_iterator *iterator)
bd14d768 167{
d94d92ac
PP
168 typedef void (*method_t)(void *);
169
bd14d768 170 struct bt_component_class *comp_class = NULL;
d94d92ac 171 method_t method = NULL;
bd14d768 172
f6ccaed9 173 BT_ASSERT(iterator);
bd14d768
PP
174
175 switch (iterator->state) {
d6e69534 176 case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED:
088d4023 177 /* Skip user finalization if user initialization failed */
d6e69534 178 BT_LIB_LOGD("Not finalizing non-initialized message iterator: "
d94d92ac 179 "%!+i", iterator);
d0fea130 180 goto end;
d6e69534 181 case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED:
bd14d768 182 /* Already finalized */
d6e69534 183 BT_LIB_LOGD("Not finalizing message iterator: already finalized: "
d94d92ac 184 "%!+i", iterator);
d0fea130
PP
185 goto end;
186 case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING:
187 /* Already finalized */
188 BT_LIB_LOGF("Message iterator is already being finalized: "
189 "%!+i", iterator);
190 abort();
bd14d768
PP
191 default:
192 break;
193 }
194
d6e69534 195 BT_LIB_LOGD("Finalizing message iterator: %!+i", iterator);
d0fea130
PP
196 set_self_comp_port_input_msg_iterator_state(iterator,
197 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING);
f6ccaed9 198 BT_ASSERT(iterator->upstream_component);
bd14d768
PP
199 comp_class = iterator->upstream_component->class;
200
201 /* Call user-defined destroy method */
202 switch (comp_class->type) {
203 case BT_COMPONENT_CLASS_TYPE_SOURCE:
204 {
d94d92ac
PP
205 struct bt_component_class_source *src_comp_cls =
206 (void *) comp_class;
bd14d768 207
d6e69534 208 method = (method_t) src_comp_cls->methods.msg_iter_finalize;
bd14d768
PP
209 break;
210 }
211 case BT_COMPONENT_CLASS_TYPE_FILTER:
212 {
d94d92ac
PP
213 struct bt_component_class_filter *flt_comp_cls =
214 (void *) comp_class;
bd14d768 215
d6e69534 216 method = (method_t) flt_comp_cls->methods.msg_iter_finalize;
bd14d768
PP
217 break;
218 }
219 default:
220 /* Unreachable */
0fbb9a9f 221 abort();
bd14d768
PP
222 }
223
d94d92ac
PP
224 if (method) {
225 BT_LIB_LOGD("Calling user's finalization method: %!+i",
5af447e5 226 iterator);
d94d92ac 227 method(iterator);
bd14d768
PP
228 }
229
bd14d768
PP
230 iterator->upstream_component = NULL;
231 iterator->upstream_port = NULL;
d0fea130
PP
232 set_self_comp_port_input_msg_iterator_state(iterator,
233 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED);
d6e69534 234 BT_LIB_LOGD("Finalized message iterator: %!+i", iterator);
d0fea130
PP
235
236end:
237 return;
bd14d768
PP
238}
239
240BT_HIDDEN
d6e69534
PP
241void bt_self_component_port_input_message_iterator_set_connection(
242 struct bt_self_component_port_input_message_iterator *iterator,
bd14d768
PP
243 struct bt_connection *connection)
244{
f6ccaed9 245 BT_ASSERT(iterator);
bd14d768 246 iterator->connection = connection;
d6e69534 247 BT_LIB_LOGV("Set message iterator's connection: "
d94d92ac 248 "%![iter-]+i, %![conn-]+x", iterator, connection);
bd14d768
PP
249}
250
90157d89 251static
d6e69534
PP
252int init_message_iterator(struct bt_message_iterator *iterator,
253 enum bt_message_iterator_type type,
90157d89
PP
254 bt_object_release_func destroy)
255{
d4393e08
PP
256 int ret = 0;
257
3fea54f6 258 bt_object_init_shared(&iterator->base, destroy);
90157d89 259 iterator->type = type;
d6e69534
PP
260 iterator->msgs = g_ptr_array_new();
261 if (!iterator->msgs) {
d4393e08
PP
262 BT_LOGE_STR("Failed to allocate a GPtrArray.");
263 ret = -1;
264 goto end;
265 }
266
d6e69534 267 g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE);
d4393e08
PP
268
269end:
270 return ret;
90157d89
PP
271}
272
7474e7d3
PP
273static
274bt_bool can_seek_ns_from_origin_true(
275 struct bt_self_component_port_input_message_iterator *iterator,
276 int64_t ns_from_origin)
277{
278 return BT_TRUE;
279}
280
281static
282bt_bool can_seek_beginning_true(
283 struct bt_self_component_port_input_message_iterator *iterator)
284{
285 return BT_TRUE;
286}
287
d94d92ac 288static
d6e69534
PP
289struct bt_self_component_port_input_message_iterator *
290bt_self_component_port_input_message_iterator_create_initial(
3230ee6b 291 struct bt_component *upstream_comp,
d94d92ac 292 struct bt_port *upstream_port)
47e5a032 293{
d4393e08 294 int ret;
d6e69534 295 struct bt_self_component_port_input_message_iterator *iterator = NULL;
47e5a032 296
f6ccaed9
PP
297 BT_ASSERT(upstream_comp);
298 BT_ASSERT(upstream_port);
f6ccaed9 299 BT_ASSERT(bt_port_is_connected(upstream_port));
d6e69534 300 BT_LIB_LOGD("Creating initial message iterator on self component input port: "
d94d92ac
PP
301 "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port);
302 BT_ASSERT(bt_component_get_class_type(upstream_comp) ==
303 BT_COMPONENT_CLASS_TYPE_SOURCE ||
304 bt_component_get_class_type(upstream_comp) ==
305 BT_COMPONENT_CLASS_TYPE_FILTER);
306 iterator = g_new0(
d6e69534 307 struct bt_self_component_port_input_message_iterator, 1);
47e5a032 308 if (!iterator) {
d94d92ac 309 BT_LOGE_STR("Failed to allocate one self component input port "
d6e69534 310 "message iterator.");
73d5c1ad 311 goto end;
47e5a032
JG
312 }
313
d6e69534
PP
314 ret = init_message_iterator((void *) iterator,
315 BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT,
316 bt_self_component_port_input_message_iterator_destroy);
d4393e08 317 if (ret) {
d6e69534 318 /* init_message_iterator() logs errors */
d94d92ac 319 BT_OBJECT_PUT_REF_AND_RESET(iterator);
d4393e08
PP
320 goto end;
321 }
3230ee6b 322
7474e7d3
PP
323 iterator->auto_seek_msgs = g_ptr_array_new();
324 if (!iterator->auto_seek_msgs) {
325 BT_LOGE_STR("Failed to allocate a GPtrArray.");
326 ret = -1;
73d5c1ad 327 goto end;
3230ee6b
PP
328 }
329
7474e7d3 330 g_ptr_array_set_size(iterator->auto_seek_msgs, MSG_BATCH_SIZE);
bd14d768
PP
331 iterator->upstream_component = upstream_comp;
332 iterator->upstream_port = upstream_port;
d94d92ac 333 iterator->connection = iterator->upstream_port->connection;
5c563278 334 iterator->graph = bt_component_borrow_graph(upstream_comp);
d0fea130
PP
335 set_self_comp_port_input_msg_iterator_state(iterator,
336 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED);
7474e7d3
PP
337
338 switch (iterator->upstream_component->class->type) {
339 case BT_COMPONENT_CLASS_TYPE_SOURCE:
340 {
341 struct bt_component_class_source *src_comp_cls =
342 (void *) iterator->upstream_component->class;
343
344 iterator->methods.next =
345 (bt_self_component_port_input_message_iterator_next_method)
346 src_comp_cls->methods.msg_iter_next;
347 iterator->methods.seek_ns_from_origin =
348 (bt_self_component_port_input_message_iterator_seek_ns_from_origin_method)
349 src_comp_cls->methods.msg_iter_seek_ns_from_origin;
350 iterator->methods.seek_beginning =
351 (bt_self_component_port_input_message_iterator_seek_beginning_method)
352 src_comp_cls->methods.msg_iter_seek_beginning;
353 iterator->methods.can_seek_ns_from_origin =
354 (bt_self_component_port_input_message_iterator_can_seek_ns_from_origin_method)
355 src_comp_cls->methods.msg_iter_can_seek_ns_from_origin;
356 iterator->methods.can_seek_beginning =
357 (bt_self_component_port_input_message_iterator_can_seek_beginning_method)
358 src_comp_cls->methods.msg_iter_can_seek_beginning;
359 break;
360 }
361 case BT_COMPONENT_CLASS_TYPE_FILTER:
362 {
363 struct bt_component_class_filter *flt_comp_cls =
364 (void *) iterator->upstream_component->class;
365
366 iterator->methods.next =
367 (bt_self_component_port_input_message_iterator_next_method)
368 flt_comp_cls->methods.msg_iter_next;
369 iterator->methods.seek_ns_from_origin =
370 (bt_self_component_port_input_message_iterator_seek_ns_from_origin_method)
371 flt_comp_cls->methods.msg_iter_seek_ns_from_origin;
372 iterator->methods.seek_beginning =
373 (bt_self_component_port_input_message_iterator_seek_beginning_method)
374 flt_comp_cls->methods.msg_iter_seek_beginning;
375 iterator->methods.can_seek_ns_from_origin =
376 (bt_self_component_port_input_message_iterator_can_seek_ns_from_origin_method)
377 flt_comp_cls->methods.msg_iter_can_seek_ns_from_origin;
378 iterator->methods.can_seek_beginning =
379 (bt_self_component_port_input_message_iterator_can_seek_beginning_method)
380 flt_comp_cls->methods.msg_iter_can_seek_beginning;
381 break;
382 }
383 default:
384 abort();
385 }
386
387 if (iterator->methods.seek_ns_from_origin &&
388 !iterator->methods.can_seek_ns_from_origin) {
389 iterator->methods.can_seek_ns_from_origin =
390 (bt_self_component_port_input_message_iterator_can_seek_ns_from_origin_method)
391 can_seek_ns_from_origin_true;
392 }
393
394 if (iterator->methods.seek_beginning &&
395 !iterator->methods.can_seek_beginning) {
396 iterator->methods.can_seek_beginning =
397 (bt_self_component_port_input_message_iterator_seek_beginning_method)
398 can_seek_beginning_true;
399 }
400
d6e69534 401 BT_LIB_LOGD("Created initial message iterator on self component input port: "
d94d92ac
PP
402 "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
403 upstream_port, upstream_comp, iterator);
3230ee6b 404
47e5a032 405end:
d94d92ac 406 return iterator;
47e5a032
JG
407}
408
d6e69534
PP
409struct bt_self_component_port_input_message_iterator *
410bt_self_component_port_input_message_iterator_create(
d94d92ac 411 struct bt_self_component_port_input *self_port)
ea8d3e58 412{
d6e69534 413 typedef enum bt_self_message_iterator_status (*init_method_t)(
d94d92ac
PP
414 void *, void *, void *);
415
416 init_method_t init_method = NULL;
d6e69534 417 struct bt_self_component_port_input_message_iterator *iterator =
d94d92ac
PP
418 NULL;
419 struct bt_port *port = (void *) self_port;
420 struct bt_port *upstream_port;
421 struct bt_component *comp;
422 struct bt_component *upstream_comp;
423 struct bt_component_class *upstream_comp_cls;
424
425 BT_ASSERT_PRE_NON_NULL(port, "Port");
0d72b8c3 426 comp = bt_port_borrow_component_inline(port);
d94d92ac
PP
427 BT_ASSERT_PRE(bt_port_is_connected(port),
428 "Port is not connected: %![port-]+p", port);
429 BT_ASSERT_PRE(comp, "Port is not part of a component: %![port-]+p",
430 port);
431 BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp),
432 "Port's component's graph is canceled: "
433 "%![port-]+p, %![comp-]+c", port, comp);
434 BT_ASSERT(port->connection);
435 upstream_port = port->connection->upstream_port;
436 BT_ASSERT(upstream_port);
0d72b8c3 437 upstream_comp = bt_port_borrow_component_inline(upstream_port);
d94d92ac
PP
438 BT_ASSERT(upstream_comp);
439 upstream_comp_cls = upstream_comp->class;
440 BT_ASSERT(upstream_comp->class->type ==
441 BT_COMPONENT_CLASS_TYPE_SOURCE ||
442 upstream_comp->class->type ==
443 BT_COMPONENT_CLASS_TYPE_FILTER);
d6e69534 444 iterator = bt_self_component_port_input_message_iterator_create_initial(
d94d92ac
PP
445 upstream_comp, upstream_port);
446 if (!iterator) {
447 BT_LOGW_STR("Cannot create self component input port "
d6e69534 448 "message iterator.");
d94d92ac
PP
449 goto end;
450 }
890882ef 451
d94d92ac
PP
452 switch (upstream_comp_cls->type) {
453 case BT_COMPONENT_CLASS_TYPE_SOURCE:
454 {
455 struct bt_component_class_source *src_comp_cls =
456 (void *) upstream_comp_cls;
457
458 init_method =
d6e69534 459 (init_method_t) src_comp_cls->methods.msg_iter_init;
d94d92ac
PP
460 break;
461 }
462 case BT_COMPONENT_CLASS_TYPE_FILTER:
463 {
464 struct bt_component_class_filter *flt_comp_cls =
465 (void *) upstream_comp_cls;
466
467 init_method =
d6e69534 468 (init_method_t) flt_comp_cls->methods.msg_iter_init;
d94d92ac
PP
469 break;
470 }
471 default:
472 /* Unreachable */
473 abort();
474 }
475
476 if (init_method) {
477 int iter_status;
478
479 BT_LIB_LOGD("Calling user's initialization method: %!+i", iterator);
480 iter_status = init_method(iterator, upstream_comp,
481 upstream_port);
482 BT_LOGD("User method returned: status=%s",
d6e69534
PP
483 bt_message_iterator_status_string(iter_status));
484 if (iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) {
d94d92ac 485 BT_LOGW_STR("Initialization method failed.");
d0fea130 486 BT_OBJECT_PUT_REF_AND_RESET(iterator);
d94d92ac
PP
487 goto end;
488 }
489 }
490
d0fea130
PP
491 set_self_comp_port_input_msg_iterator_state(iterator,
492 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
d94d92ac 493 g_ptr_array_add(port->connection->iterators, iterator);
d6e69534 494 BT_LIB_LOGD("Created message iterator on self component input port: "
d94d92ac
PP
495 "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i",
496 upstream_port, upstream_comp, iterator);
497
498end:
499 return iterator;
ea8d3e58
JG
500}
501
d6e69534
PP
502void *bt_self_message_iterator_get_data(
503 const struct bt_self_message_iterator *self_iterator)
ea8d3e58 504{
d6e69534 505 struct bt_self_component_port_input_message_iterator *iterator =
d94d92ac 506 (void *) self_iterator;
ea8d3e58 507
d6e69534 508 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
d94d92ac 509 return iterator->user_data;
8738a040 510}
413bc2c4 511
d6e69534
PP
512void bt_self_message_iterator_set_data(
513 struct bt_self_message_iterator *self_iterator, void *data)
5c563278 514{
d6e69534 515 struct bt_self_component_port_input_message_iterator *iterator =
d94d92ac 516 (void *) self_iterator;
5c563278 517
d6e69534 518 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
d94d92ac 519 iterator->user_data = data;
d6e69534 520 BT_LIB_LOGV("Set message iterator's user data: "
d94d92ac 521 "%!+i, user-data-addr=%p", iterator, data);
5c563278
PP
522}
523
f42867e2
PP
524BT_ASSERT_PRE_FUNC
525static inline
d6e69534 526void bt_message_borrow_packet_stream(const struct bt_message *msg,
40f4ba76
PP
527 const struct bt_stream **stream,
528 const struct bt_packet **packet)
fa054faf 529{
d6e69534 530 BT_ASSERT(msg);
fa054faf 531
d6e69534
PP
532 switch (msg->type) {
533 case BT_MESSAGE_TYPE_EVENT:
40f4ba76 534 *packet = bt_event_borrow_packet_const(
d6e69534 535 bt_message_event_borrow_event_const(msg));
40f4ba76 536 *stream = bt_packet_borrow_stream_const(*packet);
fa054faf 537 break;
d6e69534
PP
538 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
539 *stream = bt_message_stream_beginning_borrow_stream_const(msg);
fa054faf 540 break;
d6e69534
PP
541 case BT_MESSAGE_TYPE_STREAM_END:
542 *stream = bt_message_stream_end_borrow_stream_const(msg);
fa054faf 543 break;
d6e69534
PP
544 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
545 *packet = bt_message_packet_beginning_borrow_packet_const(msg);
40f4ba76 546 *stream = bt_packet_borrow_stream_const(*packet);
fa054faf 547 break;
d6e69534
PP
548 case BT_MESSAGE_TYPE_PACKET_END:
549 *packet = bt_message_packet_end_borrow_packet_const(msg);
40f4ba76 550 *stream = bt_packet_borrow_stream_const(*packet);
2ec84d26 551 break;
fa054faf 552 default:
f42867e2 553 break;
fa054faf 554 }
fa054faf
PP
555}
556
d6e69534
PP
557enum bt_message_iterator_status
558bt_self_component_port_input_message_iterator_next(
559 struct bt_self_component_port_input_message_iterator *iterator,
560 bt_message_array_const *msgs, uint64_t *user_count)
3230ee6b 561{
d6e69534 562 int status = BT_MESSAGE_ITERATOR_STATUS_OK;
d94d92ac 563
d6e69534
PP
564 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
565 BT_ASSERT_PRE_NON_NULL(msgs, "Message array (output)");
566 BT_ASSERT_PRE_NON_NULL(user_count, "Message count (output)");
f42867e2 567 BT_ASSERT_PRE(iterator->state ==
d6e69534
PP
568 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE,
569 "Message iterator's \"next\" called, but "
7474e7d3 570 "message iterator is in the wrong state: %!+i", iterator);
f42867e2
PP
571 BT_ASSERT(iterator->upstream_component);
572 BT_ASSERT(iterator->upstream_component->class);
4725a201
PP
573 BT_ASSERT_PRE(bt_component_borrow_graph(iterator->upstream_component)->is_configured,
574 "Graph is not configured: %!+g",
575 bt_component_borrow_graph(iterator->upstream_component));
d94d92ac 576 BT_LIB_LOGD("Getting next self component input port "
d6e69534 577 "message iterator's messages: %!+i", iterator);
d3eb6e8f 578
3230ee6b 579 /*
d6e69534 580 * Call the user's "next" method to get the next messages
fa054faf 581 * and status.
3230ee6b 582 */
7474e7d3 583 BT_ASSERT(iterator->methods.next);
f42867e2 584 BT_LOGD_STR("Calling user's \"next\" method.");
7474e7d3
PP
585 status = iterator->methods.next(iterator,
586 (void *) iterator->base.msgs->pdata, MSG_BATCH_SIZE,
587 user_count);
f42867e2 588 BT_LOGD("User method returned: status=%s",
d6e69534 589 bt_message_iterator_status_string(status));
d4393e08 590 if (status < 0) {
f42867e2 591 BT_LOGW_STR("User method failed.");
f42867e2
PP
592 goto end;
593 }
3230ee6b 594
d0fea130
PP
595#ifdef BT_DEV_MODE
596 /*
597 * There is no way that this iterator could have been finalized
598 * during its "next" method, as the only way to do this is to
599 * put the last iterator's reference, and this can only be done
600 * by its downstream owner.
7474e7d3
PP
601 *
602 * For the same reason, there is no way that this iterator could
603 * have seeked (cannot seek a self message iterator).
d0fea130
PP
604 */
605 BT_ASSERT(iterator->state ==
606 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
607#endif
8cf27cc5 608
d4393e08 609 switch (status) {
d6e69534 610 case BT_MESSAGE_ITERATOR_STATUS_OK:
7474e7d3
PP
611 BT_ASSERT_PRE(*user_count <= MSG_BATCH_SIZE,
612 "Invalid returned message count: greater than "
613 "batch size: count=%" PRIu64 ", batch-size=%u",
614 *user_count, MSG_BATCH_SIZE);
d6e69534 615 *msgs = (void *) iterator->base.msgs->pdata;
d4393e08 616 break;
d6e69534 617 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
d4393e08 618 goto end;
d6e69534 619 case BT_MESSAGE_ITERATOR_STATUS_END:
d0fea130
PP
620 set_self_comp_port_input_msg_iterator_state(iterator,
621 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED);
f42867e2 622 goto end;
f42867e2
PP
623 default:
624 /* Unknown non-error status */
625 abort();
41a2b7ae
PP
626 }
627
628end:
3230ee6b
PP
629 return status;
630}
631
d0fea130 632enum bt_message_iterator_status bt_port_output_message_iterator_next(
d6e69534
PP
633 struct bt_port_output_message_iterator *iterator,
634 bt_message_array_const *msgs_to_user,
d4393e08 635 uint64_t *count_to_user)
3230ee6b 636{
d6e69534 637 enum bt_message_iterator_status status;
07245ac2 638 enum bt_graph_status graph_status;
3230ee6b 639
d6e69534
PP
640 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
641 BT_ASSERT_PRE_NON_NULL(msgs_to_user, "Message array (output)");
642 BT_ASSERT_PRE_NON_NULL(count_to_user, "Message count (output)");
643 BT_LIB_LOGD("Getting next output port message iterator's messages: "
07245ac2 644 "%!+i", iterator);
d4393e08 645
4725a201
PP
646 /*
647 * As soon as the user calls this function, we mark the graph as
648 * being definitely configured.
649 */
650 bt_graph_set_is_configured(iterator->graph, true);
651
d94d92ac
PP
652 graph_status = bt_graph_consume_sink_no_check(iterator->graph,
653 iterator->colander);
07245ac2
PP
654 switch (graph_status) {
655 case BT_GRAPH_STATUS_CANCELED:
07245ac2 656 case BT_GRAPH_STATUS_AGAIN:
07245ac2 657 case BT_GRAPH_STATUS_END:
07245ac2 658 case BT_GRAPH_STATUS_NOMEM:
d94d92ac 659 status = (int) graph_status;
07245ac2
PP
660 break;
661 case BT_GRAPH_STATUS_OK:
d6e69534 662 status = BT_MESSAGE_ITERATOR_STATUS_OK;
d4393e08
PP
663
664 /*
d6e69534 665 * On success, the colander sink moves the messages
d4393e08 666 * to this iterator's array and sets this iterator's
d6e69534 667 * message count: move them to the user.
d4393e08 668 */
d6e69534 669 *msgs_to_user = (void *) iterator->base.msgs->pdata;
d94d92ac 670 *count_to_user = iterator->count;
90157d89 671 break;
90157d89 672 default:
07245ac2 673 /* Other errors */
d6e69534 674 status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
90157d89 675 }
3230ee6b 676
3230ee6b 677 return status;
53d45b87
JG
678}
679
7474e7d3
PP
680struct bt_component *
681bt_self_component_port_input_message_iterator_borrow_component(
d6e69534 682 struct bt_self_component_port_input_message_iterator *iterator)
d94d92ac 683{
d6e69534 684 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
d94d92ac
PP
685 return iterator->upstream_component;
686}
687
7474e7d3
PP
688const struct bt_component *
689bt_self_component_port_input_message_iterator_borrow_component_const(
690 const struct bt_self_component_port_input_message_iterator *iterator)
691{
692 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
693 return iterator->upstream_component;
694}
695
d6e69534
PP
696struct bt_self_component *bt_self_message_iterator_borrow_component(
697 struct bt_self_message_iterator *self_iterator)
413bc2c4 698{
d6e69534 699 struct bt_self_component_port_input_message_iterator *iterator =
d94d92ac 700 (void *) self_iterator;
90157d89 701
d6e69534 702 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
d94d92ac 703 return (void *) iterator->upstream_component;
413bc2c4
JG
704}
705
d6e69534
PP
706struct bt_self_port_output *bt_self_message_iterator_borrow_port(
707 struct bt_self_message_iterator *self_iterator)
91457551 708{
d6e69534 709 struct bt_self_component_port_input_message_iterator *iterator =
d94d92ac
PP
710 (void *) self_iterator;
711
d6e69534 712 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
d94d92ac 713 return (void *) iterator->upstream_port;
91457551 714}
8ed535b5
PP
715
716static
d6e69534 717void bt_port_output_message_iterator_destroy(struct bt_object *obj)
8ed535b5 718{
d6e69534 719 struct bt_port_output_message_iterator *iterator = (void *) obj;
8ed535b5 720
d6e69534 721 BT_LIB_LOGD("Destroying output port message iterator object: %!+i",
8ed535b5
PP
722 iterator);
723 BT_LOGD_STR("Putting graph.");
d94d92ac 724 BT_OBJECT_PUT_REF_AND_RESET(iterator->graph);
8ed535b5 725 BT_LOGD_STR("Putting colander sink component.");
d94d92ac 726 BT_OBJECT_PUT_REF_AND_RESET(iterator->colander);
d6e69534 727 destroy_base_message_iterator(obj);
8ed535b5
PP
728}
729
d6e69534 730struct bt_port_output_message_iterator *
7474e7d3 731bt_port_output_message_iterator_create(struct bt_graph *graph,
0d72b8c3 732 const struct bt_port_output *output_port)
8ed535b5 733{
d6e69534 734 struct bt_port_output_message_iterator *iterator = NULL;
d94d92ac 735 struct bt_component_class_sink *colander_comp_cls = NULL;
8ed535b5 736 struct bt_component *output_port_comp = NULL;
d94d92ac 737 struct bt_component_sink *colander_comp;
8ed535b5 738 enum bt_graph_status graph_status;
d94d92ac 739 struct bt_port_input *colander_in_port = NULL;
8ed535b5 740 struct bt_component_class_sink_colander_data colander_data;
d4393e08 741 int ret;
8ed535b5 742
d94d92ac 743 BT_ASSERT_PRE_NON_NULL(graph, "Graph");
f42867e2 744 BT_ASSERT_PRE_NON_NULL(output_port, "Output port");
0d72b8c3
PP
745 output_port_comp = bt_port_borrow_component_inline(
746 (const void *) output_port);
f42867e2
PP
747 BT_ASSERT_PRE(output_port_comp,
748 "Output port has no component: %!+p", output_port);
d94d92ac
PP
749 BT_ASSERT_PRE(bt_component_borrow_graph(output_port_comp) ==
750 (void *) graph,
751 "Output port is not part of graph: %![graph-]+g, %![port-]+p",
752 graph, output_port);
8ed535b5 753
d6e69534
PP
754 /* Create message iterator */
755 BT_LIB_LOGD("Creating message iterator on output port: "
d94d92ac 756 "%![port-]+p, %![comp-]+c", output_port, output_port_comp);
d6e69534 757 iterator = g_new0(struct bt_port_output_message_iterator, 1);
8ed535b5 758 if (!iterator) {
d6e69534 759 BT_LOGE_STR("Failed to allocate one output port message iterator.");
8ed535b5
PP
760 goto error;
761 }
762
d6e69534
PP
763 ret = init_message_iterator((void *) iterator,
764 BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT,
765 bt_port_output_message_iterator_destroy);
d4393e08 766 if (ret) {
d6e69534 767 /* init_message_iterator() logs errors */
65300d60 768 BT_OBJECT_PUT_REF_AND_RESET(iterator);
d4393e08
PP
769 goto end;
770 }
8ed535b5
PP
771
772 /* Create colander component */
773 colander_comp_cls = bt_component_class_sink_colander_get();
774 if (!colander_comp_cls) {
775 BT_LOGW("Cannot get colander sink component class.");
776 goto error;
777 }
778
398454ed
PP
779 iterator->graph = graph;
780 bt_object_get_no_null_check(iterator->graph);
d6e69534 781 colander_data.msgs = (void *) iterator->base.msgs->pdata;
d4393e08 782 colander_data.count_addr = &iterator->count;
5fd91d88
PP
783
784 /* Hope that nobody uses this very unique name */
d94d92ac 785 graph_status =
0d72b8c3 786 bt_graph_add_sink_component_with_init_method_data(
5fd91d88
PP
787 (void *) graph, colander_comp_cls,
788 "colander-36ac3409-b1a8-4d60-ab1f-4fdf341a8fb1",
0d72b8c3 789 NULL, &colander_data, (void *) &iterator->colander);
8ed535b5 790 if (graph_status != BT_GRAPH_STATUS_OK) {
d94d92ac
PP
791 BT_LIB_LOGW("Cannot add colander sink component to graph: "
792 "%1[graph-]+g, status=%s", graph,
8ed535b5
PP
793 bt_graph_status_string(graph_status));
794 goto error;
795 }
796
797 /*
798 * Connect provided output port to the colander component's
799 * input port.
800 */
0d72b8c3
PP
801 colander_in_port =
802 (void *) bt_component_sink_borrow_input_port_by_index_const(
803 (void *) iterator->colander, 0);
f6ccaed9 804 BT_ASSERT(colander_in_port);
0d72b8c3 805 graph_status = bt_graph_connect_ports(graph,
8ed535b5
PP
806 output_port, colander_in_port, NULL);
807 if (graph_status != BT_GRAPH_STATUS_OK) {
d94d92ac
PP
808 BT_LIB_LOGW("Cannot add colander sink component to graph: "
809 "%![graph-]+g, %![comp-]+c, status=%s", graph,
810 iterator->colander,
8ed535b5
PP
811 bt_graph_status_string(graph_status));
812 goto error;
813 }
814
815 /*
816 * At this point everything went fine. Make the graph
d6e69534 817 * nonconsumable forever so that only this message iterator
8ed535b5 818 * can consume (thanks to bt_graph_consume_sink_no_check()).
d6e69534
PP
819 * This avoids leaking the message created by the colander
820 * sink and moved to the message iterator's message
07245ac2 821 * member.
8ed535b5 822 */
d94d92ac 823 bt_graph_set_can_consume(iterator->graph, false);
8ed535b5
PP
824 goto end;
825
826error:
827 if (iterator && iterator->graph && iterator->colander) {
828 int ret;
829
830 /* Remove created colander component from graph if any */
831 colander_comp = iterator->colander;
65300d60 832 BT_OBJECT_PUT_REF_AND_RESET(iterator->colander);
8ed535b5
PP
833
834 /*
835 * At this point the colander component's reference
836 * count is 0 because iterator->colander was the only
837 * owner. We also know that it is not connected because
838 * this is the last operation before this function
839 * succeeds.
840 *
841 * Since we honor the preconditions here,
842 * bt_graph_remove_unconnected_component() always
843 * succeeds.
844 */
845 ret = bt_graph_remove_unconnected_component(iterator->graph,
d94d92ac 846 (void *) colander_comp);
f6ccaed9 847 BT_ASSERT(ret == 0);
8ed535b5
PP
848 }
849
65300d60 850 BT_OBJECT_PUT_REF_AND_RESET(iterator);
8ed535b5
PP
851
852end:
65300d60 853 bt_object_put_ref(colander_comp_cls);
8ed535b5
PP
854 return (void *) iterator;
855}
c5b9b441 856
7474e7d3
PP
857bt_bool bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
858 struct bt_self_component_port_input_message_iterator *iterator,
859 int64_t ns_from_origin)
860{
861 bt_bool can = BT_FALSE;
862
863 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
864 BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
865 BT_ASSERT_PRE(bt_component_borrow_graph(iterator->upstream_component)->is_configured,
866 "Graph is not configured: %!+g",
867 bt_component_borrow_graph(iterator->upstream_component));
868
869 if (iterator->methods.can_seek_ns_from_origin) {
870 can = iterator->methods.can_seek_ns_from_origin(iterator,
871 ns_from_origin);
872 goto end;
873 }
874
875 /*
876 * Automatic seeking fall back: if we can seek to the beginning,
877 * then we can automatically seek to any message.
878 */
879 if (iterator->methods.can_seek_beginning) {
880 can = iterator->methods.can_seek_beginning(iterator);
881 }
882
883end:
884 return can;
885}
886
887bt_bool bt_self_component_port_input_message_iterator_can_seek_beginning(
888 struct bt_self_component_port_input_message_iterator *iterator)
889{
890 bt_bool can = BT_FALSE;
891
892 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
893 BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
894 BT_ASSERT_PRE(bt_component_borrow_graph(iterator->upstream_component)->is_configured,
895 "Graph is not configured: %!+g",
896 bt_component_borrow_graph(iterator->upstream_component));
897
898 if (iterator->methods.can_seek_beginning) {
899 can = iterator->methods.can_seek_beginning(iterator);
900 }
901
902 return can;
903}
904
905static inline
906void _set_iterator_state_after_seeking(
907 struct bt_self_component_port_input_message_iterator *iterator,
908 enum bt_message_iterator_status status)
909{
910 enum bt_self_component_port_input_message_iterator_state new_state = 0;
911
912 /* Set iterator's state depending on seeking status */
913 switch (status) {
914 case BT_MESSAGE_ITERATOR_STATUS_OK:
915 new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE;
916 break;
917 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
918 new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_AGAIN;
919 break;
920 case BT_MESSAGE_ITERATOR_STATUS_ERROR:
921 case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
922 new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR;
923 break;
924 case BT_MESSAGE_ITERATOR_STATUS_END:
925 new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED;
926 break;
927 default:
928 abort();
929 }
930
931 set_self_comp_port_input_msg_iterator_state(iterator, new_state);
932}
933
934#ifdef BT_DEV_MODE
935# define set_iterator_state_after_seeking _set_iterator_state_after_seeking
936#else
937# define set_iterator_state_after_seeking(_iter, _status)
938#endif
939
940enum bt_message_iterator_status
941bt_self_component_port_input_message_iterator_seek_beginning(
942 struct bt_self_component_port_input_message_iterator *iterator)
943{
944 int status;
945
946 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
947 BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
948 BT_ASSERT_PRE(bt_component_borrow_graph(iterator->upstream_component)->is_configured,
949 "Graph is not configured: %!+g",
950 bt_component_borrow_graph(iterator->upstream_component));
951 BT_ASSERT_PRE(
952 bt_self_component_port_input_message_iterator_can_seek_beginning(
953 iterator),
954 "Message iterator cannot seek beginning: %!+i", iterator);
955 BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i", iterator);
956 set_self_comp_port_input_msg_iterator_state(iterator,
957 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING);
958 status = iterator->methods.seek_beginning(iterator);
959 BT_LOGD("User method returned: status=%s",
960 bt_message_iterator_status_string(status));
961 BT_ASSERT_PRE(status == BT_MESSAGE_ITERATOR_STATUS_OK ||
962 status == BT_MESSAGE_ITERATOR_STATUS_ERROR ||
963 status == BT_MESSAGE_ITERATOR_STATUS_NOMEM ||
964 status == BT_MESSAGE_ITERATOR_STATUS_AGAIN,
965 "Unexpected status: %![iter-]+i, status=%s",
966 iterator, bt_self_message_iterator_status_string(status));
967 set_iterator_state_after_seeking(iterator, status);
968 return status;
969}
970
971static inline
972int get_message_ns_from_origin(const struct bt_message *msg,
973 int64_t *ns_from_origin, bool *ignore)
974{
975 const struct bt_clock_snapshot *clk_snapshot = NULL;
976 int ret = 0;
977
978 switch (msg->type) {
979 case BT_MESSAGE_TYPE_EVENT:
980 {
981 const struct bt_message_event *event_msg =
982 (const void *) msg;
983
984 clk_snapshot = event_msg->event->default_cs;
985 BT_ASSERT_PRE(clk_snapshot,
986 "Event has no default clock snapshot: %!+e",
987 event_msg->event);
988 break;
989 }
990 case BT_MESSAGE_TYPE_INACTIVITY:
991 {
992 const struct bt_message_inactivity *inactivity_msg =
993 (const void *) msg;
994
995 BT_ASSERT(inactivity_msg->default_cs);
996 clk_snapshot = inactivity_msg->default_cs;
997 break;
998 }
999 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1000 case BT_MESSAGE_TYPE_STREAM_END:
1001 /* Ignore */
1002 goto end;
1003 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1004 {
1005 const struct bt_message_packet_beginning *pkt_msg =
1006 (const void *) msg;
1007
1008 clk_snapshot = pkt_msg->packet->default_beginning_cs;
1009 break;
1010 }
1011 case BT_MESSAGE_TYPE_PACKET_END:
1012 {
1013 const struct bt_message_packet_end *pkt_msg =
1014 (const void *) msg;
1015
1016 clk_snapshot = pkt_msg->packet->default_end_cs;
1017 break;
1018 }
1019 default:
1020 abort();
1021 }
1022
1023 if (!clk_snapshot) {
1024 *ignore = true;
1025 goto end;
1026 }
1027
1028 ret = bt_clock_snapshot_get_ns_from_origin(clk_snapshot,
1029 ns_from_origin);
1030
1031end:
1032 return ret;
1033}
1034
1035static
1036enum bt_message_iterator_status find_message_ge_ns_from_origin(
1037 struct bt_self_component_port_input_message_iterator *iterator,
1038 int64_t ns_from_origin)
1039{
1040 int status;
1041 enum bt_self_component_port_input_message_iterator_state init_state =
1042 iterator->state;
1043 const struct bt_message *messages[MSG_BATCH_SIZE];
1044 uint64_t user_count = 0;
1045 uint64_t i;
1046
1047 BT_ASSERT(iterator);
1048 memset(&messages[0], 0, sizeof(messages[0]) * MSG_BATCH_SIZE);
1049
1050 /*
1051 * Make this iterator temporarily active (not seeking) to call
1052 * the "next" method.
1053 */
1054 set_self_comp_port_input_msg_iterator_state(iterator,
1055 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
1056
1057 BT_ASSERT(iterator->methods.next);
1058
1059 while (true) {
1060 /*
1061 * Call the user's "next" method to get the next
1062 * messages and status.
1063 */
1064 BT_LOGD_STR("Calling user's \"next\" method.");
1065 status = iterator->methods.next(iterator,
1066 &messages[0], MSG_BATCH_SIZE, &user_count);
1067 BT_LOGD("User method returned: status=%s",
1068 bt_message_iterator_status_string(status));
1069
1070#ifdef BT_DEV_MODE
1071 /*
1072 * The user's "next" method must not do any action which
1073 * would change the iterator's state.
1074 */
1075 BT_ASSERT(iterator->state ==
1076 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
1077#endif
1078
1079 switch (status) {
1080 case BT_MESSAGE_ITERATOR_STATUS_OK:
1081 BT_ASSERT_PRE(user_count <= MSG_BATCH_SIZE,
1082 "Invalid returned message count: greater than "
1083 "batch size: count=%" PRIu64 ", batch-size=%u",
1084 user_count, MSG_BATCH_SIZE);
1085 break;
1086 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
1087 case BT_MESSAGE_ITERATOR_STATUS_ERROR:
1088 case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
1089 case BT_MESSAGE_ITERATOR_STATUS_END:
1090 goto end;
1091 default:
1092 abort();
1093 }
1094
1095 /*
1096 * Find first message which has a default clock snapshot
1097 * that is greater than or equal to the requested value.
1098 *
1099 * For event and inactivity messages, compare with the
1100 * default clock snapshot.
1101 *
1102 * For packet beginning messages, compare with the
1103 * default beginning clock snapshot, if any.
1104 *
1105 * For packet end messages, compare with the default end
1106 * clock snapshot, if any.
1107 *
1108 * For stream beginning, stream end, ignore.
1109 */
1110 for (i = 0; i < user_count; i++) {
1111 const struct bt_message *msg = messages[i];
1112 int64_t msg_ns_from_origin;
1113 bool ignore = false;
1114 int ret;
1115
1116 BT_ASSERT(msg);
1117 ret = get_message_ns_from_origin(msg, &msg_ns_from_origin,
1118 &ignore);
1119 if (ret) {
1120 status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
1121 goto end;
1122 }
1123
1124 if (ignore) {
1125 /* Skip message without a clock snapshot */
1126 continue;
1127 }
1128
1129 if (msg_ns_from_origin >= ns_from_origin) {
1130 /*
1131 * We found it: move this message and
1132 * the following ones to the iterator's
1133 * auto-seek message array.
1134 */
1135 uint64_t j;
1136
1137 for (j = i; j < user_count; j++) {
1138 iterator->auto_seek_msgs->pdata[j - i] =
1139 (void *) messages[j];
1140 messages[j] = NULL;
1141 }
1142
1143 iterator->auto_seek_msg_count = user_count - i;
1144 goto end;
1145 }
1146
1147 bt_object_put_no_null_check(msg);
1148 messages[i] = NULL;
1149 }
1150 }
1151
1152end:
1153 for (i = 0; i < user_count; i++) {
1154 if (messages[i]) {
1155 bt_object_put_no_null_check(messages[i]);
1156 }
1157 }
1158
1159 set_self_comp_port_input_msg_iterator_state(iterator, init_state);
1160 return status;
1161}
1162
1163static
1164enum bt_self_message_iterator_status post_auto_seek_next(
1165 struct bt_self_component_port_input_message_iterator *iterator,
1166 bt_message_array_const msgs, uint64_t capacity,
1167 uint64_t *count)
1168{
1169 BT_ASSERT(iterator->auto_seek_msg_count <= capacity);
1170 BT_ASSERT(iterator->auto_seek_msg_count > 0);
1171
1172 /*
1173 * Move auto-seek messages to the output array (which is this
1174 * iterator's base message array.
1175 */
1176 memcpy(&msgs[0], &iterator->auto_seek_msgs->pdata[0],
1177 sizeof(msgs[0]) * iterator->auto_seek_msg_count);
1178 memset(&iterator->auto_seek_msgs->pdata[0], 0,
1179 sizeof(iterator->auto_seek_msgs->pdata[0]) *
1180 iterator->auto_seek_msg_count);
1181 *count = iterator->auto_seek_msg_count;
1182
1183 /* Restore real user's "next" method */
1184 switch (iterator->upstream_component->class->type) {
1185 case BT_COMPONENT_CLASS_TYPE_SOURCE:
1186 {
1187 struct bt_component_class_source *src_comp_cls =
1188 (void *) iterator->upstream_component->class;
1189
1190 iterator->methods.next =
1191 (bt_self_component_port_input_message_iterator_next_method)
1192 src_comp_cls->methods.msg_iter_next;
1193 break;
1194 }
1195 case BT_COMPONENT_CLASS_TYPE_FILTER:
1196 {
1197 struct bt_component_class_filter *flt_comp_cls =
1198 (void *) iterator->upstream_component->class;
1199
1200 iterator->methods.next =
1201 (bt_self_component_port_input_message_iterator_next_method)
1202 flt_comp_cls->methods.msg_iter_next;
1203 break;
1204 }
1205 default:
1206 abort();
1207 }
1208
1209 return BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1210}
1211
1212enum bt_message_iterator_status
1213bt_self_component_port_input_message_iterator_seek_ns_from_origin(
1214 struct bt_self_component_port_input_message_iterator *iterator,
1215 int64_t ns_from_origin)
1216{
1217 int status;
1218
1219 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
1220 BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
1221 BT_ASSERT_PRE(bt_component_borrow_graph(iterator->upstream_component)->is_configured,
1222 "Graph is not configured: %!+g",
1223 bt_component_borrow_graph(iterator->upstream_component));
1224 BT_ASSERT_PRE(
1225 bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
1226 iterator, ns_from_origin),
1227 "Message iterator cannot seek nanoseconds from origin: %!+i, "
1228 "ns-from-origin=%" PRId64, iterator, ns_from_origin);
1229 set_self_comp_port_input_msg_iterator_state(iterator,
1230 BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING);
1231
1232 if (iterator->methods.seek_ns_from_origin) {
1233 BT_LIB_LOGD("Calling user's \"seek nanoseconds from origin\" method: "
1234 "%![iter-]+i, ns=%" PRId64, iterator, ns_from_origin);
1235 status = iterator->methods.seek_ns_from_origin(iterator,
1236 ns_from_origin);
1237 BT_LOGD("User method returned: status=%s",
1238 bt_message_iterator_status_string(status));
1239 BT_ASSERT_PRE(status == BT_MESSAGE_ITERATOR_STATUS_OK ||
1240 status == BT_MESSAGE_ITERATOR_STATUS_ERROR ||
1241 status == BT_MESSAGE_ITERATOR_STATUS_NOMEM ||
1242 status == BT_MESSAGE_ITERATOR_STATUS_AGAIN,
1243 "Unexpected status: %![iter-]+i, status=%s",
1244 iterator,
1245 bt_self_message_iterator_status_string(status));
1246 } else {
1247 /* Start automatic seeking: seek beginning first */
1248 BT_ASSERT(iterator->methods.can_seek_beginning(iterator));
1249 BT_ASSERT(iterator->methods.seek_beginning);
1250 BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i",
1251 iterator);
1252 status = iterator->methods.seek_beginning(iterator);
1253 BT_LOGD("User method returned: status=%s",
1254 bt_message_iterator_status_string(status));
1255 BT_ASSERT_PRE(status == BT_MESSAGE_ITERATOR_STATUS_OK ||
1256 status == BT_MESSAGE_ITERATOR_STATUS_ERROR ||
1257 status == BT_MESSAGE_ITERATOR_STATUS_NOMEM ||
1258 status == BT_MESSAGE_ITERATOR_STATUS_AGAIN,
1259 "Unexpected status: %![iter-]+i, status=%s",
1260 iterator,
1261 bt_self_message_iterator_status_string(status));
1262 switch (status) {
1263 case BT_MESSAGE_ITERATOR_STATUS_OK:
1264 break;
1265 case BT_MESSAGE_ITERATOR_STATUS_ERROR:
1266 case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
1267 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
1268 goto end;
1269 default:
1270 abort();
1271 }
1272
1273 /*
1274 * Find the first message which has a default clock
1275 * snapshot greater than or equal to the requested
1276 * nanoseconds from origin, and move the received
1277 * messages from this point in the batch to this
1278 * iterator's auto-seek message array.
1279 */
1280 status = find_message_ge_ns_from_origin(iterator,
1281 ns_from_origin);
1282 switch (status) {
1283 case BT_MESSAGE_ITERATOR_STATUS_OK:
1284 /*
1285 * Replace the user's "next" method with a
1286 * custom, temporary "next" method which returns
1287 * the messages in the iterator's message array.
1288 */
1289 iterator->methods.next =
1290 (bt_self_component_port_input_message_iterator_next_method)
1291 post_auto_seek_next;
1292 break;
1293 case BT_MESSAGE_ITERATOR_STATUS_ERROR:
1294 case BT_MESSAGE_ITERATOR_STATUS_NOMEM:
1295 case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
1296 goto end;
1297 case BT_MESSAGE_ITERATOR_STATUS_END:
1298 /*
1299 * The iterator reached the end: just return
1300 * `BT_MESSAGE_ITERATOR_STATUS_OK` here, as if
1301 * the seeking operation occured: the next
1302 * "next" method will return
1303 * `BT_MESSAGE_ITERATOR_STATUS_END` itself.
1304 */
1305 break;
1306 default:
1307 abort();
1308 }
1309 }
1310
1311end:
1312 set_iterator_state_after_seeking(iterator, status);
1313
1314 if (status == BT_MESSAGE_ITERATOR_STATUS_END) {
1315 status = BT_MESSAGE_ITERATOR_STATUS_OK;
1316 }
1317
1318 return status;
1319}
1320
1321static inline
1322bt_self_component_port_input_message_iterator *
1323borrow_output_port_message_iterator_upstream_iterator(
1324 struct bt_port_output_message_iterator *iterator)
1325{
1326 struct bt_component_class_sink_colander_priv_data *colander_data;
1327
1328 BT_ASSERT(iterator);
1329 colander_data = (void *) iterator->colander->parent.user_data;
1330 BT_ASSERT(colander_data);
1331 BT_ASSERT(colander_data->msg_iter);
1332 return colander_data->msg_iter;
1333}
1334
1335bt_bool bt_port_output_message_iterator_can_seek_ns_from_origin(
1336 struct bt_port_output_message_iterator *iterator,
1337 int64_t ns_from_origin)
1338{
1339 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
1340 return bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
1341 borrow_output_port_message_iterator_upstream_iterator(
1342 iterator), ns_from_origin);
1343}
1344
1345bt_bool bt_port_output_message_iterator_can_seek_beginning(
1346 struct bt_port_output_message_iterator *iterator)
1347{
1348 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
1349 return bt_self_component_port_input_message_iterator_can_seek_beginning(
1350 borrow_output_port_message_iterator_upstream_iterator(
1351 iterator));
1352}
1353
1354enum bt_message_iterator_status bt_port_output_message_iterator_seek_ns_from_origin(
1355 struct bt_port_output_message_iterator *iterator,
1356 int64_t ns_from_origin)
1357{
1358 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
1359 return bt_self_component_port_input_message_iterator_seek_ns_from_origin(
1360 borrow_output_port_message_iterator_upstream_iterator(iterator),
1361 ns_from_origin);
1362}
1363
1364enum bt_message_iterator_status bt_port_output_message_iterator_seek_beginning(
1365 struct bt_port_output_message_iterator *iterator)
1366{
1367 BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
1368 return bt_self_component_port_input_message_iterator_seek_beginning(
1369 borrow_output_port_message_iterator_upstream_iterator(
1370 iterator));
1371}
1372
d6e69534
PP
/*
 * Passes the output port message iterator `iterator` to
 * bt_object_get_ref().
 */
void bt_port_output_message_iterator_get_ref(
		const struct bt_port_output_message_iterator *iterator)
{
	bt_object_get_ref(iterator);
}
1378
d6e69534
PP
/*
 * Passes the output port message iterator `iterator` to
 * bt_object_put_ref().
 */
void bt_port_output_message_iterator_put_ref(
		const struct bt_port_output_message_iterator *iterator)
{
	bt_object_put_ref(iterator);
}
1384
d6e69534
PP
/*
 * Passes the self component port input message iterator `iterator`
 * to bt_object_get_ref().
 */
void bt_self_component_port_input_message_iterator_get_ref(
		const struct bt_self_component_port_input_message_iterator *iterator)
{
	bt_object_get_ref(iterator);
}
1390
d6e69534
PP
/*
 * Passes the self component port input message iterator `iterator`
 * to bt_object_put_ref().
 */
void bt_self_component_port_input_message_iterator_put_ref(
		const struct bt_self_component_port_input_message_iterator *iterator)
{
	bt_object_put_ref(iterator);
}
This page took 0.172021 seconds and 4 git commands to generate.