7fd3279bf12bc5cb2ae29c6d177411d76c596e48
[babeltrace.git] / plugins / utils / trimmer / trimmer.c
1 /*
2 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #define BT_LOG_TAG "PLUGIN-UTILS-TRIMMER-FLT"
25 #include "logging.h"
26
27 #include <babeltrace/compat/utc-internal.h>
28 #include <babeltrace/compat/time-internal.h>
29 #include <babeltrace/babeltrace.h>
30 #include <babeltrace/common-internal.h>
31 #include <plugins-common.h>
32 #include <babeltrace/assert-internal.h>
33 #include <stdint.h>
34 #include <inttypes.h>
35 #include <glib.h>
36
37 #include "trimmer.h"
38
/* Number of nanoseconds in one second */
#define NS_PER_S INT64_C(1000000000)

/* Name of the input port which trimmer_init() adds to the component */
static const char * const in_port_name = "in";
42
/*
 * Time of day without any date: hours, minutes, seconds, and
 * nanoseconds.
 */
struct trimmer_time {
	unsigned int hour, minute, second, ns;
};
46
/*
 * One bound (beginning or end) of the trimming time range.
 *
 * A bound is either infinite, fully known (`ns_from_origin` is valid),
 * or only known as a time of day (`time`), in which case
 * `ns_from_origin` is computed later, once a date is known.
 */
struct trimmer_bound {
	/*
	 * Nanoseconds from origin, valid if `is_set` is set and
	 * `is_infinite` is false.
	 */
	int64_t ns_from_origin;

	/* True if this bound's full time (`ns_from_origin`) is set */
	bool is_set;

	/*
	 * True if this bound represents the infinity (negative or
	 * positive depending on which bound it is). If this is true,
	 * then we don't care about `ns_from_origin` above.
	 */
	bool is_infinite;

	/*
	 * This bound's time without the date; this time is used to set
	 * `ns_from_origin` once we know the date.
	 */
	struct trimmer_time time;
};
70
/* Private data of a trimmer filter component */
struct trimmer_comp {
	/* Trimming time range's bounds, as set from the parameters */
	struct trimmer_bound begin, end;

	/*
	 * Value of the `gmt` parameter (false when the parameter is
	 * missing): selects bt_timegm() over mktime() when converting
	 * a date and time to nanoseconds from origin.
	 */
	bool is_gmt;
};
75
/*
 * States of a trimmer message iterator.
 *
 * The first enumerator has the value 0, so it is the state of a
 * zero-initialized `struct trimmer_iterator`.
 */
enum trimmer_iterator_state {
	/*
	 * Find the first message's date and set the bounds's times
	 * accordingly.
	 */
	TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN,

	/*
	 * Initially seek to the trimming range's beginning time.
	 */
	TRIMMER_ITERATOR_STATE_SEEK_INITIALLY,

	/*
	 * Fill the output message queue for as long as received input
	 * messages are within the trimming time range.
	 */
	TRIMMER_ITERATOR_STATE_TRIM,

	/* Flush the remaining messages in the output message queue */
	TRIMMER_ITERATOR_STATE_ENDING,

	/* Trimming operation and message iterator is ended */
	TRIMMER_ITERATOR_STATE_ENDED,
};
100
/* Private data of a trimmer message iterator */
struct trimmer_iterator {
	/* Weak */
	struct trimmer_comp *trimmer_comp;

	/* Weak */
	bt_self_message_iterator *self_msg_iter;

	/* Current state of the trimming state machine */
	enum trimmer_iterator_state state;

	/* Owned by this */
	bt_self_component_port_input_message_iterator *upstream_iter;

	/*
	 * This iterator's own copies of the component's bounds: a
	 * bound which is not set initially gets its `ns_from_origin`
	 * once the date of the first message is known.
	 */
	struct trimmer_bound begin, end;

	/*
	 * Queue of `const bt_message *` (owned by the queue).
	 *
	 * This is where the trimming operation pushes the messages to
	 * output by this message iterator.
	 */
	GQueue *output_messages;

	/*
	 * Hash table of `bt_stream *` (weak) to
	 * `struct trimmer_iterator_stream_state *` (owned by the HT).
	 */
	GHashTable *stream_states;
};
128
/*
 * Per-stream state of a trimmer message iterator (the values of the
 * iterator's `stream_states` hash table).
 */
struct trimmer_iterator_stream_state {
	/*
	 * True if both stream beginning and initial stream activity
	 * beginning messages were pushed for this stream.
	 */
	bool inited;

	/*
	 * True if the last pushed message for this stream was a stream
	 * activity end message.
	 */
	bool last_msg_is_stream_activity_end;

	/*
	 * Time to use for a generated stream end activity message when
	 * ending the stream.
	 */
	int64_t stream_act_end_ns_from_origin;

	/* Weak (reset to `NULL` by end_stream() once the stream is ended) */
	const bt_stream *stream;

	/* Owned by this (`NULL` initially and between packets) */
	const bt_packet *cur_packet;

	/* Owned by this */
	const bt_message *stream_beginning_msg;
};
157
/*
 * Frees the trimmer component data `trimmer_comp`, which must not be
 * `NULL`.
 */
static void destroy_trimmer_comp(struct trimmer_comp *trimmer_comp)
{
	BT_ASSERT(trimmer_comp);

	/* The structure owns no other resource */
	g_free(trimmer_comp);
}
164
165 static
166 struct trimmer_comp *create_trimmer_comp(void)
167 {
168 return g_new0(struct trimmer_comp, 1);
169 }
170
171 BT_HIDDEN
172 void trimmer_finalize(bt_self_component_filter *self_comp)
173 {
174 struct trimmer_comp *trimmer_comp =
175 bt_self_component_get_data(
176 bt_self_component_filter_as_self_component(self_comp));
177
178 if (trimmer_comp) {
179 destroy_trimmer_comp(trimmer_comp);
180 }
181 }
182
183 /*
184 * Sets the time (in ns from origin) of a trimmer bound from date and
185 * time components.
186 *
187 * Returns a negative value if anything goes wrong.
188 */
189 static
190 int set_bound_ns_from_origin(struct trimmer_bound *bound,
191 unsigned int year, unsigned int month, unsigned int day,
192 unsigned int hour, unsigned int minute, unsigned int second,
193 unsigned int ns, bool is_gmt)
194 {
195 int ret = 0;
196 time_t result;
197 struct tm tm = {
198 .tm_sec = second,
199 .tm_min = minute,
200 .tm_hour = hour,
201 .tm_mday = day,
202 .tm_mon = month - 1,
203 .tm_year = year - 1900,
204 .tm_isdst = -1,
205 };
206
207 if (is_gmt) {
208 result = bt_timegm(&tm);
209 } else {
210 result = mktime(&tm);
211 }
212
213 if (result < 0) {
214 ret = -1;
215 goto end;
216 }
217
218 BT_ASSERT(bound);
219 bound->ns_from_origin = (int64_t) result;
220 bound->ns_from_origin *= NS_PER_S;
221 bound->ns_from_origin += ns;
222 bound->is_set = true;
223
224 end:
225 return ret;
226 }
227
/*
 * Parses a timestamp, figuring out its format, and sets `bound`
 * accordingly.
 *
 * Returns 0 on success, or a negative value if anything goes wrong
 * (no format matches `str`, or the date/time conversion fails).
 *
 * Accepted formats, tried in this order:
 *
 *     YYYY-MM-DD hh:mm:ss.ns
 *     YYYY-MM-DD hh:mm:ss
 *     YYYY-MM-DD hh:mm
 *     YYYY-MM-DD
 *     hh:mm:ss.ns
 *     hh:mm:ss
 *     -s.ns
 *     s.ns
 *     -s
 *     s
 *
 * The formats with a date, and the seconds-only formats, set
 * `bound->ns_from_origin` and `bound->is_set`. The time-only formats
 * (`hh:mm:ss[.ns]`) only fill `bound->time` and leave `bound->is_set`
 * untouched, because the date is not known at this point.
 *
 * In every format, the `ns` part is read as an integral number of
 * nanoseconds, not as a decimal fraction of a second: `1.5` means one
 * second and five nanoseconds.
 *
 * Each sscanf() format ends with `%c` to detect trailing characters:
 * a format only matches when the conversion count excludes this last
 * conversion, that is, when nothing follows the expected fields.
 *
 * TODO: Check overflows.
 */
