Fix: flt.utils.trimmer: bt_message_put_ref() on freed message
[babeltrace.git] / plugins / utils / trimmer / trimmer.c
CommitLineData
cab3f160 1/*
cab3f160 2 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
7de0e49a 3 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
cab3f160
JG
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
b4565e8b
PP
24#define BT_LOG_TAG "PLUGIN-UTILS-TRIMMER-FLT"
25#include "logging.h"
26
3d2f08e7 27#include <babeltrace/compat/utc-internal.h>
7de0e49a 28#include <babeltrace/compat/time-internal.h>
9d408fca 29#include <babeltrace/babeltrace.h>
7de0e49a 30#include <babeltrace/common-internal.h>
7d61fa8e 31#include <plugins-common.h>
f6ccaed9 32#include <babeltrace/assert-internal.h>
7de0e49a
PP
33#include <stdint.h>
34#include <inttypes.h>
35#include <glib.h>
36
37#include "trimmer.h"
38
/* Number of nanoseconds in one second */
#define NS_PER_S	INT64_C(1000000000)

/* Name of the input port which trimmer_init() adds to the component */
static const char * const in_port_name = "in";
42
/* Time of day (no date part), as parsed from an `hh:mm:ss[.ns]` string */
struct trimmer_time {
	unsigned int hour;
	unsigned int minute;
	unsigned int second;
	unsigned int ns;
};
46
47struct trimmer_bound {
48 /*
49 * Nanoseconds from origin, valid if `is_set` is set and
50 * `is_infinite` is false.
51 */
52 int64_t ns_from_origin;
53
54 /* True if this bound's full time (`ns_from_origin`) is set */
55 bool is_set;
56
57 /*
58 * True if this bound represents the infinity (negative or
59 * positive depending on which bound it is). If this is true,
60 * then we don't care about `ns_from_origin` above.
61 */
62 bool is_infinite;
63
64 /*
65 * This bound's time without the date; this time is used to set
66 * `ns_from_origin` once we know the date.
67 */
68 struct trimmer_time time;
69};
70
71struct trimmer_comp {
72 struct trimmer_bound begin, end;
73 bool is_gmt;
74};
75
/*
 * States of a trimmer message iterator.
 *
 * The first enumerator has the value 0, so it is the state of an
 * iterator structure which was just zero-allocated.
 */
enum trimmer_iterator_state {
	/*
	 * Look for the first message having a time to learn the date,
	 * and complete the bounds's times with it.
	 */
	TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN,

	/* Make the initial seek to the trimming range's beginning */
	TRIMMER_ITERATOR_STATE_SEEK_INITIALLY,

	/*
	 * Keep filling the output message queue while the received
	 * input messages fall within the trimming time range.
	 */
	TRIMMER_ITERATOR_STATE_TRIM,

	/* Flush what remains in the output message queue */
	TRIMMER_ITERATOR_STATE_ENDING,

	/* Trimming is over: the message iterator is ended */
	TRIMMER_ITERATOR_STATE_ENDED,
};
100
101struct trimmer_iterator {
102 /* Weak */
103 struct trimmer_comp *trimmer_comp;
104
105 /* Weak */
106 bt_self_message_iterator *self_msg_iter;
107
108 enum trimmer_iterator_state state;
109
110 /* Owned by this */
111 bt_self_component_port_input_message_iterator *upstream_iter;
112 struct trimmer_bound begin, end;
113
114 /*
115 * Queue of `const bt_message *` (owned by the queue).
116 *
117 * This is where the trimming operation pushes the messages to
118 * output by this message iterator.
119 */
120 GQueue *output_messages;
121
122 /*
123 * Hash table of `bt_stream *` (weak) to
124 * `struct trimmer_iterator_stream_state *` (owned by the HT).
125 */
126 GHashTable *stream_states;
127};
128
129struct trimmer_iterator_stream_state {
130 /*
131 * True if both stream beginning and initial stream activity
132 * beginning messages were pushed for this stream.
133 */
134 bool inited;
135
136 /*
137 * True if the last pushed message for this stream was a stream
138 * activity end message.
139 */
140 bool last_msg_is_stream_activity_end;
141
142 /*
143 * Time to use for a generated stream end activity message when
144 * ending the stream.
145 */
146 int64_t stream_act_end_ns_from_origin;
147
148 /* Weak */
149 const bt_stream *stream;
150
151 /* Owned by this (`NULL` initially and between packets) */
152 const bt_packet *cur_packet;
153
154 /* Owned by this */
155 const bt_message *stream_beginning_msg;
156};
cab3f160
JG
157
/*
 * Frees the trimmer component data `trimmer_comp`, which must not be
 * `NULL`.
 */
