lib: remove unused port_connection_iterators_are_finalized() function
[babeltrace.git] / plugins / utils / trimmer / trimmer.c
CommitLineData
cab3f160 1/*
cab3f160 2 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7de0e49a 3 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
cab3f160
JG
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
b4565e8b
PP
24#define BT_LOG_TAG "PLUGIN-UTILS-TRIMMER-FLT"
25#include "logging.h"
26
3d2f08e7 27#include <babeltrace/compat/utc-internal.h>
7de0e49a 28#include <babeltrace/compat/time-internal.h>
9d408fca 29#include <babeltrace/babeltrace.h>
7de0e49a 30#include <babeltrace/common-internal.h>
7d61fa8e 31#include <plugins-common.h>
f6ccaed9 32#include <babeltrace/assert-internal.h>
7de0e49a
PP
33#include <stdint.h>
34#include <inttypes.h>
35#include <glib.h>
36
37#include "trimmer.h"
38
39#define NS_PER_S INT64_C(1000000000)
40
41static const char * const in_port_name = "in";
42
43struct trimmer_time {
44 unsigned int hour, minute, second, ns;
45};
46
47struct trimmer_bound {
48 /*
49 * Nanoseconds from origin, valid if `is_set` is set and
50 * `is_infinite` is false.
51 */
52 int64_t ns_from_origin;
53
54 /* True if this bound's full time (`ns_from_origin`) is set */
55 bool is_set;
56
57 /*
58 * True if this bound represents the infinity (negative or
59 * positive depending on which bound it is). If this is true,
60 * then we don't care about `ns_from_origin` above.
61 */
62 bool is_infinite;
63
64 /*
65 * This bound's time without the date; this time is used to set
66 * `ns_from_origin` once we know the date.
67 */
68 struct trimmer_time time;
69};
70
71struct trimmer_comp {
72 struct trimmer_bound begin, end;
73 bool is_gmt;
74};
75
76enum trimmer_iterator_state {
77 /*
78 * Find the first message's date and set the bounds's times
79 * accordingly.
80 */
81 TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN,
82
83 /*
84 * Initially seek to the trimming range's beginning time.
85 */
86 TRIMMER_ITERATOR_STATE_SEEK_INITIALLY,
87
88 /*
89 * Fill the output message queue for as long as received input
90 * messages are within the trimming time range.
91 */
92 TRIMMER_ITERATOR_STATE_TRIM,
93
94 /* Flush the remaining messages in the output message queue */
95 TRIMMER_ITERATOR_STATE_ENDING,
96
97 /* Trimming operation and message iterator is ended */
98 TRIMMER_ITERATOR_STATE_ENDED,
99};
100
101struct trimmer_iterator {
102 /* Weak */
103 struct trimmer_comp *trimmer_comp;
104
105 /* Weak */
106 bt_self_message_iterator *self_msg_iter;
107
108 enum trimmer_iterator_state state;
109
110 /* Owned by this */
111 bt_self_component_port_input_message_iterator *upstream_iter;
112 struct trimmer_bound begin, end;
113
114 /*
115 * Queue of `const bt_message *` (owned by the queue).
116 *
117 * This is where the trimming operation pushes the messages to
118 * output by this message iterator.
119 */
120 GQueue *output_messages;
121
122 /*
123 * Hash table of `bt_stream *` (weak) to
124 * `struct trimmer_iterator_stream_state *` (owned by the HT).
125 */
126 GHashTable *stream_states;
127};
128
129struct trimmer_iterator_stream_state {
130 /*
131 * True if both stream beginning and initial stream activity
132 * beginning messages were pushed for this stream.
133 */
134 bool inited;
135
136 /*
137 * True if the last pushed message for this stream was a stream
138 * activity end message.
139 */
140 bool last_msg_is_stream_activity_end;
141
142 /*
143 * Time to use for a generated stream end activity message when
144 * ending the stream.
145 */
146 int64_t stream_act_end_ns_from_origin;
147
148 /* Weak */
149 const bt_stream *stream;
150
151 /* Owned by this (`NULL` initially and between packets) */
152 const bt_packet *cur_packet;
153
154 /* Owned by this */
155 const bt_message *stream_beginning_msg;
156};
cab3f160
JG
157
158static
7de0e49a 159void destroy_trimmer_comp(struct trimmer_comp *trimmer_comp)
cab3f160 160{
7de0e49a
PP
161 BT_ASSERT(trimmer_comp);
162 g_free(trimmer_comp);
cab3f160
JG
163}
164
165static
7de0e49a
PP
166struct trimmer_comp *create_trimmer_comp(void)
167{
168 return g_new0(struct trimmer_comp, 1);
169}
170
171BT_HIDDEN
172void trimmer_finalize(bt_self_component_filter *self_comp)
cab3f160 173{
7de0e49a
PP
174 struct trimmer_comp *trimmer_comp =
175 bt_self_component_get_data(
176 bt_self_component_filter_as_self_component(self_comp));
177
178 if (trimmer_comp) {
179 destroy_trimmer_comp(trimmer_comp);
180 }
cab3f160
JG
181}
182
7de0e49a
PP
183/*
184 * Sets the time (in ns from origin) of a trimmer bound from date and
185 * time components.
186 *
187 * Returns a negative value if anything goes wrong.
188 */
189static
190int set_bound_ns_from_origin(struct trimmer_bound *bound,
191 unsigned int year, unsigned int month, unsigned int day,
192 unsigned int hour, unsigned int minute, unsigned int second,
193 unsigned int ns, bool is_gmt)
cab3f160 194{
7de0e49a
PP
195 int ret = 0;
196 time_t result;
197 struct tm tm = {
198 .tm_sec = second,
199 .tm_min = minute,
200 .tm_hour = hour,
201 .tm_mday = day,
202 .tm_mon = month - 1,
203 .tm_year = year - 1900,
204 .tm_isdst = -1,
205 };
206
207 if (is_gmt) {
208 result = bt_timegm(&tm);
209 } else {
210 result = mktime(&tm);
211 }
212
213 if (result < 0) {
214 ret = -1;
215 goto end;
216 }
cab3f160 217
7de0e49a
PP
218 BT_ASSERT(bound);
219 bound->ns_from_origin = (int64_t) result;
220 bound->ns_from_origin *= NS_PER_S;
221 bound->ns_from_origin += ns;
222 bound->is_set = true;
223
224end:
225 return ret;
cab3f160
JG
226}
227
528debdf
MD
228/*
229 * Parses a timestamp, figuring out its format.
230 *
231 * Returns a negative value if anything goes wrong.
232 *
233 * Expected formats:
234 *
7de0e49a
PP
235 * YYYY-MM-DD hh:mm[:ss[.ns]]
236 * [hh:mm:]ss[.ns]
237 * [-]s[.ns]
238 *
239 * TODO: Check overflows.
240 */
241static
242int set_bound_from_str(const char *str, struct trimmer_bound *bound,
243 bool is_gmt)
244{
245 int ret = 0;
246 int s_ret;
247 unsigned int year, month, day, hour, minute, second, ns;
248 char dummy;
249
250 /* Try `YYYY-MM-DD hh:mm:ss.ns` format */
251 s_ret = sscanf(str, "%u-%u-%u %u:%u:%u.%u%c", &year, &month, &day,
252 &hour, &minute, &second, &ns, &dummy);
253 if (s_ret == 7) {
254 ret = set_bound_ns_from_origin(bound, year, month, day,
255 hour, minute, second, ns, is_gmt);
256 goto end;
257 }
258
259 /* Try `YYYY-MM-DD hh:mm:ss` format */
260 s_ret = sscanf(str, "%u-%u-%u %u:%u:%u%c", &year, &month, &day,
261 &hour, &minute, &second, &dummy);
262 if (s_ret == 6) {
263 ret = set_bound_ns_from_origin(bound, year, month, day,
264 hour, minute, second, 0, is_gmt);
265 goto end;
266 }
267
268 /* Try `YYYY-MM-DD hh:mm` format */
269 s_ret = sscanf(str, "%u-%u-%u %u:%u%c", &year, &month, &day,
270 &hour, &minute, &dummy);
271 if (s_ret == 5) {
272 ret = set_bound_ns_from_origin(bound, year, month, day,
273 hour, minute, 0, 0, is_gmt);
274 goto end;
275 }
276
277 /* Try `YYYY-MM-DD` format */
278 s_ret = sscanf(str, "%u-%u-%u%c", &year, &month, &day, &dummy);
279 if (s_ret == 3) {
280 ret = set_bound_ns_from_origin(bound, year, month, day,
281 0, 0, 0, 0, is_gmt);
282 goto end;
283 }
284
285 /* Try `hh:mm:ss.ns` format */
286 s_ret = sscanf(str, "%u:%u:%u.%u%c", &hour, &minute, &second, &ns,
287 &dummy);
288 if (s_ret == 4) {
289 bound->time.hour = hour;
290 bound->time.minute = minute;
291 bound->time.second = second;
292 bound->time.ns = ns;
293 goto end;
294 }
295
296 /* Try `hh:mm:ss` format */
297 s_ret = sscanf(str, "%u:%u:%u%c", &hour, &minute, &second, &dummy);
298 if (s_ret == 3) {
299 bound->time.hour = hour;
300 bound->time.minute = minute;
301 bound->time.second = second;
302 bound->time.ns = 0;
303 goto end;
304 }
305
306 /* Try `-s.ns` format */
307 s_ret = sscanf(str, "-%u.%u%c", &second, &ns, &dummy);
308 if (s_ret == 2) {
309 bound->ns_from_origin = -((int64_t) second) * NS_PER_S;
310 bound->ns_from_origin -= (int64_t) ns;
311 bound->is_set = true;
312 goto end;
313 }
314
315 /* Try `s.ns` format */
316 s_ret = sscanf(str, "%u.%u%c", &second, &ns, &dummy);
317 if (s_ret == 2) {
318 bound->ns_from_origin = ((int64_t) second) * NS_PER_S;
319 bound->ns_from_origin += (int64_t) ns;
320 bound->is_set = true;
321 goto end;
322 }
323
324 /* Try `-s` format */
325 s_ret = sscanf(str, "-%u%c", &second, &dummy);
326 if (s_ret == 1) {
327 bound->ns_from_origin = -((int64_t) second) * NS_PER_S;
328 bound->is_set = true;
329 goto end;
330 }
331
332 /* Try `s` format */
333 s_ret = sscanf(str, "%u%c", &second, &dummy);
334 if (s_ret == 1) {
335 bound->ns_from_origin = (int64_t) second * NS_PER_S;
336 bound->is_set = true;
337 goto end;
338 }
339
340 BT_LOGE("Invalid date/time format: param=\"%s\"", str);
341 ret = -1;
342
343end:
344 return ret;
345}
346
347/*
348 * Sets a trimmer bound's properties from a parameter string/integer
349 * value.
350 *
351 * Returns a negative value if anything goes wrong.
528debdf 352 */
44d3cbf0 353static
7de0e49a
PP
354int set_bound_from_param(const char *param_name, const bt_value *param,
355 struct trimmer_bound *bound, bool is_gmt)
528debdf
MD
356{
357 int ret;
268fae9a 358 const char *arg;
7de0e49a 359 char tmp_arg[64];
268fae9a
PP
360
361 if (bt_value_is_integer(param)) {
7de0e49a
PP
362 int64_t value = bt_value_integer_get(param);
363
364 /*
365 * Just convert it to a temporary string to handle
366 * everything the same way.
367 */
368 sprintf(tmp_arg, "%" PRId64, value);
369 arg = tmp_arg;
370 } else if (bt_value_is_string(param)) {
371 arg = bt_value_string_get(param);
372 } else {
268fae9a
PP
373 BT_LOGE("`%s` parameter must be an integer or a string value.",
374 param_name);
7de0e49a
PP
375 ret = -1;
376 goto end;
268fae9a
PP
377 }
378
7de0e49a
PP
379 ret = set_bound_from_str(arg, bound, is_gmt);
380
381end:
382 return ret;
383}
384
385static
386int validate_trimmer_bounds(struct trimmer_bound *begin,
387 struct trimmer_bound *end)
388{
389 int ret = 0;
390
391 BT_ASSERT(begin->is_set);
392 BT_ASSERT(end->is_set);
393
394 if (!begin->is_infinite && !end->is_infinite &&
395 begin->ns_from_origin > end->ns_from_origin) {
396 BT_LOGE("Trimming time range's beginning time is greater than end time: "
397 "begin-ns-from-origin=%" PRId64 ", "
398 "end-ns-from-origin=%" PRId64,
399 begin->ns_from_origin,
400 end->ns_from_origin);
401 ret = -1;
402 goto end;
528debdf
MD
403 }
404
7de0e49a
PP
405 if (!begin->is_infinite && begin->ns_from_origin == INT64_MIN) {
406 BT_LOGE("Invalid trimming time range's beginning time: "
407 "ns-from-origin=%" PRId64,
408 begin->ns_from_origin);
409 ret = -1;
410 goto end;
411 }
528debdf 412
7de0e49a
PP
413 if (!end->is_infinite && end->ns_from_origin == INT64_MIN) {
414 BT_LOGE("Invalid trimming time range's end time: "
415 "ns-from-origin=%" PRId64,
416 end->ns_from_origin);
417 ret = -1;
418 goto end;
419 }
528debdf 420
7de0e49a
PP
421end:
422 return ret;
528debdf
MD
423}
424
425static
7de0e49a
PP
426int init_trimmer_comp_from_params(struct trimmer_comp *trimmer_comp,
427 const bt_value *params)
44d3cbf0 428{
7de0e49a
PP
429 const bt_value *value;
430 int ret = 0;
44d3cbf0 431
f6ccaed9 432 BT_ASSERT(params);
7de0e49a 433 value = bt_value_map_borrow_entry_value_const(params, "gmt");
528debdf 434 if (value) {
7de0e49a 435 trimmer_comp->is_gmt = (bool) bt_value_bool_get(value);
528debdf
MD
436 }
437
7de0e49a 438 value = bt_value_map_borrow_entry_value_const(params, "begin");
528debdf 439 if (value) {
7de0e49a
PP
440 if (set_bound_from_param("begin", value,
441 &trimmer_comp->begin, trimmer_comp->is_gmt)) {
442 /* set_bound_from_param() logs errors */
443 ret = BT_SELF_COMPONENT_STATUS_ERROR;
268fae9a 444 goto end;
44d3cbf0 445 }
7de0e49a
PP
446 } else {
447 trimmer_comp->begin.is_infinite = true;
448 trimmer_comp->begin.is_set = true;
44d3cbf0 449 }
528debdf 450
7de0e49a 451 value = bt_value_map_borrow_entry_value_const(params, "end");
528debdf 452 if (value) {
7de0e49a
PP
453 if (set_bound_from_param("end", value,
454 &trimmer_comp->end, trimmer_comp->is_gmt)) {
455 /* set_bound_from_param() logs errors */
456 ret = BT_SELF_COMPONENT_STATUS_ERROR;
268fae9a 457 goto end;
528debdf 458 }
7de0e49a
PP
459 } else {
460 trimmer_comp->end.is_infinite = true;
461 trimmer_comp->end.is_set = true;
528debdf 462 }
268fae9a 463
528debdf 464end:
7de0e49a
PP
465 if (trimmer_comp->begin.is_set && trimmer_comp->end.is_set) {
466 /* validate_trimmer_bounds() logs errors */
467 ret = validate_trimmer_bounds(&trimmer_comp->begin,
468 &trimmer_comp->end);
55595636 469 }
7de0e49a 470
528debdf 471 return ret;
44d3cbf0
JG
472}
473
7de0e49a
PP
474bt_self_component_status trimmer_init(bt_self_component_filter *self_comp,
475 const bt_value *params, void *init_data)
cab3f160 476{
7de0e49a
PP
477 int ret;
478 bt_self_component_status status;
479 struct trimmer_comp *trimmer_comp = create_trimmer_comp();
cab3f160 480
7de0e49a
PP
481 if (!trimmer_comp) {
482 ret = BT_SELF_COMPONENT_STATUS_NOMEM;
483 goto error;
cab3f160
JG
484 }
485
7de0e49a
PP
486 status = bt_self_component_filter_add_input_port(
487 self_comp, in_port_name, NULL, NULL);
488 if (status != BT_SELF_COMPONENT_STATUS_OK) {
b9d103be
PP
489 goto error;
490 }
491
7de0e49a
PP
492 status = bt_self_component_filter_add_output_port(
493 self_comp, "out", NULL, NULL);
494 if (status != BT_SELF_COMPONENT_STATUS_OK) {
b9d103be
PP
495 goto error;
496 }
497
7de0e49a
PP
498 ret = init_trimmer_comp_from_params(trimmer_comp, params);
499 if (ret) {
cab3f160
JG
500 goto error;
501 }
502
7de0e49a
PP
503 bt_self_component_set_data(
504 bt_self_component_filter_as_self_component(self_comp),
505 trimmer_comp);
506 goto end;
507
508error:
509 if (status == BT_SELF_COMPONENT_STATUS_OK) {
510 status = BT_SELF_COMPONENT_STATUS_ERROR;
511 }
512
513 if (trimmer_comp) {
514 destroy_trimmer_comp(trimmer_comp);
515 }
516
cab3f160
JG
517end:
518 return ret;
7de0e49a
PP
519}
520
521static
522void destroy_trimmer_iterator(struct trimmer_iterator *trimmer_it)
523{
524 BT_ASSERT(trimmer_it);
525 bt_self_component_port_input_message_iterator_put_ref(
526 trimmer_it->upstream_iter);
527
528 if (trimmer_it->output_messages) {
529 g_queue_free(trimmer_it->output_messages);
530 }
531
532 if (trimmer_it->stream_states) {
533 g_hash_table_destroy(trimmer_it->stream_states);
534 }
535
536 g_free(trimmer_it);
537}
538
539static
540void destroy_trimmer_iterator_stream_state(
541 struct trimmer_iterator_stream_state *sstate)
542{
543 BT_ASSERT(sstate);
544 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
545 BT_MESSAGE_PUT_REF_AND_RESET(sstate->stream_beginning_msg);
546 g_free(sstate);
547}
548
549BT_HIDDEN
550bt_self_message_iterator_status trimmer_msg_iter_init(
551 bt_self_message_iterator *self_msg_iter,
552 bt_self_component_filter *self_comp,
553 bt_self_component_port_output *port)
554{
555 bt_self_message_iterator_status status =
556 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
557 struct trimmer_iterator *trimmer_it;
558
559 trimmer_it = g_new0(struct trimmer_iterator, 1);
560 if (!trimmer_it) {
561 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
562 goto end;
563 }
564
565 trimmer_it->trimmer_comp = bt_self_component_get_data(
566 bt_self_component_filter_as_self_component(self_comp));
567 BT_ASSERT(trimmer_it->trimmer_comp);
568
569 if (trimmer_it->trimmer_comp->begin.is_set &&
570 trimmer_it->trimmer_comp->end.is_set) {
571 /*
572 * Both trimming time range's bounds are set, so skip
573 * the
574 * `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
575 * phase.
576 */
577 trimmer_it->state = TRIMMER_ITERATOR_STATE_SEEK_INITIALLY;
578 }
579
580 trimmer_it->begin = trimmer_it->trimmer_comp->begin;
581 trimmer_it->end = trimmer_it->trimmer_comp->end;
582 trimmer_it->upstream_iter =
583 bt_self_component_port_input_message_iterator_create(
584 bt_self_component_filter_borrow_input_port_by_name(
585 self_comp, in_port_name));
586 if (!trimmer_it->upstream_iter) {
587 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
588 goto end;
589 }
590
591 trimmer_it->output_messages = g_queue_new();
592 if (!trimmer_it->output_messages) {
593 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
594 goto end;
595 }
596
597 trimmer_it->stream_states = g_hash_table_new_full(g_direct_hash,
598 g_direct_equal, NULL,
599 (GDestroyNotify) destroy_trimmer_iterator_stream_state);
600 if (!trimmer_it->stream_states) {
601 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
602 goto end;
603 }
604
605 trimmer_it->self_msg_iter = self_msg_iter;
606 bt_self_message_iterator_set_data(self_msg_iter, trimmer_it);
607
608end:
609 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK && trimmer_it) {
610 destroy_trimmer_iterator(trimmer_it);
611 }
612
613 return status;
614}
615
616static inline
617int get_msg_ns_from_origin(const bt_message *msg, int64_t *ns_from_origin,
618 bool *skip)
619{
620 const bt_clock_class *clock_class = NULL;
621 const bt_clock_snapshot *clock_snapshot = NULL;
622 bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
623 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
624 int ret = 0;
625
626 BT_ASSERT(msg);
627 BT_ASSERT(ns_from_origin);
628 BT_ASSERT(skip);
629
630 switch (bt_message_get_type(msg)) {
631 case BT_MESSAGE_TYPE_EVENT:
632 clock_class =
633 bt_message_event_borrow_stream_class_default_clock_class_const(
634 msg);
635 if (unlikely(!clock_class)) {
636 goto error;
637 }
638
639 cs_state = bt_message_event_borrow_default_clock_snapshot_const(
640 msg, &clock_snapshot);
641 break;
642 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
643 clock_class =
644 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
645 msg);
646 if (unlikely(!clock_class)) {
647 goto error;
648 }
649
650 cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
651 msg, &clock_snapshot);
652 break;
653 case BT_MESSAGE_TYPE_PACKET_END:
654 clock_class =
655 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
656 msg);
657 if (unlikely(!clock_class)) {
658 goto error;
659 }
660
661 cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const(
662 msg, &clock_snapshot);
663 break;
664 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
665 clock_class =
666 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
667 msg);
668 if (unlikely(!clock_class)) {
669 goto error;
670 }
671
672 cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
673 msg, &clock_snapshot);
674 break;
675 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
676 clock_class =
677 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
678 msg);
679 if (unlikely(!clock_class)) {
680 goto error;
681 }
682
683 cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
684 msg, &clock_snapshot);
685 break;
686 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
687 clock_class =
688 bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
689 msg);
690 if (unlikely(!clock_class)) {
691 goto error;
692 }
693
694 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
695 msg, &clock_snapshot);
696 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN ||
697 sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
698 /* Lowest possible time to always include them */
699 *ns_from_origin = INT64_MIN;
700 goto no_clock_snapshot;
701 }
702
703 break;
704 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
705 clock_class =
706 bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
707 msg);
708 if (unlikely(!clock_class)) {
709 goto error;
710 }
711
712 sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
713 msg, &clock_snapshot);
714 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
715 /* Lowest time to always include it */
716 *ns_from_origin = INT64_MIN;
717 goto no_clock_snapshot;
718 } else if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
719 /* Greatest time to always exclude it */
720 *ns_from_origin = INT64_MAX;
721 goto no_clock_snapshot;
722 }
723
724 break;
725 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
726 cs_state =
727 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
728 msg, &clock_snapshot);
729 break;
730 default:
731 goto no_clock_snapshot;
732 }
733
734 if (unlikely(cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN)) {
735 BT_LOGE_STR("Unsupported unknown clock snapshot.");
736 ret = -1;
737 goto end;
738 }
739
740 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot,
741 ns_from_origin);
742 if (unlikely(ret)) {
743 goto error;
744 }
745
746 goto end;
747
748no_clock_snapshot:
749 *skip = true;
750 goto end;
751
cab3f160 752error:
7de0e49a
PP
753 ret = -1;
754
755end:
756 return ret;
757}
758
759static inline
760void put_messages(bt_message_array_const msgs, uint64_t count)
761{
762 uint64_t i;
763
764 for (i = 0; i < count; i++) {
765 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
766 }
767}
768
769static inline
770int set_trimmer_iterator_bound(struct trimmer_bound *bound,
771 int64_t ns_from_origin, bool is_gmt)
772{
773 struct tm tm;
774 time_t time_seconds = (time_t) (ns_from_origin / NS_PER_S);
775 int ret = 0;
776
777 BT_ASSERT(!bound->is_set);
778 errno = 0;
779
780 /* We only need to extract the date from this time */
781 if (is_gmt) {
782 bt_gmtime_r(&time_seconds, &tm);
783 } else {
784 bt_localtime_r(&time_seconds, &tm);
785 }
786
787 if (errno) {
788 BT_LOGE_ERRNO("Cannot convert timestamp to date and time",
789 "ts=%" PRId64, (int64_t) time_seconds);
790 ret = -1;
791 goto end;
792 }
793
794 ret = set_bound_ns_from_origin(bound, tm.tm_year + 1900, tm.tm_mon + 1,
795 tm.tm_mday, bound->time.hour, bound->time.minute,
796 bound->time.second, bound->time.ns, is_gmt);
797
798end:
cab3f160
JG
799 return ret;
800}
7de0e49a
PP
801
802static
803bt_self_message_iterator_status state_set_trimmer_iterator_bounds(
804 struct trimmer_iterator *trimmer_it)
805{
806 bt_message_iterator_status upstream_iter_status =
807 BT_MESSAGE_ITERATOR_STATUS_OK;
808 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
809 bt_message_array_const msgs;
810 uint64_t count = 0;
811 int64_t ns_from_origin = INT64_MIN;
812 uint64_t i;
813 int ret;
814
815 BT_ASSERT(!trimmer_it->begin.is_set ||
816 !trimmer_it->end.is_set);
817
818 while (true) {
819 upstream_iter_status =
820 bt_self_component_port_input_message_iterator_next(
821 trimmer_it->upstream_iter, &msgs, &count);
822 if (upstream_iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) {
823 goto end;
824 }
825
826 for (i = 0; i < count; i++) {
827 const bt_message *msg = msgs[i];
828 bool skip = false;
829 int ret;
830
831 ret = get_msg_ns_from_origin(msg, &ns_from_origin,
832 &skip);
833 if (ret) {
834 goto error;
835 }
836
837 if (skip) {
838 continue;
839 }
840
841 BT_ASSERT(ns_from_origin != INT64_MIN &&
842 ns_from_origin != INT64_MAX);
843 put_messages(msgs, count);
844 goto found;
845 }
846
847 put_messages(msgs, count);
848 }
849
850found:
851 if (!trimmer_it->begin.is_set) {
852 BT_ASSERT(!trimmer_it->begin.is_infinite);
853 ret = set_trimmer_iterator_bound(&trimmer_it->begin,
854 ns_from_origin, trimmer_comp->is_gmt);
855 if (ret) {
856 goto error;
857 }
858 }
859
860 if (!trimmer_it->end.is_set) {
861 BT_ASSERT(!trimmer_it->end.is_infinite);
862 ret = set_trimmer_iterator_bound(&trimmer_it->end,
863 ns_from_origin, trimmer_comp->is_gmt);
864 if (ret) {
865 goto error;
866 }
867 }
868
869 ret = validate_trimmer_bounds(&trimmer_it->begin,
870 &trimmer_it->end);
871 if (ret) {
872 goto error;
873 }
874
875 goto end;
876
877error:
878 put_messages(msgs, count);
879 upstream_iter_status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
880
881end:
882 return (int) upstream_iter_status;
883}
884
885static
886bt_self_message_iterator_status state_seek_initially(
887 struct trimmer_iterator *trimmer_it)
888{
889 bt_self_message_iterator_status status =
890 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
891
892 BT_ASSERT(trimmer_it->begin.is_set);
893
894 if (trimmer_it->begin.is_infinite) {
895 if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
896 trimmer_it->upstream_iter)) {
897 BT_LOGE_STR("Cannot make upstream message iterator initially seek its beginning.");
898 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
899 goto end;
900 }
901
902 status = (int) bt_self_component_port_input_message_iterator_seek_beginning(
903 trimmer_it->upstream_iter);
904 } else {
905 if (!bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
906 trimmer_it->upstream_iter,
907 trimmer_it->begin.ns_from_origin)) {
908 BT_LOGE("Cannot make upstream message iterator initially seek: "
909 "seek-ns-from-origin=%" PRId64,
910 trimmer_it->begin.ns_from_origin);
911 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
912 goto end;
913 }
914
915 status = (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
916 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin);
917 }
918
919 if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
920 trimmer_it->state = TRIMMER_ITERATOR_STATE_TRIM;
921 }
922
923end:
924 return status;
925}
926
927static inline
928void push_message(struct trimmer_iterator *trimmer_it, const bt_message *msg)
929{
930 g_queue_push_head(trimmer_it->output_messages, (void *) msg);
931}
932
933static inline
934const bt_message *pop_message(struct trimmer_iterator *trimmer_it)
935{
936 return g_queue_pop_tail(trimmer_it->output_messages);
937}
938
939static inline
940int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
941 int64_t ns_from_origin, uint64_t *raw_value)
942{
943
944 int64_t cc_offset_s;
945 uint64_t cc_offset_cycles;
946 uint64_t cc_freq;
947
948 bt_clock_class_get_offset(clock_class, &cc_offset_s, &cc_offset_cycles);
949 cc_freq = bt_clock_class_get_frequency(clock_class);
950 return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
951 cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
952}
953
954static inline
955bt_self_message_iterator_status end_stream(struct trimmer_iterator *trimmer_it,
956 struct trimmer_iterator_stream_state *sstate)
957{
958 bt_self_message_iterator_status status =
959 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
960 uint64_t raw_value;
961 const bt_clock_class *clock_class;
962 int ret;
963 bt_message *msg = NULL;
964
965 BT_ASSERT(!trimmer_it->end.is_infinite);
966
967 if (!sstate->stream) {
968 goto end;
969 }
970
971 if (sstate->cur_packet) {
972 /*
973 * The last message could not have been a stream
974 * activity end message if we have a current packet.
975 */
976 BT_ASSERT(!sstate->last_msg_is_stream_activity_end);
977
978 /*
979 * Create and push a packet end message, making its time
980 * the trimming range's end time.
981 */
982 clock_class = bt_stream_class_borrow_default_clock_class_const(
983 bt_stream_borrow_class_const(sstate->stream));
984 BT_ASSERT(clock_class);
985 ret = clock_raw_value_from_ns_from_origin(clock_class,
986 trimmer_it->end.ns_from_origin, &raw_value);
987 if (ret) {
988 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
989 goto end;
990 }
991
992 msg = bt_message_packet_end_create_with_default_clock_snapshot(
993 trimmer_it->self_msg_iter, sstate->cur_packet,
994 raw_value);
995 if (!msg) {
996 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
997 goto end;
998 }
999
1000 push_message(trimmer_it, msg);
1001 msg = NULL;
1002 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1003
1004 /*
1005 * Because we generated a packet end message, set the
1006 * stream activity end message's time to use to the
1007 * trimming range's end time (this packet end message's
1008 * time).
1009 */
1010 sstate->stream_act_end_ns_from_origin =
1011 trimmer_it->end.ns_from_origin;
1012 }
1013
1014 if (!sstate->last_msg_is_stream_activity_end) {
1015 /* Create and push a stream activity end message */
1016 msg = bt_message_stream_activity_end_create(
1017 trimmer_it->self_msg_iter, sstate->stream);
1018 if (!msg) {
1019 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1020 goto end;
1021 }
1022
1023 clock_class = bt_stream_class_borrow_default_clock_class_const(
1024 bt_stream_borrow_class_const(sstate->stream));
1025 BT_ASSERT(clock_class);
1026 BT_ASSERT(sstate->stream_act_end_ns_from_origin != INT64_MIN);
1027 ret = clock_raw_value_from_ns_from_origin(clock_class,
1028 sstate->stream_act_end_ns_from_origin, &raw_value);
1029 if (ret) {
1030 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1031 goto end;
1032 }
1033
1034 bt_message_stream_activity_end_set_default_clock_snapshot(
1035 msg, raw_value);
1036 push_message(trimmer_it, msg);
1037 msg = NULL;
1038 }
1039
1040 /* Create and push a stream end message */
1041 msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
1042 sstate->stream);
1043 if (!msg) {
1044 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1045 goto end;
1046 }
1047
1048 push_message(trimmer_it, msg);
1049 msg = NULL;
1050
1051 /*
1052 * Just to make sure that we don't use this stream state again
1053 * in the future without an obvious error.
1054 */
1055 sstate->stream = NULL;
1056
1057end:
1058 bt_message_put_ref(msg);
1059 return status;
1060}
1061
1062static inline
1063bt_self_message_iterator_status end_iterator_streams(
1064 struct trimmer_iterator *trimmer_it)
1065{
1066 bt_self_message_iterator_status status =
1067 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1068 GHashTableIter iter;
1069 gpointer key, sstate;
1070
1071 if (trimmer_it->end.is_infinite) {
1072 /*
1073 * An infinite trimming range's end time guarantees that
1074 * we received (and pushed) all the appropriate end
1075 * messages.
1076 */
1077 goto remove_all;
1078 }
1079
1080 /*
1081 * End each stream and then remove them from the hash table of
1082 * stream states to release unneeded references.
1083 */
1084 g_hash_table_iter_init(&iter, trimmer_it->stream_states);
1085
1086 while (g_hash_table_iter_next(&iter, &key, &sstate)) {
1087 status = end_stream(trimmer_it, sstate);
1088 if (status) {
1089 goto end;
1090 }
1091 }
1092
1093remove_all:
1094 g_hash_table_remove_all(trimmer_it->stream_states);
1095
1096end:
1097 return status;
1098}
1099
1100static inline
1101bt_self_message_iterator_status create_stream_beginning_activity_message(
1102 struct trimmer_iterator *trimmer_it,
1103 const bt_stream *stream,
1104 const bt_clock_class *clock_class, bt_message **msg)
1105{
1106 bt_self_message_iterator_status status =
1107 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1108
1109 BT_ASSERT(msg);
1110 BT_ASSERT(!trimmer_it->begin.is_infinite);
1111
1112 *msg = bt_message_stream_activity_beginning_create(
1113 trimmer_it->self_msg_iter, stream);
1114 if (!*msg) {
1115 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1116 goto end;
1117 }
1118
1119 if (clock_class) {
1120 int ret;
1121 uint64_t raw_value;
1122
1123 ret = clock_raw_value_from_ns_from_origin(clock_class,
1124 trimmer_it->begin.ns_from_origin, &raw_value);
1125 if (ret) {
1126 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1127 bt_message_put_ref(*msg);
1128 goto end;
1129 }
1130
1131 bt_message_stream_activity_beginning_set_default_clock_snapshot(
1132 *msg, raw_value);
1133 }
1134
1135end:
1136 return status;
1137}
1138
1139/*
1140 * Makes sure to initialize a stream state, pushing the appropriate
1141 * initial messages.
1142 *
1143 * `stream_act_beginning_msg` is an initial stream activity beginning
1144 * message to potentially use, depending on its clock snapshot state.
1145 * This function consumes `stream_act_beginning_msg` unconditionally.
1146 */
1147static inline
1148bt_self_message_iterator_status ensure_stream_state_is_inited(
1149 struct trimmer_iterator *trimmer_it,
1150 struct trimmer_iterator_stream_state *sstate,
1151 const bt_message *stream_act_beginning_msg)
1152{
1153 bt_self_message_iterator_status status =
1154 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1155 bt_message *new_msg = NULL;
1156 const bt_clock_class *clock_class =
1157 bt_stream_class_borrow_default_clock_class_const(
1158 bt_stream_borrow_class_const(sstate->stream));
1159
1160 BT_ASSERT(!sstate->inited);
1161
1162 if (!sstate->stream_beginning_msg) {
1163 /* No initial stream beginning message: create one */
1164 sstate->stream_beginning_msg =
1165 bt_message_stream_beginning_create(
1166 trimmer_it->self_msg_iter, sstate->stream);
1167 if (!sstate->stream_beginning_msg) {
1168 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1169 goto end;
1170 }
1171 }
1172
1173 /* Push initial stream beginning message */
1174 BT_ASSERT(sstate->stream_beginning_msg);
1175 push_message(trimmer_it, sstate->stream_beginning_msg);
1176 sstate->stream_beginning_msg = NULL;
1177
1178 if (stream_act_beginning_msg) {
1179 /*
1180 * Initial stream activity beginning message exists: if
1181 * its time is -inf, then create and push a new one
1182 * having the trimming range's beginning time. Otherwise
1183 * push it as is (known and unknown).
1184 */
1185 const bt_clock_snapshot *cs;
1186 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
1187
1188 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
1189 stream_act_beginning_msg, &cs);
1190 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE &&
1191 !trimmer_it->begin.is_infinite) {
1192 /*
1193 * -inf time: use trimming range's beginning
1194 * time (which is not -inf).
1195 */
1196 status = create_stream_beginning_activity_message(
1197 trimmer_it, sstate->stream, clock_class,
1198 &new_msg);
1199 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1200 goto end;
1201 }
1202
1203 push_message(trimmer_it, new_msg);
1204 new_msg = NULL;
1205 } else {
1206 /* Known/unknown: push as is */
1207 push_message(trimmer_it, stream_act_beginning_msg);
1208 stream_act_beginning_msg = NULL;
1209 }
1210 } else {
1211 BT_ASSERT(!trimmer_it->begin.is_infinite);
1212
1213 /*
1214 * No stream beginning activity message: create and push
1215 * a new message.
1216 */
1217 status = create_stream_beginning_activity_message(
1218 trimmer_it, sstate->stream, clock_class, &new_msg);
1219 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1220 goto end;
1221 }
1222
1223 push_message(trimmer_it, new_msg);
1224 new_msg = NULL;
1225 }
1226
1227 sstate->inited = true;
1228
1229end:
1230 bt_message_put_ref(new_msg);
1231 bt_message_put_ref(stream_act_beginning_msg);
1232 return status;
1233}
1234
1235static inline
1236bt_self_message_iterator_status ensure_cur_packet_exists(
1237 struct trimmer_iterator *trimmer_it,
1238 struct trimmer_iterator_stream_state *sstate,
1239 const bt_packet *packet)
1240{
1241 bt_self_message_iterator_status status =
1242 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1243 int ret;
1244 const bt_clock_class *clock_class =
1245 bt_stream_class_borrow_default_clock_class_const(
1246 bt_stream_borrow_class_const(sstate->stream));
1247 bt_message *msg = NULL;
1248 uint64_t raw_value;
1249
1250 BT_ASSERT(!trimmer_it->begin.is_infinite);
1251 BT_ASSERT(!sstate->cur_packet);
1252
1253 /*
1254 * Create and push an initial packet beginning message,
1255 * making its time the trimming range's beginning time.
1256 */
1257 ret = clock_raw_value_from_ns_from_origin(clock_class,
1258 trimmer_it->begin.ns_from_origin, &raw_value);
1259 if (ret) {
1260 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1261 goto end;
1262 }
1263
1264 msg = bt_message_packet_beginning_create_with_default_clock_snapshot(
1265 trimmer_it->self_msg_iter, packet, raw_value);
1266 if (!msg) {
1267 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1268 goto end;
1269 }
1270
1271 push_message(trimmer_it, msg);
1272 msg = NULL;
1273
1274 /* Set packet as this stream's current packet */
1275 sstate->cur_packet = packet;
1276 bt_packet_get_ref(sstate->cur_packet);
1277
1278end:
1279 bt_message_put_ref(msg);
1280 return status;
1281}
1282
1283/*
1284 * Handles a message which is associated to a given stream state. This
1285 * _could_ make the iterator's output message queue grow; this could
1286 * also consume the message without pushing anything to this queue, only
1287 * modifying the stream state.
1288 *
1289 * This function consumes the `msg` reference, _whatever the outcome_.
1290 *
1291 * `ns_from_origin` is the message's time, as given by
1292 * get_msg_ns_from_origin().
1293 *
1294 * This function sets `reached_end` if handling this message made the
1295 * iterator reach the end of the trimming range. Note that the output
1296 * message queue could contain messages even if this function sets
1297 * `reached_end`.
1298 */
1299static inline
1300bt_self_message_iterator_status handle_message_with_stream_state(
1301 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1302 struct trimmer_iterator_stream_state *sstate,
1303 int64_t ns_from_origin, bool *reached_end)
1304{
1305 bt_self_message_iterator_status status =
1306 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1307 bt_message_type msg_type = bt_message_get_type(msg);
1308 int ret;
1309
1310 switch (msg_type) {
1311 case BT_MESSAGE_TYPE_EVENT:
1312 if (unlikely(!trimmer_it->end.is_infinite &&
1313 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1314 status = end_iterator_streams(trimmer_it);
1315 *reached_end = true;
1316 break;
1317 }
1318
1319 if (unlikely(!sstate->inited)) {
1320 status = ensure_stream_state_is_inited(trimmer_it,
1321 sstate, NULL);
1322 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1323 goto end;
1324 }
1325 }
1326
1327 if (unlikely(!sstate->cur_packet)) {
1328 const bt_event *event =
1329 bt_message_event_borrow_event_const(msg);
1330 const bt_packet *packet = bt_event_borrow_packet_const(
1331 event);
1332
1333 status = ensure_cur_packet_exists(trimmer_it, sstate,
1334 packet);
1335 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1336 goto end;
1337 }
1338 }
1339
1340 BT_ASSERT(sstate->cur_packet);
1341 push_message(trimmer_it, msg);
1342 msg = NULL;
1343 break;
1344 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1345 if (unlikely(!trimmer_it->end.is_infinite &&
1346 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1347 status = end_iterator_streams(trimmer_it);
1348 *reached_end = true;
1349 break;
1350 }
1351
1352 if (unlikely(!sstate->inited)) {
1353 status = ensure_stream_state_is_inited(trimmer_it,
1354 sstate, NULL);
1355 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1356 goto end;
1357 }
1358 }
1359
1360 BT_ASSERT(!sstate->cur_packet);
1361 sstate->cur_packet =
1362 bt_message_packet_beginning_borrow_packet_const(msg);
1363 bt_packet_get_ref(sstate->cur_packet);
1364 push_message(trimmer_it, msg);
1365 msg = NULL;
1366 break;
1367 case BT_MESSAGE_TYPE_PACKET_END:
1368 sstate->stream_act_end_ns_from_origin = ns_from_origin;
1369
1370 if (unlikely(!trimmer_it->end.is_infinite &&
1371 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1372 status = end_iterator_streams(trimmer_it);
1373 *reached_end = true;
1374 break;
1375 }
1376
1377 if (unlikely(!sstate->inited)) {
1378 status = ensure_stream_state_is_inited(trimmer_it,
1379 sstate, NULL);
1380 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1381 goto end;
1382 }
1383 }
1384
1385 if (unlikely(!sstate->cur_packet)) {
1386 const bt_packet *packet =
1387 bt_message_packet_end_borrow_packet_const(msg);
1388
1389 status = ensure_cur_packet_exists(trimmer_it, sstate,
1390 packet);
1391 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1392 goto end;
1393 }
1394 }
1395
1396 BT_ASSERT(sstate->cur_packet);
1397 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1398 push_message(trimmer_it, msg);
1399 msg = NULL;
1400 break;
1401 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1402 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1403 {
1404 /*
1405 * `ns_from_origin` is the message's time range's
1406 * beginning time here.
1407 */
1408 int64_t end_ns_from_origin;
1409 const bt_clock_snapshot *end_cs;
1410
1411 if (bt_message_get_type(msg) ==
1412 BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1413 /*
1414 * Safe to ignore the return value because we
1415 * know there's a default clock and it's always
1416 * known.
1417 */
1418 (void) bt_message_discarded_events_borrow_default_end_clock_snapshot_const(
1419 msg, &end_cs);
1420 } else {
1421 /*
1422 * Safe to ignore the return value because we
1423 * know there's a default clock and it's always
1424 * known.
1425 */
1426 (void) bt_message_discarded_packets_borrow_default_end_clock_snapshot_const(
1427 msg, &end_cs);
1428 }
1429
1430 if (bt_clock_snapshot_get_ns_from_origin(end_cs,
1431 &end_ns_from_origin)) {
1432 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1433 goto end;
1434 }
1435
1436 sstate->stream_act_end_ns_from_origin = end_ns_from_origin;
1437
1438 if (!trimmer_it->end.is_infinite &&
1439 ns_from_origin > trimmer_it->end.ns_from_origin) {
1440 status = end_iterator_streams(trimmer_it);
1441 *reached_end = true;
1442 break;
1443 }
1444
1445 if (!trimmer_it->end.is_infinite &&
1446 end_ns_from_origin > trimmer_it->end.ns_from_origin) {
1447 /*
1448 * This message's end time is outside the
1449 * trimming time range: replace it with a new
1450 * message having an end time equal to the
1451 * trimming time range's end and without a
1452 * count.
1453 */
1454 const bt_clock_class *clock_class =
1455 bt_clock_snapshot_borrow_clock_class_const(
1456 end_cs);
1457 const bt_clock_snapshot *begin_cs;
1458 bt_message *new_msg;
1459 uint64_t end_raw_value;
1460
1461 ret = clock_raw_value_from_ns_from_origin(clock_class,
1462 trimmer_it->end.ns_from_origin, &end_raw_value);
1463 if (ret) {
1464 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1465 goto end;
1466 }
1467
1468 if (msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1469 (void) bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
1470 msg, &begin_cs);
1471 new_msg = bt_message_discarded_events_create_with_default_clock_snapshots(
1472 trimmer_it->self_msg_iter,
1473 sstate->stream,
1474 bt_clock_snapshot_get_value(begin_cs),
1475 end_raw_value);
1476 } else {
1477 (void) bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
1478 msg, &begin_cs);
1479 new_msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
1480 trimmer_it->self_msg_iter,
1481 sstate->stream,
1482 bt_clock_snapshot_get_value(begin_cs),
1483 end_raw_value);
1484 }
1485
1486 if (!new_msg) {
1487 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1488 goto end;
1489 }
1490
1491 /* Replace the original message */
1492 BT_MESSAGE_MOVE_REF(msg, new_msg);
1493 }
1494
1495 if (unlikely(!sstate->inited)) {
1496 status = ensure_stream_state_is_inited(trimmer_it,
1497 sstate, NULL);
1498 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1499 goto end;
1500 }
1501 }
1502
1503 push_message(trimmer_it, msg);
1504 msg = NULL;
1505 break;
1506 }
1507 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
1508 if (!trimmer_it->end.is_infinite &&
1509 ns_from_origin > trimmer_it->end.ns_from_origin) {
1510 /*
1511 * This only happens when the message's time is
1512 * known and is greater than the trimming
1513 * range's end time. Unknown and -inf times are
1514 * always less than
1515 * `trimmer_it->end.ns_from_origin`.
1516 */
1517 status = end_iterator_streams(trimmer_it);
1518 *reached_end = true;
1519 break;
1520 }
1521
1522 if (!sstate->inited) {
1523 status = ensure_stream_state_is_inited(trimmer_it,
1524 sstate, msg);
1525 msg = NULL;
1526 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1527 goto end;
1528 }
1529 } else {
1530 push_message(trimmer_it, msg);
1531 msg = NULL;
1532 }
1533
1534 break;
1535 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
1536 if (trimmer_it->end.is_infinite) {
1537 push_message(trimmer_it, msg);
1538 msg = NULL;
1539 break;
1540 }
1541
1542 if (ns_from_origin == INT64_MIN) {
1543 /* Unknown: push as is if stream state is inited */
1544 if (sstate->inited) {
1545 push_message(trimmer_it, msg);
1546 msg = NULL;
1547 sstate->last_msg_is_stream_activity_end = true;
1548 }
1549 } else if (ns_from_origin == INT64_MAX) {
1550 /* Infinite: use trimming range's end time */
1551 sstate->stream_act_end_ns_from_origin =
1552 trimmer_it->end.ns_from_origin;
1553 } else {
1554 /* Known: check if outside of trimming range */
1555 if (ns_from_origin > trimmer_it->end.ns_from_origin) {
1556 sstate->stream_act_end_ns_from_origin =
1557 trimmer_it->end.ns_from_origin;
1558 status = end_iterator_streams(trimmer_it);
1559 *reached_end = true;
1560 break;
1561 }
1562
1563 if (!sstate->inited) {
1564 /*
1565 * First message for this stream is a
1566 * stream activity end: we can't deduce
1567 * anything about the stream activity
1568 * beginning's time, and using this
1569 * message's time would make a useless
1570 * pair of stream activity beginning/end
1571 * with the same time. Just skip this
1572 * message and wait for something
1573 * useful.
1574 */
1575 break;
1576 }
1577
1578 push_message(trimmer_it, msg);
1579 msg = NULL;
1580 sstate->last_msg_is_stream_activity_end = true;
1581 sstate->stream_act_end_ns_from_origin = ns_from_origin;
1582 }
1583
1584 break;
1585 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1586 /*
1587 * We don't know what follows at this point, so just
1588 * keep this message until we know what to do with it
1589 * (it will be used in ensure_stream_state_is_inited()).
1590 */
1591 BT_ASSERT(!sstate->inited);
1592 BT_MESSAGE_MOVE_REF(sstate->stream_beginning_msg, msg);
1593 break;
1594 case BT_MESSAGE_TYPE_STREAM_END:
1595 if (sstate->inited) {
1596 /*
1597 * This is the end of an inited stream: end this
1598 * stream if its stream activity end message
1599 * time is not the trimming range's end time
1600 * (which means the final stream activity end
1601 * message had an infinite time). end_stream()
1602 * will generate its own stream end message.
1603 */
1604 if (trimmer_it->end.is_infinite) {
1605 push_message(trimmer_it, msg);
1606 msg = NULL;
1607 g_hash_table_remove(trimmer_it->stream_states,
1608 sstate->stream);
1609 } else if (sstate->stream_act_end_ns_from_origin <
1610 trimmer_it->end.ns_from_origin) {
1611 status = end_stream(trimmer_it, sstate);
1612 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1613 goto end;
1614 }
1615
1616 /* We won't need this stream state again */
1617 g_hash_table_remove(trimmer_it->stream_states,
1618 sstate->stream);
1619 }
1620 } else {
1621 /* We dont't need this stream state anymore */
1622 g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
1623 }
1624
1625 break;
1626 default:
1627 break;
1628 }
1629
1630end:
1631 /* We release the message's reference whatever the outcome */
1632 bt_message_put_ref(msg);
1633 return BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1634}
1635
1636/*
1637 * Handles an input message. This _could_ make the iterator's output
1638 * message queue grow; this could also consume the message without
1639 * pushing anything to this queue, only modifying the stream state.
1640 *
1641 * This function consumes the `msg` reference, _whatever the outcome_.
1642 *
1643 * This function sets `reached_end` if handling this message made the
1644 * iterator reach the end of the trimming range. Note that the output
1645 * message queue could contain messages even if this function sets
1646 * `reached_end`.
1647 */
1648static inline
1649bt_self_message_iterator_status handle_message(
1650 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1651 bool *reached_end)
1652{
1653 bt_self_message_iterator_status status;
1654 const bt_stream *stream = NULL;
1655 int64_t ns_from_origin = INT64_MIN;
1656 bool skip;
1657 int ret;
1658 struct trimmer_iterator_stream_state *sstate = NULL;
1659
1660 /* Find message's associated stream */
1661 switch (bt_message_get_type(msg)) {
1662 case BT_MESSAGE_TYPE_EVENT:
1663 stream = bt_event_borrow_stream_const(
1664 bt_message_event_borrow_event_const(msg));
1665 break;
1666 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1667 stream = bt_packet_borrow_stream_const(
1668 bt_message_packet_beginning_borrow_packet_const(msg));
1669 break;
1670 case BT_MESSAGE_TYPE_PACKET_END:
1671 stream = bt_packet_borrow_stream_const(
1672 bt_message_packet_end_borrow_packet_const(msg));
1673 break;
1674 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1675 stream = bt_message_discarded_events_borrow_stream_const(msg);
1676 break;
1677 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1678 stream = bt_message_discarded_packets_borrow_stream_const(msg);
1679 break;
1680 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
1681 stream = bt_message_stream_activity_beginning_borrow_stream_const(msg);
1682 break;
1683 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
1684 stream = bt_message_stream_activity_end_borrow_stream_const(msg);
1685 break;
1686 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1687 stream = bt_message_stream_beginning_borrow_stream_const(msg);
1688 break;
1689 case BT_MESSAGE_TYPE_STREAM_END:
1690 stream = bt_message_stream_end_borrow_stream_const(msg);
1691 break;
1692 default:
1693 break;
1694 }
1695
1696 if (likely(stream)) {
1697 /* Find stream state */
1698 sstate = g_hash_table_lookup(trimmer_it->stream_states,
1699 stream);
1700 if (unlikely(!sstate)) {
1701 /* No stream state yet: create one now */
1702 const bt_stream_class *sc;
1703
1704 /*
1705 * Validate right now that the stream's class
1706 * has a registered default clock class so that
1707 * an existing stream state guarantees existing
1708 * default clock snapshots for its associated
1709 * messages.
1710 *
1711 * Also check that clock snapshots are always
1712 * known.
1713 */
1714 sc = bt_stream_borrow_class_const(stream);
1715 if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
1716 BT_LOGE("Unsupported stream: stream class does "
1717 "not have a default clock class: "
1718 "stream-addr=%p, "
1719 "stream-id=%" PRIu64 ", "
1720 "stream-name=\"%s\"",
1721 stream, bt_stream_get_id(stream),
1722 bt_stream_get_name(stream));
1723 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1724 goto end;
1725 }
1726
1727 if (!bt_stream_class_default_clock_is_always_known(sc)) {
1728 BT_LOGE("Unsupported stream: clock does not "
1729 "always have a known value: "
1730 "stream-addr=%p, "
1731 "stream-id=%" PRIu64 ", "
1732 "stream-name=\"%s\"",
1733 stream, bt_stream_get_id(stream),
1734 bt_stream_get_name(stream));
1735 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1736 goto end;
1737 }
1738
1739 sstate = g_new0(struct trimmer_iterator_stream_state,
1740 1);
1741 if (!sstate) {
1742 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1743 goto end;
1744 }
1745
1746 sstate->stream = stream;
1747 sstate->stream_act_end_ns_from_origin = INT64_MIN;
1748 g_hash_table_insert(trimmer_it->stream_states,
1749 (void *) stream, sstate);
1750 }
1751 }
1752
1753 /* Retrieve the message's time */
1754 ret = get_msg_ns_from_origin(msg, &ns_from_origin, &skip);
1755 if (unlikely(ret)) {
1756 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1757 goto end;
1758 }
1759
1760 if (likely(sstate)) {
1761 /* Message associated to a stream */
1762 status = handle_message_with_stream_state(trimmer_it, msg,
1763 sstate, ns_from_origin, reached_end);
1764
1765 /*
1766 * handle_message_with_stream_state() unconditionally
1767 * consumes `msg`.
1768 */
1769 msg = NULL;
1770 } else {
1771 /*
1772 * Message not associated to a stream (message iterator
1773 * inactivity).
1774 */
1775 if (unlikely(ns_from_origin > trimmer_it->end.ns_from_origin)) {
1776 BT_MESSAGE_PUT_REF_AND_RESET(msg);
1777 status = end_iterator_streams(trimmer_it);
1778 *reached_end = true;
1779 } else {
1780 push_message(trimmer_it, msg);
1781 msg = NULL;
1782 }
1783 }
1784
1785end:
1786 /* We release the message's reference whatever the outcome */
1787 bt_message_put_ref(msg);
1788 return status;
1789}
1790
1791static inline
1792void fill_message_array_from_output_messages(
1793 struct trimmer_iterator *trimmer_it,
1794 bt_message_array_const msgs, uint64_t capacity, uint64_t *count)
1795{
1796 *count = 0;
1797
1798 /*
1799 * Move auto-seek messages to the output array (which is this
1800 * iterator's base message array).
1801 */
1802 while (capacity > 0 && !g_queue_is_empty(trimmer_it->output_messages)) {
1803 msgs[*count] = pop_message(trimmer_it);
1804 capacity--;
1805 (*count)++;
1806 }
1807
1808 BT_ASSERT(*count > 0);
1809}
1810
1811static inline
1812bt_self_message_iterator_status state_ending(
1813 struct trimmer_iterator *trimmer_it,
1814 bt_message_array_const msgs, uint64_t capacity,
1815 uint64_t *count)
1816{
1817 bt_self_message_iterator_status status =
1818 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1819
1820 if (g_queue_is_empty(trimmer_it->output_messages)) {
1821 trimmer_it->state = TRIMMER_ITERATOR_STATE_ENDED;
1822 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1823 goto end;
1824 }
1825
1826 fill_message_array_from_output_messages(trimmer_it, msgs,
1827 capacity, count);
1828
1829end:
1830 return status;
1831}
1832
1833static inline
1834bt_self_message_iterator_status state_trim(struct trimmer_iterator *trimmer_it,
1835 bt_message_array_const msgs, uint64_t capacity,
1836 uint64_t *count)
1837{
1838 bt_self_message_iterator_status status =
1839 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1840 bt_message_array_const my_msgs;
1841 uint64_t my_count;
1842 uint64_t i;
1843 bool reached_end = false;
1844
1845 while (g_queue_is_empty(trimmer_it->output_messages)) {
1846 status = (int) bt_self_component_port_input_message_iterator_next(
1847 trimmer_it->upstream_iter, &my_msgs, &my_count);
1848 if (unlikely(status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
1849 if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) {
1850 status = end_iterator_streams(trimmer_it);
1851 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1852 goto end;
1853 }
1854
1855 trimmer_it->state =
1856 TRIMMER_ITERATOR_STATE_ENDING;
1857 status = state_ending(trimmer_it, msgs,
1858 capacity, count);
1859 }
1860
1861 goto end;
1862 }
1863
1864 BT_ASSERT(my_count > 0);
1865
1866 for (i = 0; i < my_count; i++) {
1867 status = handle_message(trimmer_it, my_msgs[i],
1868 &reached_end);
1869
1870 /*
1871 * handle_message() unconditionally consumes the
1872 * message reference.
1873 */
1874 my_msgs[i] = NULL;
1875
1876 if (unlikely(status !=
1877 BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
1878 put_messages(my_msgs, my_count);
1879 goto end;
1880 }
1881
1882 if (unlikely(reached_end)) {
1883 /*
1884 * This message's time was passed the
1885 * trimming time range's end time: we
1886 * are done. Their might still be
1887 * messages in the output message queue,
1888 * so move to the "ending" state and
1889 * apply it immediately since
1890 * state_trim() is called within the
1891 * "next" method.
1892 */
1893 put_messages(my_msgs, my_count);
1894 trimmer_it->state =
1895 TRIMMER_ITERATOR_STATE_ENDING;
1896 status = state_ending(trimmer_it, msgs,
1897 capacity, count);
1898 goto end;
1899 }
1900 }
1901 }
1902
1903 /*
1904 * There's at least one message in the output message queue:
1905 * move the messages to the output message array.
1906 */
1907 BT_ASSERT(!g_queue_is_empty(trimmer_it->output_messages));
1908 fill_message_array_from_output_messages(trimmer_it, msgs,
1909 capacity, count);
1910
1911end:
1912 return status;
1913}
1914
1915BT_HIDDEN
1916bt_self_message_iterator_status trimmer_msg_iter_next(
1917 bt_self_message_iterator *self_msg_iter,
1918 bt_message_array_const msgs, uint64_t capacity,
1919 uint64_t *count)
1920{
1921 struct trimmer_iterator *trimmer_it =
1922 bt_self_message_iterator_get_data(self_msg_iter);
1923 bt_self_message_iterator_status status =
1924 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1925
1926 BT_ASSERT(trimmer_it);
1927
1928 if (likely(trimmer_it->state == TRIMMER_ITERATOR_STATE_TRIM)) {
1929 status = state_trim(trimmer_it, msgs, capacity, count);
1930 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1931 goto end;
1932 }
1933 } else {
1934 switch (trimmer_it->state) {
1935 case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN:
1936 status = state_set_trimmer_iterator_bounds(trimmer_it);
1937 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1938 goto end;
1939 }
1940
1941 status = state_seek_initially(trimmer_it);
1942 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1943 goto end;
1944 }
1945
1946 status = state_trim(trimmer_it, msgs, capacity, count);
1947 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1948 goto end;
1949 }
1950
1951 break;
1952 case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY:
1953 status = state_seek_initially(trimmer_it);
1954 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1955 goto end;
1956 }
1957
1958 status = state_trim(trimmer_it, msgs, capacity, count);
1959 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1960 goto end;
1961 }
1962
1963 break;
1964 case TRIMMER_ITERATOR_STATE_ENDING:
1965 status = state_ending(trimmer_it, msgs, capacity,
1966 count);
1967 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1968 goto end;
1969 }
1970
1971 break;
1972 case TRIMMER_ITERATOR_STATE_ENDED:
1973 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1974 break;
1975 default:
1976 abort();
1977 }
1978 }
1979
1980end:
1981 return status;
1982}
1983
1984BT_HIDDEN
1985void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
1986{
1987 struct trimmer_iterator *trimmer_it =
1988 bt_self_message_iterator_get_data(self_msg_iter);
1989
1990 BT_ASSERT(trimmer_it);
1991 destroy_trimmer_iterator(trimmer_it);
1992}
This page took 0.119311 seconds and 4 git commands to generate.