static
int set_bound_from_str(const char *str, struct trimmer_bound *bound,
		bool is_gmt)
{
	int ret = 0;
	int s_ret;
	unsigned int year, month, day, hour, minute, second, ns;
	char dummy;

	/* Try `YYYY-MM-DD hh:mm:ss.ns` format */
	s_ret = sscanf(str, "%u-%u-%u %u:%u:%u.%u%c", &year, &month, &day,
		&hour, &minute, &second, &ns, &dummy);
	if (s_ret == 7) {
		ret = set_bound_ns_from_origin(bound, year, month, day,
			hour, minute, second, ns, is_gmt);
		goto end;
	}

	/* Try `YYYY-MM-DD hh:mm:ss` format */
	s_ret = sscanf(str, "%u-%u-%u %u:%u:%u%c", &year, &month, &day,
		&hour, &minute, &second, &dummy);
	if (s_ret == 6) {
		ret = set_bound_ns_from_origin(bound, year, month, day,
			hour, minute, second, 0, is_gmt);
		goto end;
	}

	/* Try `YYYY-MM-DD hh:mm` format */
	s_ret = sscanf(str, "%u-%u-%u %u:%u%c", &year, &month, &day,
		&hour, &minute, &dummy);
	if (s_ret == 5) {
		ret = set_bound_ns_from_origin(bound, year, month, day,
			hour, minute, 0, 0, is_gmt);
		goto end;
	}

	/* Try `YYYY-MM-DD` format */
	s_ret = sscanf(str, "%u-%u-%u%c", &year, &month, &day, &dummy);
	if (s_ret == 3) {
		ret = set_bound_ns_from_origin(bound, year, month, day,
			0, 0, 0, 0, is_gmt);
		goto end;
	}

	/* Try `hh:mm:ss.ns` format (date unknown: only fill `time`) */
	s_ret = sscanf(str, "%u:%u:%u.%u%c", &hour, &minute, &second, &ns,
		&dummy);
	if (s_ret == 4) {
		bound->time.hour = hour;
		bound->time.minute = minute;
		bound->time.second = second;
		bound->time.ns = ns;
		goto end;
	}

	/* Try `hh:mm:ss` format (date unknown: only fill `time`) */
	s_ret = sscanf(str, "%u:%u:%u%c", &hour, &minute, &second, &dummy);
	if (s_ret == 3) {
		bound->time.hour = hour;
		bound->time.minute = minute;
		bound->time.second = second;
		bound->time.ns = 0;
		goto end;
	}

	/* Try `-s.ns` format */
	s_ret = sscanf(str, "-%u.%u%c", &second, &ns, &dummy);
	if (s_ret == 2) {
		bound->ns_from_origin = -((int64_t) second) * NS_PER_S;
		bound->ns_from_origin -= (int64_t) ns;
		bound->is_set = true;
		goto end;
	}

	/* Try `s.ns` format */
	s_ret = sscanf(str, "%u.%u%c", &second, &ns, &dummy);
	if (s_ret == 2) {
		bound->ns_from_origin = ((int64_t) second) * NS_PER_S;
		bound->ns_from_origin += (int64_t) ns;
		bound->is_set = true;
		goto end;
	}

	/* Try `-s` format */
	s_ret = sscanf(str, "-%u%c", &second, &dummy);
	if (s_ret == 1) {
		bound->ns_from_origin = -((int64_t) second) * NS_PER_S;
		bound->is_set = true;
		goto end;
	}

	/* Try `s` format */
	s_ret = sscanf(str, "%u%c", &second, &dummy);
	if (s_ret == 1) {
		bound->ns_from_origin = (int64_t) second * NS_PER_S;
		bound->is_set = true;
		goto end;
	}

	/* No known format matches */
	BT_LOGE("Invalid date/time format: param=\"%s\"", str);
	ret = -1;

end:
	return ret;
}
346
347 /*
348 * Sets a trimmer bound's properties from a parameter string/integer
349 * value.
350 *
351 * Returns a negative value if anything goes wrong.
352 */
353 static
354 int set_bound_from_param(const char *param_name, const bt_value *param,
355 struct trimmer_bound *bound, bool is_gmt)
356 {
357 int ret;
358 const char *arg;
359 char tmp_arg[64];
360
361 if (bt_value_is_integer(param)) {
362 int64_t value = bt_value_integer_get(param);
363
364 /*
365 * Just convert it to a temporary string to handle
366 * everything the same way.
367 */
368 sprintf(tmp_arg, "%" PRId64, value);
369 arg = tmp_arg;
370 } else if (bt_value_is_string(param)) {
371 arg = bt_value_string_get(param);
372 } else {
373 BT_LOGE("`%s` parameter must be an integer or a string value.",
374 param_name);
375 ret = -1;
376 goto end;
377 }
378
379 ret = set_bound_from_str(arg, bound, is_gmt);
380
381 end:
382 return ret;
383 }
384
385 static
386 int validate_trimmer_bounds(struct trimmer_bound *begin,
387 struct trimmer_bound *end)
388 {
389 int ret = 0;
390
391 BT_ASSERT(begin->is_set);
392 BT_ASSERT(end->is_set);
393
394 if (!begin->is_infinite && !end->is_infinite &&
395 begin->ns_from_origin > end->ns_from_origin) {
396 BT_LOGE("Trimming time range's beginning time is greater than end time: "
397 "begin-ns-from-origin=%" PRId64 ", "
398 "end-ns-from-origin=%" PRId64,
399 begin->ns_from_origin,
400 end->ns_from_origin);
401 ret = -1;
402 goto end;
403 }
404
405 if (!begin->is_infinite && begin->ns_from_origin == INT64_MIN) {
406 BT_LOGE("Invalid trimming time range's beginning time: "
407 "ns-from-origin=%" PRId64,
408 begin->ns_from_origin);
409 ret = -1;
410 goto end;
411 }
412
413 if (!end->is_infinite && end->ns_from_origin == INT64_MIN) {
414 BT_LOGE("Invalid trimming time range's end time: "
415 "ns-from-origin=%" PRId64,
416 end->ns_from_origin);
417 ret = -1;
418 goto end;
419 }
420
421 end:
422 return ret;
423 }
424
425 static
426 int init_trimmer_comp_from_params(struct trimmer_comp *trimmer_comp,
427 const bt_value *params)
428 {
429 const bt_value *value;
430 int ret = 0;
431
432 BT_ASSERT(params);
433 value = bt_value_map_borrow_entry_value_const(params, "gmt");
434 if (value) {
435 trimmer_comp->is_gmt = (bool) bt_value_bool_get(value);
436 }
437
438 value = bt_value_map_borrow_entry_value_const(params, "begin");
439 if (value) {
440 if (set_bound_from_param("begin", value,
441 &trimmer_comp->begin, trimmer_comp->is_gmt)) {
442 /* set_bound_from_param() logs errors */
443 ret = BT_SELF_COMPONENT_STATUS_ERROR;
444 goto end;
445 }
446 } else {
447 trimmer_comp->begin.is_infinite = true;
448 trimmer_comp->begin.is_set = true;
449 }
450
451 value = bt_value_map_borrow_entry_value_const(params, "end");
452 if (value) {
453 if (set_bound_from_param("end", value,
454 &trimmer_comp->end, trimmer_comp->is_gmt)) {
455 /* set_bound_from_param() logs errors */
456 ret = BT_SELF_COMPONENT_STATUS_ERROR;
457 goto end;
458 }
459 } else {
460 trimmer_comp->end.is_infinite = true;
461 trimmer_comp->end.is_set = true;
462 }
463
464 end:
465 if (trimmer_comp->begin.is_set && trimmer_comp->end.is_set) {
466 /* validate_trimmer_bounds() logs errors */
467 ret = validate_trimmer_bounds(&trimmer_comp->begin,
468 &trimmer_comp->end);
469 }
470
471 return ret;
472 }
473
474 bt_self_component_status trimmer_init(bt_self_component_filter *self_comp,
475 const bt_value *params, void *init_data)
476 {
477 int ret;
478 bt_self_component_status status;
479 struct trimmer_comp *trimmer_comp = create_trimmer_comp();
480
481 if (!trimmer_comp) {
482 status = BT_SELF_COMPONENT_STATUS_NOMEM;
483 goto error;
484 }
485
486 status = bt_self_component_filter_add_input_port(
487 self_comp, in_port_name, NULL, NULL);
488 if (status != BT_SELF_COMPONENT_STATUS_OK) {
489 goto error;
490 }
491
492 status = bt_self_component_filter_add_output_port(
493 self_comp, "out", NULL, NULL);
494 if (status != BT_SELF_COMPONENT_STATUS_OK) {
495 goto error;
496 }
497
498 ret = init_trimmer_comp_from_params(trimmer_comp, params);
499 if (ret) {
500 status = BT_SELF_COMPONENT_STATUS_ERROR;
501 goto error;
502 }
503
504 bt_self_component_set_data(
505 bt_self_component_filter_as_self_component(self_comp),
506 trimmer_comp);
507 goto end;
508
509 error:
510 if (status == BT_SELF_COMPONENT_STATUS_OK) {
511 status = BT_SELF_COMPONENT_STATUS_ERROR;
512 }
513
514 if (trimmer_comp) {
515 destroy_trimmer_comp(trimmer_comp);
516 }
517
518 end:
519 return status;
520 }
521
522 static
523 void destroy_trimmer_iterator(struct trimmer_iterator *trimmer_it)
524 {
525 BT_ASSERT(trimmer_it);
526 bt_self_component_port_input_message_iterator_put_ref(
527 trimmer_it->upstream_iter);
528
529 if (trimmer_it->output_messages) {
530 g_queue_free(trimmer_it->output_messages);
531 }
532
533 if (trimmer_it->stream_states) {
534 g_hash_table_destroy(trimmer_it->stream_states);
535 }
536
537 g_free(trimmer_it);
538 }
539
540 static
541 void destroy_trimmer_iterator_stream_state(
542 struct trimmer_iterator_stream_state *sstate)
543 {
544 BT_ASSERT(sstate);
545 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
546 BT_MESSAGE_PUT_REF_AND_RESET(sstate->stream_beginning_msg);
547 g_free(sstate);
548 }
549
550 BT_HIDDEN
551 bt_self_message_iterator_status trimmer_msg_iter_init(
552 bt_self_message_iterator *self_msg_iter,
553 bt_self_component_filter *self_comp,
554 bt_self_component_port_output *port)
555 {
556 bt_self_message_iterator_status status =
557 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
558 struct trimmer_iterator *trimmer_it;
559
560 trimmer_it = g_new0(struct trimmer_iterator, 1);
561 if (!trimmer_it) {
562 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
563 goto end;
564 }
565
566 trimmer_it->trimmer_comp = bt_self_component_get_data(
567 bt_self_component_filter_as_self_component(self_comp));
568 BT_ASSERT(trimmer_it->trimmer_comp);
569
570 if (trimmer_it->trimmer_comp->begin.is_set &&
571 trimmer_it->trimmer_comp->end.is_set) {
572 /*
573 * Both trimming time range's bounds are set, so skip
574 * the
575 * `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
576 * phase.
577 */
578 trimmer_it->state = TRIMMER_ITERATOR_STATE_SEEK_INITIALLY;
579 }
580
581 trimmer_it->begin = trimmer_it->trimmer_comp->begin;
582 trimmer_it->end = trimmer_it->trimmer_comp->end;
583 trimmer_it->upstream_iter =
584 bt_self_component_port_input_message_iterator_create(
585 bt_self_component_filter_borrow_input_port_by_name(
586 self_comp, in_port_name));
587 if (!trimmer_it->upstream_iter) {
588 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
589 goto end;
590 }
591
592 trimmer_it->output_messages = g_queue_new();
593 if (!trimmer_it->output_messages) {
594 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
595 goto end;
596 }
597
598 trimmer_it->stream_states = g_hash_table_new_full(g_direct_hash,
599 g_direct_equal, NULL,
600 (GDestroyNotify) destroy_trimmer_iterator_stream_state);
601 if (!trimmer_it->stream_states) {
602 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
603 goto end;
604 }
605
606 trimmer_it->self_msg_iter = self_msg_iter;
607 bt_self_message_iterator_set_data(self_msg_iter, trimmer_it);
608
609 end:
610 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK && trimmer_it) {
611 destroy_trimmer_iterator(trimmer_it);
612 }
613
614 return status;
615 }
616
/*
 * Gets the time (in ns from origin) of the message `msg`.
 *
 * On success, one of the following applies:
 *
 * * `*ns_from_origin` is the time of the message's default clock
 *   snapshot (the beginning one for discarded events/packets
 *   messages) and `*skip` is left untouched.
 *
 * * `*skip` is set to true because the message has no time to use:
 *   either its type is not handled here (`*ns_from_origin` is left
 *   untouched), or it is a stream activity message of which the clock
 *   snapshot is unknown or infinite (`*ns_from_origin` is set to
 *   `INT64_MIN` or `INT64_MAX` to always include or exclude the
 *   message).
 *
 * This function never sets `*skip` to false: the caller must
 * initialize it.
 *
 * Returns 0 on success, or a negative value on error: the message's
 * stream class has no default clock class, the clock snapshot's state
 * is not known, or the conversion to ns from origin fails.
 */