static
void destroy_trimmer_comp(struct trimmer_comp *trimmer_comp)
{
	BT_ASSERT(trimmer_comp);

	/* The structure owns nothing else: freeing it is enough */
	g_free(trimmer_comp);
}
164
165static
7de0e49a
PP
166struct trimmer_comp *create_trimmer_comp(void)
167{
168 return g_new0(struct trimmer_comp, 1);
169}
170
171BT_HIDDEN
172void trimmer_finalize(bt_self_component_filter *self_comp)
cab3f160 173{
7de0e49a
PP
174 struct trimmer_comp *trimmer_comp =
175 bt_self_component_get_data(
176 bt_self_component_filter_as_self_component(self_comp));
177
178 if (trimmer_comp) {
179 destroy_trimmer_comp(trimmer_comp);
180 }
cab3f160
JG
181}
182
7de0e49a
PP
183/*
184 * Sets the time (in ns from origin) of a trimmer bound from date and
185 * time components.
186 *
187 * Returns a negative value if anything goes wrong.
188 */
189static
190int set_bound_ns_from_origin(struct trimmer_bound *bound,
191 unsigned int year, unsigned int month, unsigned int day,
192 unsigned int hour, unsigned int minute, unsigned int second,
193 unsigned int ns, bool is_gmt)
cab3f160 194{
7de0e49a
PP
195 int ret = 0;
196 time_t result;
197 struct tm tm = {
198 .tm_sec = second,
199 .tm_min = minute,
200 .tm_hour = hour,
201 .tm_mday = day,
202 .tm_mon = month - 1,
203 .tm_year = year - 1900,
204 .tm_isdst = -1,
205 };
206
207 if (is_gmt) {
208 result = bt_timegm(&tm);
209 } else {
210 result = mktime(&tm);
211 }
212
213 if (result < 0) {
214 ret = -1;
215 goto end;
216 }
cab3f160 217
7de0e49a
PP
218 BT_ASSERT(bound);
219 bound->ns_from_origin = (int64_t) result;
220 bound->ns_from_origin *= NS_PER_S;
221 bound->ns_from_origin += ns;
222 bound->is_set = true;
223
224end:
225 return ret;
cab3f160
JG
226}
227
528debdf
MD
228/*
229 * Parses a timestamp, figuring out its format.
230 *
231 * Returns a negative value if anything goes wrong.
232 *
233 * Expected formats:
234 *
7de0e49a
PP
235 * YYYY-MM-DD hh:mm[:ss[.ns]]
236 * [hh:mm:]ss[.ns]
237 * [-]s[.ns]
238 *
239 * TODO: Check overflows.
240 */
241static
242int set_bound_from_str(const char *str, struct trimmer_bound *bound,
243 bool is_gmt)
244{
245 int ret = 0;
246 int s_ret;
247 unsigned int year, month, day, hour, minute, second, ns;
248 char dummy;
249
250 /* Try `YYYY-MM-DD hh:mm:ss.ns` format */
251 s_ret = sscanf(str, "%u-%u-%u %u:%u:%u.%u%c", &year, &month, &day,
252 &hour, &minute, &second, &ns, &dummy);
253 if (s_ret == 7) {
254 ret = set_bound_ns_from_origin(bound, year, month, day,
255 hour, minute, second, ns, is_gmt);
256 goto end;
257 }
258
259 /* Try `YYYY-MM-DD hh:mm:ss` format */
260 s_ret = sscanf(str, "%u-%u-%u %u:%u:%u%c", &year, &month, &day,
261 &hour, &minute, &second, &dummy);
262 if (s_ret == 6) {
263 ret = set_bound_ns_from_origin(bound, year, month, day,
264 hour, minute, second, 0, is_gmt);
265 goto end;
266 }
267
268 /* Try `YYYY-MM-DD hh:mm` format */
269 s_ret = sscanf(str, "%u-%u-%u %u:%u%c", &year, &month, &day,
270 &hour, &minute, &dummy);
271 if (s_ret == 5) {
272 ret = set_bound_ns_from_origin(bound, year, month, day,
273 hour, minute, 0, 0, is_gmt);
274 goto end;
275 }
276
277 /* Try `YYYY-MM-DD` format */
278 s_ret = sscanf(str, "%u-%u-%u%c", &year, &month, &day, &dummy);
279 if (s_ret == 3) {
280 ret = set_bound_ns_from_origin(bound, year, month, day,
281 0, 0, 0, 0, is_gmt);
282 goto end;
283 }
284
285 /* Try `hh:mm:ss.ns` format */
286 s_ret = sscanf(str, "%u:%u:%u.%u%c", &hour, &minute, &second, &ns,
287 &dummy);
288 if (s_ret == 4) {
289 bound->time.hour = hour;
290 bound->time.minute = minute;
291 bound->time.second = second;
292 bound->time.ns = ns;
293 goto end;
294 }
295
296 /* Try `hh:mm:ss` format */
297 s_ret = sscanf(str, "%u:%u:%u%c", &hour, &minute, &second, &dummy);
298 if (s_ret == 3) {
299 bound->time.hour = hour;
300 bound->time.minute = minute;
301 bound->time.second = second;
302 bound->time.ns = 0;
303 goto end;
304 }
305
306 /* Try `-s.ns` format */
307 s_ret = sscanf(str, "-%u.%u%c", &second, &ns, &dummy);
308 if (s_ret == 2) {
309 bound->ns_from_origin = -((int64_t) second) * NS_PER_S;
310 bound->ns_from_origin -= (int64_t) ns;
311 bound->is_set = true;
312 goto end;
313 }
314
315 /* Try `s.ns` format */
316 s_ret = sscanf(str, "%u.%u%c", &second, &ns, &dummy);
317 if (s_ret == 2) {
318 bound->ns_from_origin = ((int64_t) second) * NS_PER_S;
319 bound->ns_from_origin += (int64_t) ns;
320 bound->is_set = true;
321 goto end;
322 }
323
324 /* Try `-s` format */
325 s_ret = sscanf(str, "-%u%c", &second, &dummy);
326 if (s_ret == 1) {
327 bound->ns_from_origin = -((int64_t) second) * NS_PER_S;
328 bound->is_set = true;
329 goto end;
330 }
331
332 /* Try `s` format */
333 s_ret = sscanf(str, "%u%c", &second, &dummy);
334 if (s_ret == 1) {
335 bound->ns_from_origin = (int64_t) second * NS_PER_S;
336 bound->is_set = true;
337 goto end;
338 }
339
340 BT_LOGE("Invalid date/time format: param=\"%s\"", str);
341 ret = -1;
342
343end:
344 return ret;
345}
346
347/*
348 * Sets a trimmer bound's properties from a parameter string/integer
349 * value.
350 *
351 * Returns a negative value if anything goes wrong.
528debdf 352 */
44d3cbf0 353static
7de0e49a
PP
354int set_bound_from_param(const char *param_name, const bt_value *param,
355 struct trimmer_bound *bound, bool is_gmt)
528debdf
MD
356{
357 int ret;
268fae9a 358 const char *arg;
7de0e49a 359 char tmp_arg[64];
268fae9a 360
fdd3a2da
PP
361 if (bt_value_is_signed_integer(param)) {
362 int64_t value = bt_value_signed_integer_get(param);
7de0e49a
PP
363
364 /*
365 * Just convert it to a temporary string to handle
366 * everything the same way.
367 */
368 sprintf(tmp_arg, "%" PRId64, value);
369 arg = tmp_arg;
370 } else if (bt_value_is_string(param)) {
371 arg = bt_value_string_get(param);
372 } else {
268fae9a
PP
373 BT_LOGE("`%s` parameter must be an integer or a string value.",
374 param_name);
7de0e49a
PP
375 ret = -1;
376 goto end;
268fae9a
PP
377 }
378
7de0e49a
PP
379 ret = set_bound_from_str(arg, bound, is_gmt);
380
381end:
382 return ret;
383}
384
385static
386int validate_trimmer_bounds(struct trimmer_bound *begin,
387 struct trimmer_bound *end)
388{
389 int ret = 0;
390
391 BT_ASSERT(begin->is_set);
392 BT_ASSERT(end->is_set);
393
394 if (!begin->is_infinite && !end->is_infinite &&
395 begin->ns_from_origin > end->ns_from_origin) {
396 BT_LOGE("Trimming time range's beginning time is greater than end time: "
397 "begin-ns-from-origin=%" PRId64 ", "
398 "end-ns-from-origin=%" PRId64,
399 begin->ns_from_origin,
400 end->ns_from_origin);
401 ret = -1;
402 goto end;
528debdf
MD
403 }
404
7de0e49a
PP
405 if (!begin->is_infinite && begin->ns_from_origin == INT64_MIN) {
406 BT_LOGE("Invalid trimming time range's beginning time: "
407 "ns-from-origin=%" PRId64,
408 begin->ns_from_origin);
409 ret = -1;
410 goto end;
411 }
528debdf 412
7de0e49a
PP
413 if (!end->is_infinite && end->ns_from_origin == INT64_MIN) {
414 BT_LOGE("Invalid trimming time range's end time: "
415 "ns-from-origin=%" PRId64,
416 end->ns_from_origin);
417 ret = -1;
418 goto end;
419 }
528debdf 420
7de0e49a
PP
421end:
422 return ret;
528debdf
MD
423}
424
425static
7de0e49a
PP
426int init_trimmer_comp_from_params(struct trimmer_comp *trimmer_comp,
427 const bt_value *params)
44d3cbf0 428{
7de0e49a
PP
429 const bt_value *value;
430 int ret = 0;
44d3cbf0 431
f6ccaed9 432 BT_ASSERT(params);
7de0e49a 433 value = bt_value_map_borrow_entry_value_const(params, "gmt");
528debdf 434 if (value) {
7de0e49a 435 trimmer_comp->is_gmt = (bool) bt_value_bool_get(value);
528debdf
MD
436 }
437
7de0e49a 438 value = bt_value_map_borrow_entry_value_const(params, "begin");
528debdf 439 if (value) {
7de0e49a
PP
440 if (set_bound_from_param("begin", value,
441 &trimmer_comp->begin, trimmer_comp->is_gmt)) {
442 /* set_bound_from_param() logs errors */
443 ret = BT_SELF_COMPONENT_STATUS_ERROR;
268fae9a 444 goto end;
44d3cbf0 445 }
7de0e49a
PP
446 } else {
447 trimmer_comp->begin.is_infinite = true;
448 trimmer_comp->begin.is_set = true;
44d3cbf0 449 }
528debdf 450
7de0e49a 451 value = bt_value_map_borrow_entry_value_const(params, "end");
528debdf 452 if (value) {
7de0e49a
PP
453 if (set_bound_from_param("end", value,
454 &trimmer_comp->end, trimmer_comp->is_gmt)) {
455 /* set_bound_from_param() logs errors */
456 ret = BT_SELF_COMPONENT_STATUS_ERROR;
268fae9a 457 goto end;
528debdf 458 }
7de0e49a
PP
459 } else {
460 trimmer_comp->end.is_infinite = true;
461 trimmer_comp->end.is_set = true;
528debdf 462 }
268fae9a 463
528debdf 464end:
7de0e49a
PP
465 if (trimmer_comp->begin.is_set && trimmer_comp->end.is_set) {
466 /* validate_trimmer_bounds() logs errors */
467 ret = validate_trimmer_bounds(&trimmer_comp->begin,
468 &trimmer_comp->end);
55595636 469 }
7de0e49a 470
528debdf 471 return ret;
44d3cbf0
JG
472}
473
7de0e49a
PP
474bt_self_component_status trimmer_init(bt_self_component_filter *self_comp,
475 const bt_value *params, void *init_data)
cab3f160 476{
7de0e49a
PP
477 int ret;
478 bt_self_component_status status;
479 struct trimmer_comp *trimmer_comp = create_trimmer_comp();
cab3f160 480
7de0e49a 481 if (!trimmer_comp) {
8dad9b32 482 status = BT_SELF_COMPONENT_STATUS_NOMEM;
7de0e49a 483 goto error;
cab3f160
JG
484 }
485
7de0e49a
PP
486 status = bt_self_component_filter_add_input_port(
487 self_comp, in_port_name, NULL, NULL);
488 if (status != BT_SELF_COMPONENT_STATUS_OK) {
b9d103be
PP
489 goto error;
490 }
491
7de0e49a
PP
492 status = bt_self_component_filter_add_output_port(
493 self_comp, "out", NULL, NULL);
494 if (status != BT_SELF_COMPONENT_STATUS_OK) {
b9d103be
PP
495 goto error;
496 }
497
7de0e49a
PP
498 ret = init_trimmer_comp_from_params(trimmer_comp, params);
499 if (ret) {
8dad9b32 500 status = BT_SELF_COMPONENT_STATUS_ERROR;
cab3f160
JG
501 goto error;
502 }
503
7de0e49a
PP
504 bt_self_component_set_data(
505 bt_self_component_filter_as_self_component(self_comp),
506 trimmer_comp);
507 goto end;
508
509error:
510 if (status == BT_SELF_COMPONENT_STATUS_OK) {
511 status = BT_SELF_COMPONENT_STATUS_ERROR;
512 }
513
514 if (trimmer_comp) {
515 destroy_trimmer_comp(trimmer_comp);
516 }
517
cab3f160 518end:
8dad9b32 519 return status;
7de0e49a
PP
520}
521
522static
523void destroy_trimmer_iterator(struct trimmer_iterator *trimmer_it)
524{
525 BT_ASSERT(trimmer_it);
526 bt_self_component_port_input_message_iterator_put_ref(
527 trimmer_it->upstream_iter);
528
529 if (trimmer_it->output_messages) {
530 g_queue_free(trimmer_it->output_messages);
531 }
532
533 if (trimmer_it->stream_states) {
534 g_hash_table_destroy(trimmer_it->stream_states);
535 }
536
537 g_free(trimmer_it);
538}
539
540static
541void destroy_trimmer_iterator_stream_state(
542 struct trimmer_iterator_stream_state *sstate)
543{
544 BT_ASSERT(sstate);
545 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
546 BT_MESSAGE_PUT_REF_AND_RESET(sstate->stream_beginning_msg);
547 g_free(sstate);
548}
549
550BT_HIDDEN
551bt_self_message_iterator_status trimmer_msg_iter_init(
552 bt_self_message_iterator *self_msg_iter,
553 bt_self_component_filter *self_comp,
554 bt_self_component_port_output *port)
555{
556 bt_self_message_iterator_status status =
557 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
558 struct trimmer_iterator *trimmer_it;
559
560 trimmer_it = g_new0(struct trimmer_iterator, 1);
561 if (!trimmer_it) {
562 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
563 goto end;
564 }
565
566 trimmer_it->trimmer_comp = bt_self_component_get_data(
567 bt_self_component_filter_as_self_component(self_comp));
568 BT_ASSERT(trimmer_it->trimmer_comp);
569
570 if (trimmer_it->trimmer_comp->begin.is_set &&
571 trimmer_it->trimmer_comp->end.is_set) {
572 /*
573 * Both trimming time range's bounds are set, so skip
574 * the
575 * `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
576 * phase.
577 */
578 trimmer_it->state = TRIMMER_ITERATOR_STATE_SEEK_INITIALLY;
579 }
580
581 trimmer_it->begin = trimmer_it->trimmer_comp->begin;
582 trimmer_it->end = trimmer_it->trimmer_comp->end;
583 trimmer_it->upstream_iter =
584 bt_self_component_port_input_message_iterator_create(
585 bt_self_component_filter_borrow_input_port_by_name(
586 self_comp, in_port_name));
587 if (!trimmer_it->upstream_iter) {
588 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
589 goto end;
590 }
591
592 trimmer_it->output_messages = g_queue_new();
593 if (!trimmer_it->output_messages) {
594 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
595 goto end;
596 }
597
598 trimmer_it->stream_states = g_hash_table_new_full(g_direct_hash,
599 g_direct_equal, NULL,
600 (GDestroyNotify) destroy_trimmer_iterator_stream_state);
601 if (!trimmer_it->stream_states) {
602 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
603 goto end;
604 }
605
606 trimmer_it->self_msg_iter = self_msg_iter;
607 bt_self_message_iterator_set_data(self_msg_iter, trimmer_it);
608
609end:
610 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK && trimmer_it) {
611 destroy_trimmer_iterator(trimmer_it);
612 }
613
614 return status;
615}
616
617static inline
618int get_msg_ns_from_origin(const bt_message *msg, int64_t *ns_from_origin,
619 bool *skip)
620{
621 const bt_clock_class *clock_class = NULL;
622 const bt_clock_snapshot *clock_snapshot = NULL;
623 bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
624 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
625 int ret = 0;
626
627 BT_ASSERT(msg);
628 BT_ASSERT(ns_from_origin);
629 BT_ASSERT(skip);
630
631 switch (bt_message_get_type(msg)) {
632 case BT_MESSAGE_TYPE_EVENT:
633 clock_class =
634 bt_message_event_borrow_stream_class_default_clock_class_const(
635 msg);
636 if (unlikely(!clock_class)) {
637 goto error;
638 }
639
640 cs_state = bt_message_event_borrow_default_clock_snapshot_const(
641 msg, &clock_snapshot);
642 break;
643 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
644 clock_class =
645 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
646 msg);
647 if (unlikely(!clock_class)) {
648 goto error;
649 }
650
651 cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
652 msg, &clock_snapshot);
653 break;
654 case BT_MESSAGE_TYPE_PACKET_END:
655 clock_class =
656 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
657 msg);
658 if (unlikely(!clock_class)) {
659 goto error;
660 }
661
662 cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const(
663 msg, &clock_snapshot);
664 break;
665 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
666 clock_class =
667 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
668 msg);
669 if (unlikely(!clock_class)) {
670 goto error;
671 }
672
673 cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
674 msg, &clock_snapshot);
675 break;
676 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
677 clock_class =
678 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
679 msg);
680 if (unlikely(!clock_class)) {
681 goto error;
682 }
683
684 cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
685 msg, &clock_snapshot);
686 break;
687 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
688 clock_class =
689 bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
690 msg);
691 if (unlikely(!clock_class)) {
692 goto error;
693 }
694
695 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
696 msg, &clock_snapshot);
697 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN ||
698 sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
699 /* Lowest possible time to always include them */
700 *ns_from_origin = INT64_MIN;
701 goto no_clock_snapshot;
702 }
703
704 break;
705 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
706 clock_class =
707 bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
708 msg);
709 if (unlikely(!clock_class)) {
710 goto error;
711 }
712
713 sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
714 msg, &clock_snapshot);
715 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
716 /* Lowest time to always include it */
717 *ns_from_origin = INT64_MIN;
718 goto no_clock_snapshot;
719 } else if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
720 /* Greatest time to always exclude it */
721 *ns_from_origin = INT64_MAX;
722 goto no_clock_snapshot;
723 }
724
725 break;
726 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
727 cs_state =
728 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
729 msg, &clock_snapshot);
730 break;
731 default:
732 goto no_clock_snapshot;
733 }
734
735 if (unlikely(cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN)) {
736 BT_LOGE_STR("Unsupported unknown clock snapshot.");
737 ret = -1;
738 goto end;
739 }
740
741 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot,
742 ns_from_origin);
743 if (unlikely(ret)) {
744 goto error;
745 }
746
747 goto end;
748
749no_clock_snapshot:
750 *skip = true;
751 goto end;
752
cab3f160 753error:
7de0e49a
PP
754 ret = -1;
755
756end:
757 return ret;
758}
759
760static inline
761void put_messages(bt_message_array_const msgs, uint64_t count)
762{
763 uint64_t i;
764
765 for (i = 0; i < count; i++) {
766 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
767 }
768}
769
770static inline
771int set_trimmer_iterator_bound(struct trimmer_bound *bound,
772 int64_t ns_from_origin, bool is_gmt)
773{
774 struct tm tm;
775 time_t time_seconds = (time_t) (ns_from_origin / NS_PER_S);
776 int ret = 0;
777
778 BT_ASSERT(!bound->is_set);
779 errno = 0;
780
781 /* We only need to extract the date from this time */
782 if (is_gmt) {
783 bt_gmtime_r(&time_seconds, &tm);
784 } else {
785 bt_localtime_r(&time_seconds, &tm);
786 }
787
788 if (errno) {
789 BT_LOGE_ERRNO("Cannot convert timestamp to date and time",
790 "ts=%" PRId64, (int64_t) time_seconds);
791 ret = -1;
792 goto end;
793 }
794
795 ret = set_bound_ns_from_origin(bound, tm.tm_year + 1900, tm.tm_mon + 1,
796 tm.tm_mday, bound->time.hour, bound->time.minute,
797 bound->time.second, bound->time.ns, is_gmt);
798
799end:
cab3f160
JG
800 return ret;
801}
7de0e49a
PP
802
803static
804bt_self_message_iterator_status state_set_trimmer_iterator_bounds(
805 struct trimmer_iterator *trimmer_it)
806{
807 bt_message_iterator_status upstream_iter_status =
808 BT_MESSAGE_ITERATOR_STATUS_OK;
809 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
810 bt_message_array_const msgs;
811 uint64_t count = 0;
812 int64_t ns_from_origin = INT64_MIN;
813 uint64_t i;
814 int ret;
815
816 BT_ASSERT(!trimmer_it->begin.is_set ||
817 !trimmer_it->end.is_set);
818
819 while (true) {
820 upstream_iter_status =
821 bt_self_component_port_input_message_iterator_next(
822 trimmer_it->upstream_iter, &msgs, &count);
823 if (upstream_iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) {
824 goto end;
825 }
826
827 for (i = 0; i < count; i++) {
828 const bt_message *msg = msgs[i];
829 bool skip = false;
830 int ret;
831
832 ret = get_msg_ns_from_origin(msg, &ns_from_origin,
833 &skip);
834 if (ret) {
835 goto error;
836 }
837
838 if (skip) {
839 continue;
840 }
841
842 BT_ASSERT(ns_from_origin != INT64_MIN &&
843 ns_from_origin != INT64_MAX);
844 put_messages(msgs, count);
845 goto found;
846 }
847
848 put_messages(msgs, count);
849 }
850
851found:
852 if (!trimmer_it->begin.is_set) {
853 BT_ASSERT(!trimmer_it->begin.is_infinite);
854 ret = set_trimmer_iterator_bound(&trimmer_it->begin,
855 ns_from_origin, trimmer_comp->is_gmt);
856 if (ret) {
857 goto error;
858 }
859 }
860
861 if (!trimmer_it->end.is_set) {
862 BT_ASSERT(!trimmer_it->end.is_infinite);
863 ret = set_trimmer_iterator_bound(&trimmer_it->end,
864 ns_from_origin, trimmer_comp->is_gmt);
865 if (ret) {
866 goto error;
867 }
868 }
869
870 ret = validate_trimmer_bounds(&trimmer_it->begin,
871 &trimmer_it->end);
872 if (ret) {
873 goto error;
874 }
875
876 goto end;
877
878error:
879 put_messages(msgs, count);
880 upstream_iter_status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
881
882end:
883 return (int) upstream_iter_status;
884}
885
886static
887bt_self_message_iterator_status state_seek_initially(
888 struct trimmer_iterator *trimmer_it)
889{
890 bt_self_message_iterator_status status =
891 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
892
893 BT_ASSERT(trimmer_it->begin.is_set);
894
895 if (trimmer_it->begin.is_infinite) {
896 if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
897 trimmer_it->upstream_iter)) {
898 BT_LOGE_STR("Cannot make upstream message iterator initially seek its beginning.");
899 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
900 goto end;
901 }
902
903 status = (int) bt_self_component_port_input_message_iterator_seek_beginning(
904 trimmer_it->upstream_iter);
905 } else {
906 if (!bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
907 trimmer_it->upstream_iter,
908 trimmer_it->begin.ns_from_origin)) {
909 BT_LOGE("Cannot make upstream message iterator initially seek: "
910 "seek-ns-from-origin=%" PRId64,
911 trimmer_it->begin.ns_from_origin);
912 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
913 goto end;
914 }
915
916 status = (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
917 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin);
918 }
919
920 if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
921 trimmer_it->state = TRIMMER_ITERATOR_STATE_TRIM;
922 }
923
924end:
925 return status;
926}
927
928static inline
929void push_message(struct trimmer_iterator *trimmer_it, const bt_message *msg)
930{
931 g_queue_push_head(trimmer_it->output_messages, (void *) msg);
932}
933
934static inline
935const bt_message *pop_message(struct trimmer_iterator *trimmer_it)
936{
937 return g_queue_pop_tail(trimmer_it->output_messages);
938}
939
940static inline
941int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
942 int64_t ns_from_origin, uint64_t *raw_value)
943{
944
945 int64_t cc_offset_s;
946 uint64_t cc_offset_cycles;
947 uint64_t cc_freq;
948
949 bt_clock_class_get_offset(clock_class, &cc_offset_s, &cc_offset_cycles);
950 cc_freq = bt_clock_class_get_frequency(clock_class);
951 return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
952 cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
953}
954
955static inline
956bt_self_message_iterator_status end_stream(struct trimmer_iterator *trimmer_it,
957 struct trimmer_iterator_stream_state *sstate)
958{
959 bt_self_message_iterator_status status =
960 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
961 uint64_t raw_value;
962 const bt_clock_class *clock_class;
963 int ret;
964 bt_message *msg = NULL;
965
966 BT_ASSERT(!trimmer_it->end.is_infinite);
967
968 if (!sstate->stream) {
969 goto end;
970 }
971
972 if (sstate->cur_packet) {
973 /*
974 * The last message could not have been a stream
975 * activity end message if we have a current packet.
976 */
977 BT_ASSERT(!sstate->last_msg_is_stream_activity_end);
978
979 /*
980 * Create and push a packet end message, making its time
981 * the trimming range's end time.
982 */
983 clock_class = bt_stream_class_borrow_default_clock_class_const(
984 bt_stream_borrow_class_const(sstate->stream));
985 BT_ASSERT(clock_class);
986 ret = clock_raw_value_from_ns_from_origin(clock_class,
987 trimmer_it->end.ns_from_origin, &raw_value);
988 if (ret) {
989 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
990 goto end;
991 }
992
993 msg = bt_message_packet_end_create_with_default_clock_snapshot(
994 trimmer_it->self_msg_iter, sstate->cur_packet,
995 raw_value);
996 if (!msg) {
997 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
998 goto end;
999 }
1000
1001 push_message(trimmer_it, msg);
1002 msg = NULL;
1003 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1004
1005 /*
1006 * Because we generated a packet end message, set the
1007 * stream activity end message's time to use to the
1008 * trimming range's end time (this packet end message's
1009 * time).
1010 */
1011 sstate->stream_act_end_ns_from_origin =
1012 trimmer_it->end.ns_from_origin;
1013 }
1014
1015 if (!sstate->last_msg_is_stream_activity_end) {
1016 /* Create and push a stream activity end message */
1017 msg = bt_message_stream_activity_end_create(
1018 trimmer_it->self_msg_iter, sstate->stream);
1019 if (!msg) {
1020 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1021 goto end;
1022 }
1023
1024 clock_class = bt_stream_class_borrow_default_clock_class_const(
1025 bt_stream_borrow_class_const(sstate->stream));
1026 BT_ASSERT(clock_class);
cb179cb6
PP
1027
1028 if (sstate->stream_act_end_ns_from_origin == INT64_MIN) {
1029 /*
1030 * We received at least what is necessary to
1031 * have a stream state (stream beginning and
1032 * stream activity beginning messages), but
1033 * nothing else: use the trimmer range's end
1034 * time.
1035 */
1036 sstate->stream_act_end_ns_from_origin =
1037 trimmer_it->end.ns_from_origin;
1038 }
1039
7de0e49a
PP
1040 ret = clock_raw_value_from_ns_from_origin(clock_class,
1041 sstate->stream_act_end_ns_from_origin, &raw_value);
1042 if (ret) {
1043 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1044 goto end;
1045 }
1046
1047 bt_message_stream_activity_end_set_default_clock_snapshot(
1048 msg, raw_value);
1049 push_message(trimmer_it, msg);
1050 msg = NULL;
1051 }
1052
1053 /* Create and push a stream end message */
1054 msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
1055 sstate->stream);
1056 if (!msg) {
1057 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1058 goto end;
1059 }
1060
1061 push_message(trimmer_it, msg);
1062 msg = NULL;
1063
1064 /*
1065 * Just to make sure that we don't use this stream state again
1066 * in the future without an obvious error.
1067 */
1068 sstate->stream = NULL;
1069
1070end:
1071 bt_message_put_ref(msg);
1072 return status;
1073}
1074
1075static inline
1076bt_self_message_iterator_status end_iterator_streams(
1077 struct trimmer_iterator *trimmer_it)
1078{
1079 bt_self_message_iterator_status status =
1080 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1081 GHashTableIter iter;
1082 gpointer key, sstate;
1083
1084 if (trimmer_it->end.is_infinite) {
1085 /*
1086 * An infinite trimming range's end time guarantees that
1087 * we received (and pushed) all the appropriate end
1088 * messages.
1089 */
1090 goto remove_all;
1091 }
1092
1093 /*
1094 * End each stream and then remove them from the hash table of
1095 * stream states to release unneeded references.
1096 */
1097 g_hash_table_iter_init(&iter, trimmer_it->stream_states);
1098
1099 while (g_hash_table_iter_next(&iter, &key, &sstate)) {
1100 status = end_stream(trimmer_it, sstate);
1101 if (status) {
1102 goto end;
1103 }
1104 }
1105
1106remove_all:
1107 g_hash_table_remove_all(trimmer_it->stream_states);
1108
1109end:
1110 return status;
1111}
1112
1113static inline
1114bt_self_message_iterator_status create_stream_beginning_activity_message(
1115 struct trimmer_iterator *trimmer_it,
1116 const bt_stream *stream,
1117 const bt_clock_class *clock_class, bt_message **msg)
1118{
75a15167 1119 bt_message *local_msg;
7de0e49a
PP
1120 bt_self_message_iterator_status status =
1121 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1122
1123 BT_ASSERT(msg);
1124 BT_ASSERT(!trimmer_it->begin.is_infinite);
1125
75a15167 1126 local_msg = bt_message_stream_activity_beginning_create(
7de0e49a 1127 trimmer_it->self_msg_iter, stream);
75a15167 1128 if (!local_msg) {
7de0e49a
PP
1129 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1130 goto end;
1131 }
1132
1133 if (clock_class) {
1134 int ret;
1135 uint64_t raw_value;
1136
1137 ret = clock_raw_value_from_ns_from_origin(clock_class,
1138 trimmer_it->begin.ns_from_origin, &raw_value);
1139 if (ret) {
1140 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
75a15167 1141 bt_message_put_ref(local_msg);
7de0e49a
PP
1142 goto end;
1143 }
1144
1145 bt_message_stream_activity_beginning_set_default_clock_snapshot(
75a15167 1146 local_msg, raw_value);
7de0e49a
PP
1147 }
1148
75a15167
FD
1149 BT_MESSAGE_MOVE_REF(*msg, local_msg);
1150
7de0e49a
PP
1151end:
1152 return status;
1153}
1154
1155/*
1156 * Makes sure to initialize a stream state, pushing the appropriate
1157 * initial messages.
1158 *
1159 * `stream_act_beginning_msg` is an initial stream activity beginning
1160 * message to potentially use, depending on its clock snapshot state.
1161 * This function consumes `stream_act_beginning_msg` unconditionally.
1162 */
1163static inline
1164bt_self_message_iterator_status ensure_stream_state_is_inited(
1165 struct trimmer_iterator *trimmer_it,
1166 struct trimmer_iterator_stream_state *sstate,
1167 const bt_message *stream_act_beginning_msg)
1168{
1169 bt_self_message_iterator_status status =
1170 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1171 bt_message *new_msg = NULL;
1172 const bt_clock_class *clock_class =
1173 bt_stream_class_borrow_default_clock_class_const(
1174 bt_stream_borrow_class_const(sstate->stream));
1175
1176 BT_ASSERT(!sstate->inited);
1177
1178 if (!sstate->stream_beginning_msg) {
1179 /* No initial stream beginning message: create one */
1180 sstate->stream_beginning_msg =
1181 bt_message_stream_beginning_create(
1182 trimmer_it->self_msg_iter, sstate->stream);
1183 if (!sstate->stream_beginning_msg) {
1184 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1185 goto end;
1186 }
1187 }
1188
1189 /* Push initial stream beginning message */
1190 BT_ASSERT(sstate->stream_beginning_msg);
1191 push_message(trimmer_it, sstate->stream_beginning_msg);
1192 sstate->stream_beginning_msg = NULL;
1193
1194 if (stream_act_beginning_msg) {
1195 /*
1196 * Initial stream activity beginning message exists: if
1197 * its time is -inf, then create and push a new one
1198 * having the trimming range's beginning time. Otherwise
1199 * push it as is (known and unknown).
1200 */
1201 const bt_clock_snapshot *cs;
1202 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
1203
1204 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
1205 stream_act_beginning_msg, &cs);
1206 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE &&
1207 !trimmer_it->begin.is_infinite) {
1208 /*
1209 * -inf time: use trimming range's beginning
1210 * time (which is not -inf).
1211 */
1212 status = create_stream_beginning_activity_message(
1213 trimmer_it, sstate->stream, clock_class,
1214 &new_msg);
1215 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1216 goto end;
1217 }
1218
1219 push_message(trimmer_it, new_msg);
1220 new_msg = NULL;
1221 } else {
1222 /* Known/unknown: push as is */
1223 push_message(trimmer_it, stream_act_beginning_msg);
1224 stream_act_beginning_msg = NULL;
1225 }
1226 } else {
1227 BT_ASSERT(!trimmer_it->begin.is_infinite);
1228
1229 /*
1230 * No stream beginning activity message: create and push
1231 * a new message.
1232 */
1233 status = create_stream_beginning_activity_message(
1234 trimmer_it, sstate->stream, clock_class, &new_msg);
1235 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1236 goto end;
1237 }
1238
1239 push_message(trimmer_it, new_msg);
1240 new_msg = NULL;
1241 }
1242
1243 sstate->inited = true;
1244
1245end:
1246 bt_message_put_ref(new_msg);
1247 bt_message_put_ref(stream_act_beginning_msg);
1248 return status;
1249}
1250
1251static inline
1252bt_self_message_iterator_status ensure_cur_packet_exists(
1253 struct trimmer_iterator *trimmer_it,
1254 struct trimmer_iterator_stream_state *sstate,
1255 const bt_packet *packet)
1256{
1257 bt_self_message_iterator_status status =
1258 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1259 int ret;
1260 const bt_clock_class *clock_class =
1261 bt_stream_class_borrow_default_clock_class_const(
1262 bt_stream_borrow_class_const(sstate->stream));
1263 bt_message *msg = NULL;
1264 uint64_t raw_value;
1265
1266 BT_ASSERT(!trimmer_it->begin.is_infinite);
1267 BT_ASSERT(!sstate->cur_packet);
1268
1269 /*
1270 * Create and push an initial packet beginning message,
1271 * making its time the trimming range's beginning time.
1272 */
1273 ret = clock_raw_value_from_ns_from_origin(clock_class,
1274 trimmer_it->begin.ns_from_origin, &raw_value);
1275 if (ret) {
1276 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1277 goto end;
1278 }
1279
1280 msg = bt_message_packet_beginning_create_with_default_clock_snapshot(
1281 trimmer_it->self_msg_iter, packet, raw_value);
1282 if (!msg) {
1283 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1284 goto end;
1285 }
1286
1287 push_message(trimmer_it, msg);
1288 msg = NULL;
1289
1290 /* Set packet as this stream's current packet */
1291 sstate->cur_packet = packet;
1292 bt_packet_get_ref(sstate->cur_packet);
1293
1294end:
1295 bt_message_put_ref(msg);
1296 return status;
1297}
1298
1299/*
1300 * Handles a message which is associated to a given stream state. This
1301 * _could_ make the iterator's output message queue grow; this could
1302 * also consume the message without pushing anything to this queue, only
1303 * modifying the stream state.
1304 *
1305 * This function consumes the `msg` reference, _whatever the outcome_.
1306 *
1307 * `ns_from_origin` is the message's time, as given by
1308 * get_msg_ns_from_origin().
1309 *
1310 * This function sets `reached_end` if handling this message made the
1311 * iterator reach the end of the trimming range. Note that the output
1312 * message queue could contain messages even if this function sets
1313 * `reached_end`.
1314 */
1315static inline
1316bt_self_message_iterator_status handle_message_with_stream_state(
1317 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1318 struct trimmer_iterator_stream_state *sstate,
1319 int64_t ns_from_origin, bool *reached_end)
1320{
1321 bt_self_message_iterator_status status =
1322 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1323 bt_message_type msg_type = bt_message_get_type(msg);
1324 int ret;
1325
1326 switch (msg_type) {
1327 case BT_MESSAGE_TYPE_EVENT:
1328 if (unlikely(!trimmer_it->end.is_infinite &&
1329 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1330 status = end_iterator_streams(trimmer_it);
1331 *reached_end = true;
1332 break;
1333 }
1334
1335 if (unlikely(!sstate->inited)) {
1336 status = ensure_stream_state_is_inited(trimmer_it,
1337 sstate, NULL);
1338 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1339 goto end;
1340 }
1341 }
1342
1343 if (unlikely(!sstate->cur_packet)) {
1344 const bt_event *event =
1345 bt_message_event_borrow_event_const(msg);
1346 const bt_packet *packet = bt_event_borrow_packet_const(
1347 event);
1348
1349 status = ensure_cur_packet_exists(trimmer_it, sstate,
1350 packet);
1351 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1352 goto end;
1353 }
1354 }
1355
1356 BT_ASSERT(sstate->cur_packet);
1357 push_message(trimmer_it, msg);
1358 msg = NULL;
1359 break;
1360 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1361 if (unlikely(!trimmer_it->end.is_infinite &&
1362 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1363 status = end_iterator_streams(trimmer_it);
1364 *reached_end = true;
1365 break;
1366 }
1367
1368 if (unlikely(!sstate->inited)) {
1369 status = ensure_stream_state_is_inited(trimmer_it,
1370 sstate, NULL);
1371 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1372 goto end;
1373 }
1374 }
1375
1376 BT_ASSERT(!sstate->cur_packet);
1377 sstate->cur_packet =
1378 bt_message_packet_beginning_borrow_packet_const(msg);
1379 bt_packet_get_ref(sstate->cur_packet);
1380 push_message(trimmer_it, msg);
1381 msg = NULL;
1382 break;
1383 case BT_MESSAGE_TYPE_PACKET_END:
1384 sstate->stream_act_end_ns_from_origin = ns_from_origin;
1385
1386 if (unlikely(!trimmer_it->end.is_infinite &&
1387 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1388 status = end_iterator_streams(trimmer_it);
1389 *reached_end = true;
1390 break;
1391 }
1392
1393 if (unlikely(!sstate->inited)) {
1394 status = ensure_stream_state_is_inited(trimmer_it,
1395 sstate, NULL);
1396 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1397 goto end;
1398 }
1399 }
1400
1401 if (unlikely(!sstate->cur_packet)) {
1402 const bt_packet *packet =
1403 bt_message_packet_end_borrow_packet_const(msg);
1404
1405 status = ensure_cur_packet_exists(trimmer_it, sstate,
1406 packet);
1407 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1408 goto end;
1409 }
1410 }
1411
1412 BT_ASSERT(sstate->cur_packet);
1413 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1414 push_message(trimmer_it, msg);
1415 msg = NULL;
1416 break;
1417 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1418 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1419 {
1420 /*
1421 * `ns_from_origin` is the message's time range's
1422 * beginning time here.
1423 */
1424 int64_t end_ns_from_origin;
1425 const bt_clock_snapshot *end_cs;
1426
1427 if (bt_message_get_type(msg) ==
1428 BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1429 /*
1430 * Safe to ignore the return value because we
1431 * know there's a default clock and it's always
1432 * known.
1433 */
1434 (void) bt_message_discarded_events_borrow_default_end_clock_snapshot_const(
1435 msg, &end_cs);
1436 } else {
1437 /*
1438 * Safe to ignore the return value because we
1439 * know there's a default clock and it's always
1440 * known.
1441 */
1442 (void) bt_message_discarded_packets_borrow_default_end_clock_snapshot_const(
1443 msg, &end_cs);
1444 }
1445
1446 if (bt_clock_snapshot_get_ns_from_origin(end_cs,
1447 &end_ns_from_origin)) {
1448 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1449 goto end;
1450 }
1451
1452 sstate->stream_act_end_ns_from_origin = end_ns_from_origin;
1453
1454 if (!trimmer_it->end.is_infinite &&
1455 ns_from_origin > trimmer_it->end.ns_from_origin) {
1456 status = end_iterator_streams(trimmer_it);
1457 *reached_end = true;
1458 break;
1459 }
1460
1461 if (!trimmer_it->end.is_infinite &&
1462 end_ns_from_origin > trimmer_it->end.ns_from_origin) {
1463 /*
1464 * This message's end time is outside the
1465 * trimming time range: replace it with a new
1466 * message having an end time equal to the
1467 * trimming time range's end and without a
1468 * count.
1469 */
1470 const bt_clock_class *clock_class =
1471 bt_clock_snapshot_borrow_clock_class_const(
1472 end_cs);
1473 const bt_clock_snapshot *begin_cs;
1474 bt_message *new_msg;
1475 uint64_t end_raw_value;
1476
1477 ret = clock_raw_value_from_ns_from_origin(clock_class,
1478 trimmer_it->end.ns_from_origin, &end_raw_value);
1479 if (ret) {
1480 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1481 goto end;
1482 }
1483
1484 if (msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1485 (void) bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
1486 msg, &begin_cs);
1487 new_msg = bt_message_discarded_events_create_with_default_clock_snapshots(
1488 trimmer_it->self_msg_iter,
1489 sstate->stream,
1490 bt_clock_snapshot_get_value(begin_cs),
1491 end_raw_value);
1492 } else {
1493 (void) bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
1494 msg, &begin_cs);
1495 new_msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
1496 trimmer_it->self_msg_iter,
1497 sstate->stream,
1498 bt_clock_snapshot_get_value(begin_cs),
1499 end_raw_value);
1500 }
1501
1502 if (!new_msg) {
1503 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1504 goto end;
1505 }
1506
1507 /* Replace the original message */
1508 BT_MESSAGE_MOVE_REF(msg, new_msg);
1509 }
1510
1511 if (unlikely(!sstate->inited)) {
1512 status = ensure_stream_state_is_inited(trimmer_it,
1513 sstate, NULL);
1514 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1515 goto end;
1516 }
1517 }
1518
1519 push_message(trimmer_it, msg);
1520 msg = NULL;
1521 break;
1522 }
1523 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
1524 if (!trimmer_it->end.is_infinite &&
1525 ns_from_origin > trimmer_it->end.ns_from_origin) {
1526 /*
1527 * This only happens when the message's time is
1528 * known and is greater than the trimming
1529 * range's end time. Unknown and -inf times are
1530 * always less than
1531 * `trimmer_it->end.ns_from_origin`.
1532 */
1533 status = end_iterator_streams(trimmer_it);
1534 *reached_end = true;
1535 break;
1536 }
1537
1538 if (!sstate->inited) {
1539 status = ensure_stream_state_is_inited(trimmer_it,
1540 sstate, msg);
1541 msg = NULL;
1542 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1543 goto end;
1544 }
1545 } else {
1546 push_message(trimmer_it, msg);
1547 msg = NULL;
1548 }
1549
1550 break;
1551 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
1552 if (trimmer_it->end.is_infinite) {
1553 push_message(trimmer_it, msg);
1554 msg = NULL;
1555 break;
1556 }
1557
1558 if (ns_from_origin == INT64_MIN) {
1559 /* Unknown: push as is if stream state is inited */
1560 if (sstate->inited) {
1561 push_message(trimmer_it, msg);
1562 msg = NULL;
1563 sstate->last_msg_is_stream_activity_end = true;
1564 }
1565 } else if (ns_from_origin == INT64_MAX) {
1566 /* Infinite: use trimming range's end time */
1567 sstate->stream_act_end_ns_from_origin =
1568 trimmer_it->end.ns_from_origin;
1569 } else {
1570 /* Known: check if outside of trimming range */
1571 if (ns_from_origin > trimmer_it->end.ns_from_origin) {
1572 sstate->stream_act_end_ns_from_origin =
1573 trimmer_it->end.ns_from_origin;
1574 status = end_iterator_streams(trimmer_it);
1575 *reached_end = true;
1576 break;
1577 }
1578
1579 if (!sstate->inited) {
1580 /*
1581 * First message for this stream is a
1582 * stream activity end: we can't deduce
1583 * anything about the stream activity
1584 * beginning's time, and using this
1585 * message's time would make a useless
1586 * pair of stream activity beginning/end
1587 * with the same time. Just skip this
1588 * message and wait for something
1589 * useful.
1590 */
1591 break;
1592 }
1593
1594 push_message(trimmer_it, msg);
1595 msg = NULL;
1596 sstate->last_msg_is_stream_activity_end = true;
1597 sstate->stream_act_end_ns_from_origin = ns_from_origin;
1598 }
1599
1600 break;
1601 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1602 /*
1603 * We don't know what follows at this point, so just
1604 * keep this message until we know what to do with it
1605 * (it will be used in ensure_stream_state_is_inited()).
1606 */
1607 BT_ASSERT(!sstate->inited);
1608 BT_MESSAGE_MOVE_REF(sstate->stream_beginning_msg, msg);
1609 break;
1610 case BT_MESSAGE_TYPE_STREAM_END:
1611 if (sstate->inited) {
1612 /*
1613 * This is the end of an inited stream: end this
1614 * stream if its stream activity end message
1615 * time is not the trimming range's end time
1616 * (which means the final stream activity end
1617 * message had an infinite time). end_stream()
1618 * will generate its own stream end message.
1619 */
1620 if (trimmer_it->end.is_infinite) {
1621 push_message(trimmer_it, msg);
1622 msg = NULL;
1623 g_hash_table_remove(trimmer_it->stream_states,
1624 sstate->stream);
1625 } else if (sstate->stream_act_end_ns_from_origin <
1626 trimmer_it->end.ns_from_origin) {
1627 status = end_stream(trimmer_it, sstate);
1628 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1629 goto end;
1630 }
1631
1632 /* We won't need this stream state again */
1633 g_hash_table_remove(trimmer_it->stream_states,
1634 sstate->stream);
1635 }
1636 } else {
1637 /* We dont't need this stream state anymore */
1638 g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
1639 }
1640
1641 break;
1642 default:
1643 break;
1644 }
1645
1646end:
1647 /* We release the message's reference whatever the outcome */
1648 bt_message_put_ref(msg);
1649 return BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1650}
1651
1652/*
1653 * Handles an input message. This _could_ make the iterator's output
1654 * message queue grow; this could also consume the message without
1655 * pushing anything to this queue, only modifying the stream state.
1656 *
1657 * This function consumes the `msg` reference, _whatever the outcome_.
1658 *
1659 * This function sets `reached_end` if handling this message made the
1660 * iterator reach the end of the trimming range. Note that the output
1661 * message queue could contain messages even if this function sets
1662 * `reached_end`.
1663 */
1664static inline
1665bt_self_message_iterator_status handle_message(
1666 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1667 bool *reached_end)
1668{
1669 bt_self_message_iterator_status status;
1670 const bt_stream *stream = NULL;
1671 int64_t ns_from_origin = INT64_MIN;
1672 bool skip;
1673 int ret;
1674 struct trimmer_iterator_stream_state *sstate = NULL;
1675
1676 /* Find message's associated stream */
1677 switch (bt_message_get_type(msg)) {
1678 case BT_MESSAGE_TYPE_EVENT:
1679 stream = bt_event_borrow_stream_const(
1680 bt_message_event_borrow_event_const(msg));
1681 break;
1682 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1683 stream = bt_packet_borrow_stream_const(
1684 bt_message_packet_beginning_borrow_packet_const(msg));
1685 break;
1686 case BT_MESSAGE_TYPE_PACKET_END:
1687 stream = bt_packet_borrow_stream_const(
1688 bt_message_packet_end_borrow_packet_const(msg));
1689 break;
1690 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1691 stream = bt_message_discarded_events_borrow_stream_const(msg);
1692 break;
1693 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1694 stream = bt_message_discarded_packets_borrow_stream_const(msg);
1695 break;
1696 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
1697 stream = bt_message_stream_activity_beginning_borrow_stream_const(msg);
1698 break;
1699 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
1700 stream = bt_message_stream_activity_end_borrow_stream_const(msg);
1701 break;
1702 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1703 stream = bt_message_stream_beginning_borrow_stream_const(msg);
1704 break;
1705 case BT_MESSAGE_TYPE_STREAM_END:
1706 stream = bt_message_stream_end_borrow_stream_const(msg);
1707 break;
1708 default:
1709 break;
1710 }
1711
1712 if (likely(stream)) {
1713 /* Find stream state */
1714 sstate = g_hash_table_lookup(trimmer_it->stream_states,
1715 stream);
1716 if (unlikely(!sstate)) {
1717 /* No stream state yet: create one now */
1718 const bt_stream_class *sc;
1719
1720 /*
1721 * Validate right now that the stream's class
1722 * has a registered default clock class so that
1723 * an existing stream state guarantees existing
1724 * default clock snapshots for its associated
1725 * messages.
1726 *
1727 * Also check that clock snapshots are always
1728 * known.
1729 */
1730 sc = bt_stream_borrow_class_const(stream);
1731 if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
1732 BT_LOGE("Unsupported stream: stream class does "
1733 "not have a default clock class: "
1734 "stream-addr=%p, "
1735 "stream-id=%" PRIu64 ", "
1736 "stream-name=\"%s\"",
1737 stream, bt_stream_get_id(stream),
1738 bt_stream_get_name(stream));
1739 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1740 goto end;
1741 }
1742
1743 if (!bt_stream_class_default_clock_is_always_known(sc)) {
1744 BT_LOGE("Unsupported stream: clock does not "
1745 "always have a known value: "
1746 "stream-addr=%p, "
1747 "stream-id=%" PRIu64 ", "
1748 "stream-name=\"%s\"",
1749 stream, bt_stream_get_id(stream),
1750 bt_stream_get_name(stream));
1751 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1752 goto end;
1753 }
1754
1755 sstate = g_new0(struct trimmer_iterator_stream_state,
1756 1);
1757 if (!sstate) {
1758 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1759 goto end;
1760 }
1761
1762 sstate->stream = stream;
1763 sstate->stream_act_end_ns_from_origin = INT64_MIN;
1764 g_hash_table_insert(trimmer_it->stream_states,
1765 (void *) stream, sstate);
1766 }
1767 }
1768
1769 /* Retrieve the message's time */
1770 ret = get_msg_ns_from_origin(msg, &ns_from_origin, &skip);
1771 if (unlikely(ret)) {
1772 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1773 goto end;
1774 }
1775
1776 if (likely(sstate)) {
1777 /* Message associated to a stream */
1778 status = handle_message_with_stream_state(trimmer_it, msg,
1779 sstate, ns_from_origin, reached_end);
1780
1781 /*
1782 * handle_message_with_stream_state() unconditionally
1783 * consumes `msg`.
1784 */
1785 msg = NULL;
1786 } else {
1787 /*
1788 * Message not associated to a stream (message iterator
1789 * inactivity).
1790 */
1791 if (unlikely(ns_from_origin > trimmer_it->end.ns_from_origin)) {
1792 BT_MESSAGE_PUT_REF_AND_RESET(msg);
1793 status = end_iterator_streams(trimmer_it);
1794 *reached_end = true;
1795 } else {
1796 push_message(trimmer_it, msg);
8dad9b32 1797 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
7de0e49a
PP
1798 msg = NULL;
1799 }
1800 }
1801
1802end:
1803 /* We release the message's reference whatever the outcome */
1804 bt_message_put_ref(msg);
1805 return status;
1806}
1807
1808static inline
1809void fill_message_array_from_output_messages(
1810 struct trimmer_iterator *trimmer_it,
1811 bt_message_array_const msgs, uint64_t capacity, uint64_t *count)
1812{
1813 *count = 0;
1814
1815 /*
1816 * Move auto-seek messages to the output array (which is this
1817 * iterator's base message array).
1818 */
1819 while (capacity > 0 && !g_queue_is_empty(trimmer_it->output_messages)) {
1820 msgs[*count] = pop_message(trimmer_it);
1821 capacity--;
1822 (*count)++;
1823 }
1824
1825 BT_ASSERT(*count > 0);
1826}
1827
1828static inline
1829bt_self_message_iterator_status state_ending(
1830 struct trimmer_iterator *trimmer_it,
1831 bt_message_array_const msgs, uint64_t capacity,
1832 uint64_t *count)
1833{
1834 bt_self_message_iterator_status status =
1835 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1836
1837 if (g_queue_is_empty(trimmer_it->output_messages)) {
1838 trimmer_it->state = TRIMMER_ITERATOR_STATE_ENDED;
1839 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1840 goto end;
1841 }
1842
1843 fill_message_array_from_output_messages(trimmer_it, msgs,
1844 capacity, count);
1845
1846end:
1847 return status;
1848}
1849
1850static inline
1851bt_self_message_iterator_status state_trim(struct trimmer_iterator *trimmer_it,
1852 bt_message_array_const msgs, uint64_t capacity,
1853 uint64_t *count)
1854{
1855 bt_self_message_iterator_status status =
1856 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1857 bt_message_array_const my_msgs;
1858 uint64_t my_count;
1859 uint64_t i;
1860 bool reached_end = false;
1861
1862 while (g_queue_is_empty(trimmer_it->output_messages)) {
1863 status = (int) bt_self_component_port_input_message_iterator_next(
1864 trimmer_it->upstream_iter, &my_msgs, &my_count);
1865 if (unlikely(status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
1866 if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) {
1867 status = end_iterator_streams(trimmer_it);
1868 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1869 goto end;
1870 }
1871
1872 trimmer_it->state =
1873 TRIMMER_ITERATOR_STATE_ENDING;
1874 status = state_ending(trimmer_it, msgs,
1875 capacity, count);
1876 }
1877
1878 goto end;
1879 }
1880
1881 BT_ASSERT(my_count > 0);
1882
1883 for (i = 0; i < my_count; i++) {
1884 status = handle_message(trimmer_it, my_msgs[i],
1885 &reached_end);
1886
1887 /*
1888 * handle_message() unconditionally consumes the
1889 * message reference.
1890 */
1891 my_msgs[i] = NULL;
1892
1893 if (unlikely(status !=
1894 BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
1895 put_messages(my_msgs, my_count);
1896 goto end;
1897 }
1898
1899 if (unlikely(reached_end)) {
1900 /*
1901 * This message's time was passed the
1902 * trimming time range's end time: we
1903 * are done. Their might still be
1904 * messages in the output message queue,
1905 * so move to the "ending" state and
1906 * apply it immediately since
1907 * state_trim() is called within the
1908 * "next" method.
1909 */
1910 put_messages(my_msgs, my_count);
1911 trimmer_it->state =
1912 TRIMMER_ITERATOR_STATE_ENDING;
1913 status = state_ending(trimmer_it, msgs,
1914 capacity, count);
1915 goto end;
1916 }
1917 }
1918 }
1919
1920 /*
1921 * There's at least one message in the output message queue:
1922 * move the messages to the output message array.
1923 */
1924 BT_ASSERT(!g_queue_is_empty(trimmer_it->output_messages));
1925 fill_message_array_from_output_messages(trimmer_it, msgs,
1926 capacity, count);
1927
1928end:
1929 return status;
1930}
1931
1932BT_HIDDEN
1933bt_self_message_iterator_status trimmer_msg_iter_next(
1934 bt_self_message_iterator *self_msg_iter,
1935 bt_message_array_const msgs, uint64_t capacity,
1936 uint64_t *count)
1937{
1938 struct trimmer_iterator *trimmer_it =
1939 bt_self_message_iterator_get_data(self_msg_iter);
1940 bt_self_message_iterator_status status =
1941 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1942
1943 BT_ASSERT(trimmer_it);
1944
1945 if (likely(trimmer_it->state == TRIMMER_ITERATOR_STATE_TRIM)) {
1946 status = state_trim(trimmer_it, msgs, capacity, count);
1947 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1948 goto end;
1949 }
1950 } else {
1951 switch (trimmer_it->state) {
1952 case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN:
1953 status = state_set_trimmer_iterator_bounds(trimmer_it);
1954 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1955 goto end;
1956 }
1957
1958 status = state_seek_initially(trimmer_it);
1959 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1960 goto end;
1961 }
1962
1963 status = state_trim(trimmer_it, msgs, capacity, count);
1964 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1965 goto end;
1966 }
1967
1968 break;
1969 case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY:
1970 status = state_seek_initially(trimmer_it);
1971 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1972 goto end;
1973 }
1974
1975 status = state_trim(trimmer_it, msgs, capacity, count);
1976 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1977 goto end;
1978 }
1979
1980 break;
1981 case TRIMMER_ITERATOR_STATE_ENDING:
1982 status = state_ending(trimmer_it, msgs, capacity,
1983 count);
1984 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1985 goto end;
1986 }
1987
1988 break;
1989 case TRIMMER_ITERATOR_STATE_ENDED:
1990 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1991 break;
1992 default:
1993 abort();
1994 }
1995 }
1996
1997end:
1998 return status;
1999}
2000
2001BT_HIDDEN
2002void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
2003{
2004 struct trimmer_iterator *trimmer_it =
2005 bt_self_message_iterator_get_data(self_msg_iter);
2006
2007 BT_ASSERT(trimmer_it);
2008 destroy_trimmer_iterator(trimmer_it);
2009}
This page took 0.117713 seconds and 4 git commands to generate.