static inline
int get_msg_ns_from_origin(const bt_message *msg, int64_t *ns_from_origin,
		bool *skip)
{
	const bt_clock_class *clock_class = NULL;
	const bt_clock_snapshot *clock_snapshot = NULL;
	bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
	bt_message_stream_activity_clock_snapshot_state sa_cs_state;
	int ret = 0;

	BT_ASSERT(msg);
	BT_ASSERT(ns_from_origin);
	BT_ASSERT(skip);

	/*
	 * Each case borrows the message's clock snapshot. The clock
	 * class is only borrowed to make sure that there's one.
	 */
	switch (bt_message_get_type(msg)) {
	case BT_MESSAGE_TYPE_EVENT:
		clock_class =
			bt_message_event_borrow_stream_class_default_clock_class_const(
				msg);
		if (unlikely(!clock_class)) {
			goto error;
		}

		cs_state = bt_message_event_borrow_default_clock_snapshot_const(
			msg, &clock_snapshot);
		break;
	case BT_MESSAGE_TYPE_PACKET_BEGINNING:
		clock_class =
			bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
				msg);
		if (unlikely(!clock_class)) {
			goto error;
		}

		cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
			msg, &clock_snapshot);
		break;
	case BT_MESSAGE_TYPE_PACKET_END:
		clock_class =
			bt_message_packet_end_borrow_stream_class_default_clock_class_const(
				msg);
		if (unlikely(!clock_class)) {
			goto error;
		}

		cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const(
			msg, &clock_snapshot);
		break;
	case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
		clock_class =
			bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
				msg);
		if (unlikely(!clock_class)) {
			goto error;
		}

		/* The beginning time is the time of this message */
		cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
			msg, &clock_snapshot);
		break;
	case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
		clock_class =
			bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
				msg);
		if (unlikely(!clock_class)) {
			goto error;
		}

		/* The beginning time is the time of this message */
		cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
			msg, &clock_snapshot);
		break;
	case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
		clock_class =
			bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
				msg);
		if (unlikely(!clock_class)) {
			goto error;
		}

		sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
			msg, &clock_snapshot);
		if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN ||
				sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
			/* Lowest possible time to always include them */
			*ns_from_origin = INT64_MIN;
			goto no_clock_snapshot;
		}

		/*
		 * Known clock snapshot: `cs_state` keeps its initial
		 * "known" value for the check below.
		 */
		break;
	case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
		clock_class =
			bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
				msg);
		if (unlikely(!clock_class)) {
			goto error;
		}

		sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
			msg, &clock_snapshot);
		if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
			/* Lowest time to always include it */
			*ns_from_origin = INT64_MIN;
			goto no_clock_snapshot;
		} else if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
			/* Greatest time to always exclude it */
			*ns_from_origin = INT64_MAX;
			goto no_clock_snapshot;
		}

		/*
		 * Known clock snapshot: `cs_state` keeps its initial
		 * "known" value for the check below.
		 */
		break;
	case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
		/* No clock class to check for this message type */
		cs_state =
			bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
				msg, &clock_snapshot);
		break;
	default:
		/* Other message types have no time to use */
		goto no_clock_snapshot;
	}

	if (unlikely(cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN)) {
		BT_LOGE_STR("Unsupported unknown clock snapshot.");
		ret = -1;
		goto end;
	}

	ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot,
		ns_from_origin);
	if (unlikely(ret)) {
		goto error;
	}

	goto end;

no_clock_snapshot:
	/* Not an error: the caller must ignore this message's time */
	*skip = true;
	goto end;

error:
	ret = -1;

end:
	return ret;
}
759
760 static inline
761 void put_messages(bt_message_array_const msgs, uint64_t count)
762 {
763 uint64_t i;
764
765 for (i = 0; i < count; i++) {
766 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
767 }
768 }
769
770 static inline
771 int set_trimmer_iterator_bound(struct trimmer_bound *bound,
772 int64_t ns_from_origin, bool is_gmt)
773 {
774 struct tm tm;
775 time_t time_seconds = (time_t) (ns_from_origin / NS_PER_S);
776 int ret = 0;
777
778 BT_ASSERT(!bound->is_set);
779 errno = 0;
780
781 /* We only need to extract the date from this time */
782 if (is_gmt) {
783 bt_gmtime_r(&time_seconds, &tm);
784 } else {
785 bt_localtime_r(&time_seconds, &tm);
786 }
787
788 if (errno) {
789 BT_LOGE_ERRNO("Cannot convert timestamp to date and time",
790 "ts=%" PRId64, (int64_t) time_seconds);
791 ret = -1;
792 goto end;
793 }
794
795 ret = set_bound_ns_from_origin(bound, tm.tm_year + 1900, tm.tm_mon + 1,
796 tm.tm_mday, bound->time.hour, bound->time.minute,
797 bound->time.second, bound->time.ns, is_gmt);
798
799 end:
800 return ret;
801 }
802
803 static
804 bt_self_message_iterator_status state_set_trimmer_iterator_bounds(
805 struct trimmer_iterator *trimmer_it)
806 {
807 bt_message_iterator_status upstream_iter_status =
808 BT_MESSAGE_ITERATOR_STATUS_OK;
809 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
810 bt_message_array_const msgs;
811 uint64_t count = 0;
812 int64_t ns_from_origin = INT64_MIN;
813 uint64_t i;
814 int ret;
815
816 BT_ASSERT(!trimmer_it->begin.is_set ||
817 !trimmer_it->end.is_set);
818
819 while (true) {
820 upstream_iter_status =
821 bt_self_component_port_input_message_iterator_next(
822 trimmer_it->upstream_iter, &msgs, &count);
823 if (upstream_iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) {
824 goto end;
825 }
826
827 for (i = 0; i < count; i++) {
828 const bt_message *msg = msgs[i];
829 bool skip = false;
830 int ret;
831
832 ret = get_msg_ns_from_origin(msg, &ns_from_origin,
833 &skip);
834 if (ret) {
835 goto error;
836 }
837
838 if (skip) {
839 continue;
840 }
841
842 BT_ASSERT(ns_from_origin != INT64_MIN &&
843 ns_from_origin != INT64_MAX);
844 put_messages(msgs, count);
845 goto found;
846 }
847
848 put_messages(msgs, count);
849 }
850
851 found:
852 if (!trimmer_it->begin.is_set) {
853 BT_ASSERT(!trimmer_it->begin.is_infinite);
854 ret = set_trimmer_iterator_bound(&trimmer_it->begin,
855 ns_from_origin, trimmer_comp->is_gmt);
856 if (ret) {
857 goto error;
858 }
859 }
860
861 if (!trimmer_it->end.is_set) {
862 BT_ASSERT(!trimmer_it->end.is_infinite);
863 ret = set_trimmer_iterator_bound(&trimmer_it->end,
864 ns_from_origin, trimmer_comp->is_gmt);
865 if (ret) {
866 goto error;
867 }
868 }
869
870 ret = validate_trimmer_bounds(&trimmer_it->begin,
871 &trimmer_it->end);
872 if (ret) {
873 goto error;
874 }
875
876 goto end;
877
878 error:
879 put_messages(msgs, count);
880 upstream_iter_status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
881
882 end:
883 return (int) upstream_iter_status;
884 }
885
886 static
887 bt_self_message_iterator_status state_seek_initially(
888 struct trimmer_iterator *trimmer_it)
889 {
890 bt_self_message_iterator_status status =
891 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
892
893 BT_ASSERT(trimmer_it->begin.is_set);
894
895 if (trimmer_it->begin.is_infinite) {
896 if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
897 trimmer_it->upstream_iter)) {
898 BT_LOGE_STR("Cannot make upstream message iterator initially seek its beginning.");
899 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
900 goto end;
901 }
902
903 status = (int) bt_self_component_port_input_message_iterator_seek_beginning(
904 trimmer_it->upstream_iter);
905 } else {
906 if (!bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
907 trimmer_it->upstream_iter,
908 trimmer_it->begin.ns_from_origin)) {
909 BT_LOGE("Cannot make upstream message iterator initially seek: "
910 "seek-ns-from-origin=%" PRId64,
911 trimmer_it->begin.ns_from_origin);
912 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
913 goto end;
914 }
915
916 status = (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
917 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin);
918 }
919
920 if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
921 trimmer_it->state = TRIMMER_ITERATOR_STATE_TRIM;
922 }
923
924 end:
925 return status;
926 }
927
928 static inline
929 void push_message(struct trimmer_iterator *trimmer_it, const bt_message *msg)
930 {
931 g_queue_push_head(trimmer_it->output_messages, (void *) msg);
932 }
933
934 static inline
935 const bt_message *pop_message(struct trimmer_iterator *trimmer_it)
936 {
937 return g_queue_pop_tail(trimmer_it->output_messages);
938 }
939
940 static inline
941 int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
942 int64_t ns_from_origin, uint64_t *raw_value)
943 {
944
945 int64_t cc_offset_s;
946 uint64_t cc_offset_cycles;
947 uint64_t cc_freq;
948
949 bt_clock_class_get_offset(clock_class, &cc_offset_s, &cc_offset_cycles);
950 cc_freq = bt_clock_class_get_frequency(clock_class);
951 return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
952 cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
953 }
954
/*
 * Ends the stream of the stream state `sstate`, pushing the messages
 * needed to close it properly to the output message queue, in this
 * order:
 *
 * 1. A packet end message, if there's a current packet, having the
 *    trimming time range's end time.
 *
 * 2. A stream activity end message, unless the last pushed message
 *    for this stream was already one.
 *
 * 3. A stream end message.
 *
 * The trimming time range's end bound must not be infinite.
 *
 * This function does nothing if `sstate->stream` is `NULL`; it sets
 * it to `NULL` once the stream is ended.
 *
 * Returns the "OK" status on success, or an error/memory error status
 * if a time cannot be converted to a clock value or if a message
 * cannot be created. Messages pushed before a failure remain in the
 * queue.
 */
static inline
bt_self_message_iterator_status end_stream(struct trimmer_iterator *trimmer_it,
		struct trimmer_iterator_stream_state *sstate)
{
	bt_self_message_iterator_status status =
		BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
	uint64_t raw_value;
	const bt_clock_class *clock_class;
	int ret;
	bt_message *msg = NULL;

	BT_ASSERT(!trimmer_it->end.is_infinite);

	if (!sstate->stream) {
		/* Nothing to end for this stream state */
		goto end;
	}

	if (sstate->cur_packet) {
		/*
		 * The last message could not have been a stream
		 * activity end message if we have a current packet.
		 */
		BT_ASSERT(!sstate->last_msg_is_stream_activity_end);

		/*
		 * Create and push a packet end message, making its time
		 * the trimming range's end time.
		 */
		clock_class = bt_stream_class_borrow_default_clock_class_const(
			bt_stream_borrow_class_const(sstate->stream));
		BT_ASSERT(clock_class);
		ret = clock_raw_value_from_ns_from_origin(clock_class,
			trimmer_it->end.ns_from_origin, &raw_value);
		if (ret) {
			status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
			goto end;
		}

		msg = bt_message_packet_end_create_with_default_clock_snapshot(
			trimmer_it->self_msg_iter, sstate->cur_packet,
			raw_value);
		if (!msg) {
			status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
			goto end;
		}

		/* The queue owns the message from now on */
		push_message(trimmer_it, msg);
		msg = NULL;
		BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);

		/*
		 * Because we generated a packet end message, set the
		 * stream activity end message's time to use to the
		 * trimming range's end time (this packet end message's
		 * time).
		 */
		sstate->stream_act_end_ns_from_origin =
			trimmer_it->end.ns_from_origin;
	}

	if (!sstate->last_msg_is_stream_activity_end) {
		/* Create and push a stream activity end message */
		msg = bt_message_stream_activity_end_create(
			trimmer_it->self_msg_iter, sstate->stream);
		if (!msg) {
			status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
			goto end;
		}

		clock_class = bt_stream_class_borrow_default_clock_class_const(
			bt_stream_borrow_class_const(sstate->stream));
		BT_ASSERT(clock_class);
		BT_ASSERT(sstate->stream_act_end_ns_from_origin != INT64_MIN);
		ret = clock_raw_value_from_ns_from_origin(clock_class,
			sstate->stream_act_end_ns_from_origin, &raw_value);
		if (ret) {
			/* `msg` is released at the `end` label */
			status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
			goto end;
		}

		bt_message_stream_activity_end_set_default_clock_snapshot(
			msg, raw_value);
		push_message(trimmer_it, msg);
		msg = NULL;
	}

	/* Create and push a stream end message */
	msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
		sstate->stream);
	if (!msg) {
		status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
		goto end;
	}

	push_message(trimmer_it, msg);
	msg = NULL;

	/*
	 * Just to make sure that we don't use this stream state again
	 * in the future without an obvious error.
	 */
	sstate->stream = NULL;

end:
	/* `msg` is only non-`NULL` here if it was not pushed */
	bt_message_put_ref(msg);
	return status;
}
1062
1063 static inline
1064 bt_self_message_iterator_status end_iterator_streams(
1065 struct trimmer_iterator *trimmer_it)
1066 {
1067 bt_self_message_iterator_status status =
1068 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1069 GHashTableIter iter;
1070 gpointer key, sstate;
1071
1072 if (trimmer_it->end.is_infinite) {
1073 /*
1074 * An infinite trimming range's end time guarantees that
1075 * we received (and pushed) all the appropriate end
1076 * messages.
1077 */
1078 goto remove_all;
1079 }
1080
1081 /*
1082 * End each stream and then remove them from the hash table of
1083 * stream states to release unneeded references.
1084 */
1085 g_hash_table_iter_init(&iter, trimmer_it->stream_states);
1086
1087 while (g_hash_table_iter_next(&iter, &key, &sstate)) {
1088 status = end_stream(trimmer_it, sstate);
1089 if (status) {
1090 goto end;
1091 }
1092 }
1093
1094 remove_all:
1095 g_hash_table_remove_all(trimmer_it->stream_states);
1096
1097 end:
1098 return status;
1099 }
1100
1101 static inline
1102 bt_self_message_iterator_status create_stream_beginning_activity_message(
1103 struct trimmer_iterator *trimmer_it,
1104 const bt_stream *stream,
1105 const bt_clock_class *clock_class, bt_message **msg)
1106 {
1107 bt_self_message_iterator_status status =
1108 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1109
1110 BT_ASSERT(msg);
1111 BT_ASSERT(!trimmer_it->begin.is_infinite);
1112
1113 *msg = bt_message_stream_activity_beginning_create(
1114 trimmer_it->self_msg_iter, stream);
1115 if (!*msg) {
1116 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1117 goto end;
1118 }
1119
1120 if (clock_class) {
1121 int ret;
1122 uint64_t raw_value;
1123
1124 ret = clock_raw_value_from_ns_from_origin(clock_class,
1125 trimmer_it->begin.ns_from_origin, &raw_value);
1126 if (ret) {
1127 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1128 bt_message_put_ref(*msg);
1129 goto end;
1130 }
1131
1132 bt_message_stream_activity_beginning_set_default_clock_snapshot(
1133 *msg, raw_value);
1134 }
1135
1136 end:
1137 return status;
1138 }
1139
1140 /*
1141 * Makes sure to initialize a stream state, pushing the appropriate
1142 * initial messages.
1143 *
1144 * `stream_act_beginning_msg` is an initial stream activity beginning
1145 * message to potentially use, depending on its clock snapshot state.
1146 * This function consumes `stream_act_beginning_msg` unconditionally.
1147 */
1148 static inline
1149 bt_self_message_iterator_status ensure_stream_state_is_inited(
1150 struct trimmer_iterator *trimmer_it,
1151 struct trimmer_iterator_stream_state *sstate,
1152 const bt_message *stream_act_beginning_msg)
1153 {
1154 bt_self_message_iterator_status status =
1155 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1156 bt_message *new_msg = NULL;
1157 const bt_clock_class *clock_class =
1158 bt_stream_class_borrow_default_clock_class_const(
1159 bt_stream_borrow_class_const(sstate->stream));
1160
1161 BT_ASSERT(!sstate->inited);
1162
1163 if (!sstate->stream_beginning_msg) {
1164 /* No initial stream beginning message: create one */
1165 sstate->stream_beginning_msg =
1166 bt_message_stream_beginning_create(
1167 trimmer_it->self_msg_iter, sstate->stream);
1168 if (!sstate->stream_beginning_msg) {
1169 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1170 goto end;
1171 }
1172 }
1173
1174 /* Push initial stream beginning message */
1175 BT_ASSERT(sstate->stream_beginning_msg);
1176 push_message(trimmer_it, sstate->stream_beginning_msg);
1177 sstate->stream_beginning_msg = NULL;
1178
1179 if (stream_act_beginning_msg) {
1180 /*
1181 * Initial stream activity beginning message exists: if
1182 * its time is -inf, then create and push a new one
1183 * having the trimming range's beginning time. Otherwise
1184 * push it as is (known and unknown).
1185 */
1186 const bt_clock_snapshot *cs;
1187 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
1188
1189 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
1190 stream_act_beginning_msg, &cs);
1191 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE &&
1192 !trimmer_it->begin.is_infinite) {
1193 /*
1194 * -inf time: use trimming range's beginning
1195 * time (which is not -inf).
1196 */
1197 status = create_stream_beginning_activity_message(
1198 trimmer_it, sstate->stream, clock_class,
1199 &new_msg);
1200 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1201 goto end;
1202 }
1203
1204 push_message(trimmer_it, new_msg);
1205 new_msg = NULL;
1206 } else {
1207 /* Known/unknown: push as is */
1208 push_message(trimmer_it, stream_act_beginning_msg);
1209 stream_act_beginning_msg = NULL;
1210 }
1211 } else {
1212 BT_ASSERT(!trimmer_it->begin.is_infinite);
1213
1214 /*
1215 * No stream beginning activity message: create and push
1216 * a new message.
1217 */
1218 status = create_stream_beginning_activity_message(
1219 trimmer_it, sstate->stream, clock_class, &new_msg);
1220 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1221 goto end;
1222 }
1223
1224 push_message(trimmer_it, new_msg);
1225 new_msg = NULL;
1226 }
1227
1228 sstate->inited = true;
1229
1230 end:
1231 bt_message_put_ref(new_msg);
1232 bt_message_put_ref(stream_act_beginning_msg);
1233 return status;
1234 }
1235
1236 static inline
1237 bt_self_message_iterator_status ensure_cur_packet_exists(
1238 struct trimmer_iterator *trimmer_it,
1239 struct trimmer_iterator_stream_state *sstate,
1240 const bt_packet *packet)
1241 {
1242 bt_self_message_iterator_status status =
1243 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1244 int ret;
1245 const bt_clock_class *clock_class =
1246 bt_stream_class_borrow_default_clock_class_const(
1247 bt_stream_borrow_class_const(sstate->stream));
1248 bt_message *msg = NULL;
1249 uint64_t raw_value;
1250
1251 BT_ASSERT(!trimmer_it->begin.is_infinite);
1252 BT_ASSERT(!sstate->cur_packet);
1253
1254 /*
1255 * Create and push an initial packet beginning message,
1256 * making its time the trimming range's beginning time.
1257 */
1258 ret = clock_raw_value_from_ns_from_origin(clock_class,
1259 trimmer_it->begin.ns_from_origin, &raw_value);
1260 if (ret) {
1261 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1262 goto end;
1263 }
1264
1265 msg = bt_message_packet_beginning_create_with_default_clock_snapshot(
1266 trimmer_it->self_msg_iter, packet, raw_value);
1267 if (!msg) {
1268 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1269 goto end;
1270 }
1271
1272 push_message(trimmer_it, msg);
1273 msg = NULL;
1274
1275 /* Set packet as this stream's current packet */
1276 sstate->cur_packet = packet;
1277 bt_packet_get_ref(sstate->cur_packet);
1278
1279 end:
1280 bt_message_put_ref(msg);
1281 return status;
1282 }
1283
1284 /*
1285 * Handles a message which is associated to a given stream state. This
1286 * _could_ make the iterator's output message queue grow; this could
1287 * also consume the message without pushing anything to this queue, only
1288 * modifying the stream state.
1289 *
1290 * This function consumes the `msg` reference, _whatever the outcome_.
1291 *
1292 * `ns_from_origin` is the message's time, as given by
1293 * get_msg_ns_from_origin().
1294 *
1295 * This function sets `reached_end` if handling this message made the
1296 * iterator reach the end of the trimming range. Note that the output
1297 * message queue could contain messages even if this function sets
1298 * `reached_end`.
1299 */
1300 static inline
1301 bt_self_message_iterator_status handle_message_with_stream_state(
1302 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1303 struct trimmer_iterator_stream_state *sstate,
1304 int64_t ns_from_origin, bool *reached_end)
1305 {
1306 bt_self_message_iterator_status status =
1307 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1308 bt_message_type msg_type = bt_message_get_type(msg);
1309 int ret;
1310
1311 switch (msg_type) {
1312 case BT_MESSAGE_TYPE_EVENT:
1313 if (unlikely(!trimmer_it->end.is_infinite &&
1314 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1315 status = end_iterator_streams(trimmer_it);
1316 *reached_end = true;
1317 break;
1318 }
1319
1320 if (unlikely(!sstate->inited)) {
1321 status = ensure_stream_state_is_inited(trimmer_it,
1322 sstate, NULL);
1323 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1324 goto end;
1325 }
1326 }
1327
1328 if (unlikely(!sstate->cur_packet)) {
1329 const bt_event *event =
1330 bt_message_event_borrow_event_const(msg);
1331 const bt_packet *packet = bt_event_borrow_packet_const(
1332 event);
1333
1334 status = ensure_cur_packet_exists(trimmer_it, sstate,
1335 packet);
1336 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1337 goto end;
1338 }
1339 }
1340
1341 BT_ASSERT(sstate->cur_packet);
1342 push_message(trimmer_it, msg);
1343 msg = NULL;
1344 break;
1345 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1346 if (unlikely(!trimmer_it->end.is_infinite &&
1347 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1348 status = end_iterator_streams(trimmer_it);
1349 *reached_end = true;
1350 break;
1351 }
1352
1353 if (unlikely(!sstate->inited)) {
1354 status = ensure_stream_state_is_inited(trimmer_it,
1355 sstate, NULL);
1356 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1357 goto end;
1358 }
1359 }
1360
1361 BT_ASSERT(!sstate->cur_packet);
1362 sstate->cur_packet =
1363 bt_message_packet_beginning_borrow_packet_const(msg);
1364 bt_packet_get_ref(sstate->cur_packet);
1365 push_message(trimmer_it, msg);
1366 msg = NULL;
1367 break;
1368 case BT_MESSAGE_TYPE_PACKET_END:
1369 sstate->stream_act_end_ns_from_origin = ns_from_origin;
1370
1371 if (unlikely(!trimmer_it->end.is_infinite &&
1372 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1373 status = end_iterator_streams(trimmer_it);
1374 *reached_end = true;
1375 break;
1376 }
1377
1378 if (unlikely(!sstate->inited)) {
1379 status = ensure_stream_state_is_inited(trimmer_it,
1380 sstate, NULL);
1381 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1382 goto end;
1383 }
1384 }
1385
1386 if (unlikely(!sstate->cur_packet)) {
1387 const bt_packet *packet =
1388 bt_message_packet_end_borrow_packet_const(msg);
1389
1390 status = ensure_cur_packet_exists(trimmer_it, sstate,
1391 packet);
1392 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1393 goto end;
1394 }
1395 }
1396
1397 BT_ASSERT(sstate->cur_packet);
1398 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1399 push_message(trimmer_it, msg);
1400 msg = NULL;
1401 break;
1402 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1403 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1404 {
1405 /*
1406 * `ns_from_origin` is the message's time range's
1407 * beginning time here.
1408 */
1409 int64_t end_ns_from_origin;
1410 const bt_clock_snapshot *end_cs;
1411
1412 if (bt_message_get_type(msg) ==
1413 BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1414 /*
1415 * Safe to ignore the return value because we
1416 * know there's a default clock and it's always
1417 * known.
1418 */
1419 (void) bt_message_discarded_events_borrow_default_end_clock_snapshot_const(
1420 msg, &end_cs);
1421 } else {
1422 /*
1423 * Safe to ignore the return value because we
1424 * know there's a default clock and it's always
1425 * known.
1426 */
1427 (void) bt_message_discarded_packets_borrow_default_end_clock_snapshot_const(
1428 msg, &end_cs);
1429 }
1430
1431 if (bt_clock_snapshot_get_ns_from_origin(end_cs,
1432 &end_ns_from_origin)) {
1433 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1434 goto end;
1435 }
1436
1437 sstate->stream_act_end_ns_from_origin = end_ns_from_origin;
1438
1439 if (!trimmer_it->end.is_infinite &&
1440 ns_from_origin > trimmer_it->end.ns_from_origin) {
1441 status = end_iterator_streams(trimmer_it);
1442 *reached_end = true;
1443 break;
1444 }
1445
1446 if (!trimmer_it->end.is_infinite &&
1447 end_ns_from_origin > trimmer_it->end.ns_from_origin) {
1448 /*
1449 * This message's end time is outside the
1450 * trimming time range: replace it with a new
1451 * message having an end time equal to the
1452 * trimming time range's end and without a
1453 * count.
1454 */
1455 const bt_clock_class *clock_class =
1456 bt_clock_snapshot_borrow_clock_class_const(
1457 end_cs);
1458 const bt_clock_snapshot *begin_cs;
1459 bt_message *new_msg;
1460 uint64_t end_raw_value;
1461
1462 ret = clock_raw_value_from_ns_from_origin(clock_class,
1463 trimmer_it->end.ns_from_origin, &end_raw_value);
1464 if (ret) {
1465 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1466 goto end;
1467 }
1468
1469 if (msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1470 (void) bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
1471 msg, &begin_cs);
1472 new_msg = bt_message_discarded_events_create_with_default_clock_snapshots(
1473 trimmer_it->self_msg_iter,
1474 sstate->stream,
1475 bt_clock_snapshot_get_value(begin_cs),
1476 end_raw_value);
1477 } else {
1478 (void) bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
1479 msg, &begin_cs);
1480 new_msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
1481 trimmer_it->self_msg_iter,
1482 sstate->stream,
1483 bt_clock_snapshot_get_value(begin_cs),
1484 end_raw_value);
1485 }
1486
1487 if (!new_msg) {
1488 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1489 goto end;
1490 }
1491
1492 /* Replace the original message */
1493 BT_MESSAGE_MOVE_REF(msg, new_msg);
1494 }
1495
1496 if (unlikely(!sstate->inited)) {
1497 status = ensure_stream_state_is_inited(trimmer_it,
1498 sstate, NULL);
1499 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1500 goto end;
1501 }
1502 }
1503
1504 push_message(trimmer_it, msg);
1505 msg = NULL;
1506 break;
1507 }
1508 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
1509 if (!trimmer_it->end.is_infinite &&
1510 ns_from_origin > trimmer_it->end.ns_from_origin) {
1511 /*
1512 * This only happens when the message's time is
1513 * known and is greater than the trimming
1514 * range's end time. Unknown and -inf times are
1515 * always less than
1516 * `trimmer_it->end.ns_from_origin`.
1517 */
1518 status = end_iterator_streams(trimmer_it);
1519 *reached_end = true;
1520 break;
1521 }
1522
1523 if (!sstate->inited) {
1524 status = ensure_stream_state_is_inited(trimmer_it,
1525 sstate, msg);
1526 msg = NULL;
1527 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1528 goto end;
1529 }
1530 } else {
1531 push_message(trimmer_it, msg);
1532 msg = NULL;
1533 }
1534
1535 break;
1536 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
1537 if (trimmer_it->end.is_infinite) {
1538 push_message(trimmer_it, msg);
1539 msg = NULL;
1540 break;
1541 }
1542
1543 if (ns_from_origin == INT64_MIN) {
1544 /* Unknown: push as is if stream state is inited */
1545 if (sstate->inited) {
1546 push_message(trimmer_it, msg);
1547 msg = NULL;
1548 sstate->last_msg_is_stream_activity_end = true;
1549 }
1550 } else if (ns_from_origin == INT64_MAX) {
1551 /* Infinite: use trimming range's end time */
1552 sstate->stream_act_end_ns_from_origin =
1553 trimmer_it->end.ns_from_origin;
1554 } else {
1555 /* Known: check if outside of trimming range */
1556 if (ns_from_origin > trimmer_it->end.ns_from_origin) {
1557 sstate->stream_act_end_ns_from_origin =
1558 trimmer_it->end.ns_from_origin;
1559 status = end_iterator_streams(trimmer_it);
1560 *reached_end = true;
1561 break;
1562 }
1563
1564 if (!sstate->inited) {
1565 /*
1566 * First message for this stream is a
1567 * stream activity end: we can't deduce
1568 * anything about the stream activity
1569 * beginning's time, and using this
1570 * message's time would make a useless
1571 * pair of stream activity beginning/end
1572 * with the same time. Just skip this
1573 * message and wait for something
1574 * useful.
1575 */
1576 break;
1577 }
1578
1579 push_message(trimmer_it, msg);
1580 msg = NULL;
1581 sstate->last_msg_is_stream_activity_end = true;
1582 sstate->stream_act_end_ns_from_origin = ns_from_origin;
1583 }
1584
1585 break;
1586 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1587 /*
1588 * We don't know what follows at this point, so just
1589 * keep this message until we know what to do with it
1590 * (it will be used in ensure_stream_state_is_inited()).
1591 */
1592 BT_ASSERT(!sstate->inited);
1593 BT_MESSAGE_MOVE_REF(sstate->stream_beginning_msg, msg);
1594 break;
1595 case BT_MESSAGE_TYPE_STREAM_END:
1596 if (sstate->inited) {
1597 /*
1598 * This is the end of an inited stream: end this
1599 * stream if its stream activity end message
1600 * time is not the trimming range's end time
1601 * (which means the final stream activity end
1602 * message had an infinite time). end_stream()
1603 * will generate its own stream end message.
1604 */
1605 if (trimmer_it->end.is_infinite) {
1606 push_message(trimmer_it, msg);
1607 msg = NULL;
1608 g_hash_table_remove(trimmer_it->stream_states,
1609 sstate->stream);
1610 } else if (sstate->stream_act_end_ns_from_origin <
1611 trimmer_it->end.ns_from_origin) {
1612 status = end_stream(trimmer_it, sstate);
1613 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1614 goto end;
1615 }
1616
1617 /* We won't need this stream state again */
1618 g_hash_table_remove(trimmer_it->stream_states,
1619 sstate->stream);
1620 }
1621 } else {
1622 /* We dont't need this stream state anymore */
1623 g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
1624 }
1625
1626 break;
1627 default:
1628 break;
1629 }
1630
1631 end:
1632 /* We release the message's reference whatever the outcome */
1633 bt_message_put_ref(msg);
1634 return BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1635 }
1636
1637 /*
1638 * Handles an input message. This _could_ make the iterator's output
1639 * message queue grow; this could also consume the message without
1640 * pushing anything to this queue, only modifying the stream state.
1641 *
1642 * This function consumes the `msg` reference, _whatever the outcome_.
1643 *
1644 * This function sets `reached_end` if handling this message made the
1645 * iterator reach the end of the trimming range. Note that the output
1646 * message queue could contain messages even if this function sets
1647 * `reached_end`.
1648 */
1649 static inline
1650 bt_self_message_iterator_status handle_message(
1651 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1652 bool *reached_end)
1653 {
1654 bt_self_message_iterator_status status;
1655 const bt_stream *stream = NULL;
1656 int64_t ns_from_origin = INT64_MIN;
1657 bool skip;
1658 int ret;
1659 struct trimmer_iterator_stream_state *sstate = NULL;
1660
1661 /* Find message's associated stream */
1662 switch (bt_message_get_type(msg)) {
1663 case BT_MESSAGE_TYPE_EVENT:
1664 stream = bt_event_borrow_stream_const(
1665 bt_message_event_borrow_event_const(msg));
1666 break;
1667 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1668 stream = bt_packet_borrow_stream_const(
1669 bt_message_packet_beginning_borrow_packet_const(msg));
1670 break;
1671 case BT_MESSAGE_TYPE_PACKET_END:
1672 stream = bt_packet_borrow_stream_const(
1673 bt_message_packet_end_borrow_packet_const(msg));
1674 break;
1675 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1676 stream = bt_message_discarded_events_borrow_stream_const(msg);
1677 break;
1678 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1679 stream = bt_message_discarded_packets_borrow_stream_const(msg);
1680 break;
1681 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
1682 stream = bt_message_stream_activity_beginning_borrow_stream_const(msg);
1683 break;
1684 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
1685 stream = bt_message_stream_activity_end_borrow_stream_const(msg);
1686 break;
1687 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1688 stream = bt_message_stream_beginning_borrow_stream_const(msg);
1689 break;
1690 case BT_MESSAGE_TYPE_STREAM_END:
1691 stream = bt_message_stream_end_borrow_stream_const(msg);
1692 break;
1693 default:
1694 break;
1695 }
1696
1697 if (likely(stream)) {
1698 /* Find stream state */
1699 sstate = g_hash_table_lookup(trimmer_it->stream_states,
1700 stream);
1701 if (unlikely(!sstate)) {
1702 /* No stream state yet: create one now */
1703 const bt_stream_class *sc;
1704
1705 /*
1706 * Validate right now that the stream's class
1707 * has a registered default clock class so that
1708 * an existing stream state guarantees existing
1709 * default clock snapshots for its associated
1710 * messages.
1711 *
1712 * Also check that clock snapshots are always
1713 * known.
1714 */
1715 sc = bt_stream_borrow_class_const(stream);
1716 if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
1717 BT_LOGE("Unsupported stream: stream class does "
1718 "not have a default clock class: "
1719 "stream-addr=%p, "
1720 "stream-id=%" PRIu64 ", "
1721 "stream-name=\"%s\"",
1722 stream, bt_stream_get_id(stream),
1723 bt_stream_get_name(stream));
1724 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1725 goto end;
1726 }
1727
1728 if (!bt_stream_class_default_clock_is_always_known(sc)) {
1729 BT_LOGE("Unsupported stream: clock does not "
1730 "always have a known value: "
1731 "stream-addr=%p, "
1732 "stream-id=%" PRIu64 ", "
1733 "stream-name=\"%s\"",
1734 stream, bt_stream_get_id(stream),
1735 bt_stream_get_name(stream));
1736 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1737 goto end;
1738 }
1739
1740 sstate = g_new0(struct trimmer_iterator_stream_state,
1741 1);
1742 if (!sstate) {
1743 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1744 goto end;
1745 }
1746
1747 sstate->stream = stream;
1748 sstate->stream_act_end_ns_from_origin = INT64_MIN;
1749 g_hash_table_insert(trimmer_it->stream_states,
1750 (void *) stream, sstate);
1751 }
1752 }
1753
1754 /* Retrieve the message's time */
1755 ret = get_msg_ns_from_origin(msg, &ns_from_origin, &skip);
1756 if (unlikely(ret)) {
1757 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1758 goto end;
1759 }
1760
1761 if (likely(sstate)) {
1762 /* Message associated to a stream */
1763 status = handle_message_with_stream_state(trimmer_it, msg,
1764 sstate, ns_from_origin, reached_end);
1765
1766 /*
1767 * handle_message_with_stream_state() unconditionally
1768 * consumes `msg`.
1769 */
1770 msg = NULL;
1771 } else {
1772 /*
1773 * Message not associated to a stream (message iterator
1774 * inactivity).
1775 */
1776 if (unlikely(ns_from_origin > trimmer_it->end.ns_from_origin)) {
1777 BT_MESSAGE_PUT_REF_AND_RESET(msg);
1778 status = end_iterator_streams(trimmer_it);
1779 *reached_end = true;
1780 } else {
1781 push_message(trimmer_it, msg);
1782 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1783 msg = NULL;
1784 }
1785 }
1786
1787 end:
1788 /* We release the message's reference whatever the outcome */
1789 bt_message_put_ref(msg);
1790 return status;
1791 }
1792
1793 static inline
1794 void fill_message_array_from_output_messages(
1795 struct trimmer_iterator *trimmer_it,
1796 bt_message_array_const msgs, uint64_t capacity, uint64_t *count)
1797 {
1798 *count = 0;
1799
1800 /*
1801 * Move auto-seek messages to the output array (which is this
1802 * iterator's base message array).
1803 */
1804 while (capacity > 0 && !g_queue_is_empty(trimmer_it->output_messages)) {
1805 msgs[*count] = pop_message(trimmer_it);
1806 capacity--;
1807 (*count)++;
1808 }
1809
1810 BT_ASSERT(*count > 0);
1811 }
1812
1813 static inline
1814 bt_self_message_iterator_status state_ending(
1815 struct trimmer_iterator *trimmer_it,
1816 bt_message_array_const msgs, uint64_t capacity,
1817 uint64_t *count)
1818 {
1819 bt_self_message_iterator_status status =
1820 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1821
1822 if (g_queue_is_empty(trimmer_it->output_messages)) {
1823 trimmer_it->state = TRIMMER_ITERATOR_STATE_ENDED;
1824 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1825 goto end;
1826 }
1827
1828 fill_message_array_from_output_messages(trimmer_it, msgs,
1829 capacity, count);
1830
1831 end:
1832 return status;
1833 }
1834
1835 static inline
1836 bt_self_message_iterator_status state_trim(struct trimmer_iterator *trimmer_it,
1837 bt_message_array_const msgs, uint64_t capacity,
1838 uint64_t *count)
1839 {
1840 bt_self_message_iterator_status status =
1841 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1842 bt_message_array_const my_msgs;
1843 uint64_t my_count;
1844 uint64_t i;
1845 bool reached_end = false;
1846
1847 while (g_queue_is_empty(trimmer_it->output_messages)) {
1848 status = (int) bt_self_component_port_input_message_iterator_next(
1849 trimmer_it->upstream_iter, &my_msgs, &my_count);
1850 if (unlikely(status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
1851 if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) {
1852 status = end_iterator_streams(trimmer_it);
1853 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1854 goto end;
1855 }
1856
1857 trimmer_it->state =
1858 TRIMMER_ITERATOR_STATE_ENDING;
1859 status = state_ending(trimmer_it, msgs,
1860 capacity, count);
1861 }
1862
1863 goto end;
1864 }
1865
1866 BT_ASSERT(my_count > 0);
1867
1868 for (i = 0; i < my_count; i++) {
1869 status = handle_message(trimmer_it, my_msgs[i],
1870 &reached_end);
1871
1872 /*
1873 * handle_message() unconditionally consumes the
1874 * message reference.
1875 */
1876 my_msgs[i] = NULL;
1877
1878 if (unlikely(status !=
1879 BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
1880 put_messages(my_msgs, my_count);
1881 goto end;
1882 }
1883
1884 if (unlikely(reached_end)) {
1885 /*
1886 * This message's time was passed the
1887 * trimming time range's end time: we
1888 * are done. Their might still be
1889 * messages in the output message queue,
1890 * so move to the "ending" state and
1891 * apply it immediately since
1892 * state_trim() is called within the
1893 * "next" method.
1894 */
1895 put_messages(my_msgs, my_count);
1896 trimmer_it->state =
1897 TRIMMER_ITERATOR_STATE_ENDING;
1898 status = state_ending(trimmer_it, msgs,
1899 capacity, count);
1900 goto end;
1901 }
1902 }
1903 }
1904
1905 /*
1906 * There's at least one message in the output message queue:
1907 * move the messages to the output message array.
1908 */
1909 BT_ASSERT(!g_queue_is_empty(trimmer_it->output_messages));
1910 fill_message_array_from_output_messages(trimmer_it, msgs,
1911 capacity, count);
1912
1913 end:
1914 return status;
1915 }
1916
1917 BT_HIDDEN
1918 bt_self_message_iterator_status trimmer_msg_iter_next(
1919 bt_self_message_iterator *self_msg_iter,
1920 bt_message_array_const msgs, uint64_t capacity,
1921 uint64_t *count)
1922 {
1923 struct trimmer_iterator *trimmer_it =
1924 bt_self_message_iterator_get_data(self_msg_iter);
1925 bt_self_message_iterator_status status =
1926 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1927
1928 BT_ASSERT(trimmer_it);
1929
1930 if (likely(trimmer_it->state == TRIMMER_ITERATOR_STATE_TRIM)) {
1931 status = state_trim(trimmer_it, msgs, capacity, count);
1932 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1933 goto end;
1934 }
1935 } else {
1936 switch (trimmer_it->state) {
1937 case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN:
1938 status = state_set_trimmer_iterator_bounds(trimmer_it);
1939 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1940 goto end;
1941 }
1942
1943 status = state_seek_initially(trimmer_it);
1944 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1945 goto end;
1946 }
1947
1948 status = state_trim(trimmer_it, msgs, capacity, count);
1949 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1950 goto end;
1951 }
1952
1953 break;
1954 case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY:
1955 status = state_seek_initially(trimmer_it);
1956 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1957 goto end;
1958 }
1959
1960 status = state_trim(trimmer_it, msgs, capacity, count);
1961 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1962 goto end;
1963 }
1964
1965 break;
1966 case TRIMMER_ITERATOR_STATE_ENDING:
1967 status = state_ending(trimmer_it, msgs, capacity,
1968 count);
1969 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1970 goto end;
1971 }
1972
1973 break;
1974 case TRIMMER_ITERATOR_STATE_ENDED:
1975 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1976 break;
1977 default:
1978 abort();
1979 }
1980 }
1981
1982 end:
1983 return status;
1984 }
1985
1986 BT_HIDDEN
1987 void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
1988 {
1989 struct trimmer_iterator *trimmer_it =
1990 bt_self_message_iterator_get_data(self_msg_iter);
1991
1992 BT_ASSERT(trimmer_it);
1993 destroy_trimmer_iterator(trimmer_it);
1994 }
This page took 0.067924 seconds and 3 git commands to generate.