lib: rename include dir to babeltrace2
[babeltrace.git] / plugins / utils / trimmer / trimmer.c
1 /*
2 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #define BT_LOG_TAG "PLUGIN-UTILS-TRIMMER-FLT"
25 #include "logging.h"
26
27 #include <babeltrace2/compat/utc-internal.h>
28 #include <babeltrace2/compat/time-internal.h>
29 #include <babeltrace2/babeltrace.h>
30 #include <babeltrace2/common-internal.h>
31 #include <plugins-common.h>
32 #include <babeltrace2/assert-internal.h>
33 #include <stdint.h>
34 #include <inttypes.h>
35 #include <glib.h>
36
37 #include "trimmer.h"
38
39 #define NS_PER_S INT64_C(1000000000)
40
41 static const char * const in_port_name = "in";
42
43 struct trimmer_time {
44 unsigned int hour, minute, second, ns;
45 };
46
47 struct trimmer_bound {
48 /*
49 * Nanoseconds from origin, valid if `is_set` is set and
50 * `is_infinite` is false.
51 */
52 int64_t ns_from_origin;
53
54 /* True if this bound's full time (`ns_from_origin`) is set */
55 bool is_set;
56
57 /*
58 * True if this bound represents the infinity (negative or
59 * positive depending on which bound it is). If this is true,
60 * then we don't care about `ns_from_origin` above.
61 */
62 bool is_infinite;
63
64 /*
65 * This bound's time without the date; this time is used to set
66 * `ns_from_origin` once we know the date.
67 */
68 struct trimmer_time time;
69 };
70
71 struct trimmer_comp {
72 struct trimmer_bound begin, end;
73 bool is_gmt;
74 };
75
76 enum trimmer_iterator_state {
77 /*
78 * Find the first message's date and set the bounds's times
79 * accordingly.
80 */
81 TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN,
82
83 /*
84 * Initially seek to the trimming range's beginning time.
85 */
86 TRIMMER_ITERATOR_STATE_SEEK_INITIALLY,
87
88 /*
89 * Fill the output message queue for as long as received input
90 * messages are within the trimming time range.
91 */
92 TRIMMER_ITERATOR_STATE_TRIM,
93
94 /* Flush the remaining messages in the output message queue */
95 TRIMMER_ITERATOR_STATE_ENDING,
96
97 /* Trimming operation and message iterator is ended */
98 TRIMMER_ITERATOR_STATE_ENDED,
99 };
100
101 struct trimmer_iterator {
102 /* Weak */
103 struct trimmer_comp *trimmer_comp;
104
105 /* Weak */
106 bt_self_message_iterator *self_msg_iter;
107
108 enum trimmer_iterator_state state;
109
110 /* Owned by this */
111 bt_self_component_port_input_message_iterator *upstream_iter;
112 struct trimmer_bound begin, end;
113
114 /*
115 * Queue of `const bt_message *` (owned by the queue).
116 *
117 * This is where the trimming operation pushes the messages to
118 * output by this message iterator.
119 */
120 GQueue *output_messages;
121
122 /*
123 * Hash table of `bt_stream *` (weak) to
124 * `struct trimmer_iterator_stream_state *` (owned by the HT).
125 */
126 GHashTable *stream_states;
127 };
128
129 struct trimmer_iterator_stream_state {
130 /*
131 * True if both stream beginning and initial stream activity
132 * beginning messages were pushed for this stream.
133 */
134 bool inited;
135
136 /*
137 * True if the last pushed message for this stream was a stream
138 * activity end message.
139 */
140 bool last_msg_is_stream_activity_end;
141
142 /*
143 * Time to use for a generated stream end activity message when
144 * ending the stream.
145 */
146 int64_t stream_act_end_ns_from_origin;
147
148 /* Weak */
149 const bt_stream *stream;
150
151 /* Owned by this (`NULL` initially and between packets) */
152 const bt_packet *cur_packet;
153
154 /* Owned by this */
155 const bt_message *stream_beginning_msg;
156 };
157
158 static
159 void destroy_trimmer_comp(struct trimmer_comp *trimmer_comp)
160 {
161 BT_ASSERT(trimmer_comp);
162 g_free(trimmer_comp);
163 }
164
165 static
166 struct trimmer_comp *create_trimmer_comp(void)
167 {
168 return g_new0(struct trimmer_comp, 1);
169 }
170
171 BT_HIDDEN
172 void trimmer_finalize(bt_self_component_filter *self_comp)
173 {
174 struct trimmer_comp *trimmer_comp =
175 bt_self_component_get_data(
176 bt_self_component_filter_as_self_component(self_comp));
177
178 if (trimmer_comp) {
179 destroy_trimmer_comp(trimmer_comp);
180 }
181 }
182
183 /*
184 * Sets the time (in ns from origin) of a trimmer bound from date and
185 * time components.
186 *
187 * Returns a negative value if anything goes wrong.
188 */
189 static
190 int set_bound_ns_from_origin(struct trimmer_bound *bound,
191 unsigned int year, unsigned int month, unsigned int day,
192 unsigned int hour, unsigned int minute, unsigned int second,
193 unsigned int ns, bool is_gmt)
194 {
195 int ret = 0;
196 time_t result;
197 struct tm tm = {
198 .tm_sec = second,
199 .tm_min = minute,
200 .tm_hour = hour,
201 .tm_mday = day,
202 .tm_mon = month - 1,
203 .tm_year = year - 1900,
204 .tm_isdst = -1,
205 };
206
207 if (is_gmt) {
208 result = bt_timegm(&tm);
209 } else {
210 result = mktime(&tm);
211 }
212
213 if (result < 0) {
214 ret = -1;
215 goto end;
216 }
217
218 BT_ASSERT(bound);
219 bound->ns_from_origin = (int64_t) result;
220 bound->ns_from_origin *= NS_PER_S;
221 bound->ns_from_origin += ns;
222 bound->is_set = true;
223
224 end:
225 return ret;
226 }
227
228 /*
229 * Parses a timestamp, figuring out its format.
230 *
231 * Returns a negative value if anything goes wrong.
232 *
233 * Expected formats:
234 *
235 * YYYY-MM-DD hh:mm[:ss[.ns]]
236 * [hh:mm:]ss[.ns]
237 * [-]s[.ns]
238 *
239 * TODO: Check overflows.
240 */
241 static
242 int set_bound_from_str(const char *str, struct trimmer_bound *bound,
243 bool is_gmt)
244 {
245 int ret = 0;
246 int s_ret;
247 unsigned int year, month, day, hour, minute, second, ns;
248 char dummy;
249
250 /* Try `YYYY-MM-DD hh:mm:ss.ns` format */
251 s_ret = sscanf(str, "%u-%u-%u %u:%u:%u.%u%c", &year, &month, &day,
252 &hour, &minute, &second, &ns, &dummy);
253 if (s_ret == 7) {
254 ret = set_bound_ns_from_origin(bound, year, month, day,
255 hour, minute, second, ns, is_gmt);
256 goto end;
257 }
258
259 /* Try `YYYY-MM-DD hh:mm:ss` format */
260 s_ret = sscanf(str, "%u-%u-%u %u:%u:%u%c", &year, &month, &day,
261 &hour, &minute, &second, &dummy);
262 if (s_ret == 6) {
263 ret = set_bound_ns_from_origin(bound, year, month, day,
264 hour, minute, second, 0, is_gmt);
265 goto end;
266 }
267
268 /* Try `YYYY-MM-DD hh:mm` format */
269 s_ret = sscanf(str, "%u-%u-%u %u:%u%c", &year, &month, &day,
270 &hour, &minute, &dummy);
271 if (s_ret == 5) {
272 ret = set_bound_ns_from_origin(bound, year, month, day,
273 hour, minute, 0, 0, is_gmt);
274 goto end;
275 }
276
277 /* Try `YYYY-MM-DD` format */
278 s_ret = sscanf(str, "%u-%u-%u%c", &year, &month, &day, &dummy);
279 if (s_ret == 3) {
280 ret = set_bound_ns_from_origin(bound, year, month, day,
281 0, 0, 0, 0, is_gmt);
282 goto end;
283 }
284
285 /* Try `hh:mm:ss.ns` format */
286 s_ret = sscanf(str, "%u:%u:%u.%u%c", &hour, &minute, &second, &ns,
287 &dummy);
288 if (s_ret == 4) {
289 bound->time.hour = hour;
290 bound->time.minute = minute;
291 bound->time.second = second;
292 bound->time.ns = ns;
293 goto end;
294 }
295
296 /* Try `hh:mm:ss` format */
297 s_ret = sscanf(str, "%u:%u:%u%c", &hour, &minute, &second, &dummy);
298 if (s_ret == 3) {
299 bound->time.hour = hour;
300 bound->time.minute = minute;
301 bound->time.second = second;
302 bound->time.ns = 0;
303 goto end;
304 }
305
306 /* Try `-s.ns` format */
307 s_ret = sscanf(str, "-%u.%u%c", &second, &ns, &dummy);
308 if (s_ret == 2) {
309 bound->ns_from_origin = -((int64_t) second) * NS_PER_S;
310 bound->ns_from_origin -= (int64_t) ns;
311 bound->is_set = true;
312 goto end;
313 }
314
315 /* Try `s.ns` format */
316 s_ret = sscanf(str, "%u.%u%c", &second, &ns, &dummy);
317 if (s_ret == 2) {
318 bound->ns_from_origin = ((int64_t) second) * NS_PER_S;
319 bound->ns_from_origin += (int64_t) ns;
320 bound->is_set = true;
321 goto end;
322 }
323
324 /* Try `-s` format */
325 s_ret = sscanf(str, "-%u%c", &second, &dummy);
326 if (s_ret == 1) {
327 bound->ns_from_origin = -((int64_t) second) * NS_PER_S;
328 bound->is_set = true;
329 goto end;
330 }
331
332 /* Try `s` format */
333 s_ret = sscanf(str, "%u%c", &second, &dummy);
334 if (s_ret == 1) {
335 bound->ns_from_origin = (int64_t) second * NS_PER_S;
336 bound->is_set = true;
337 goto end;
338 }
339
340 BT_LOGE("Invalid date/time format: param=\"%s\"", str);
341 ret = -1;
342
343 end:
344 return ret;
345 }
346
347 /*
348 * Sets a trimmer bound's properties from a parameter string/integer
349 * value.
350 *
351 * Returns a negative value if anything goes wrong.
352 */
353 static
354 int set_bound_from_param(const char *param_name, const bt_value *param,
355 struct trimmer_bound *bound, bool is_gmt)
356 {
357 int ret;
358 const char *arg;
359 char tmp_arg[64];
360
361 if (bt_value_is_signed_integer(param)) {
362 int64_t value = bt_value_signed_integer_get(param);
363
364 /*
365 * Just convert it to a temporary string to handle
366 * everything the same way.
367 */
368 sprintf(tmp_arg, "%" PRId64, value);
369 arg = tmp_arg;
370 } else if (bt_value_is_string(param)) {
371 arg = bt_value_string_get(param);
372 } else {
373 BT_LOGE("`%s` parameter must be an integer or a string value.",
374 param_name);
375 ret = -1;
376 goto end;
377 }
378
379 ret = set_bound_from_str(arg, bound, is_gmt);
380
381 end:
382 return ret;
383 }
384
385 static
386 int validate_trimmer_bounds(struct trimmer_bound *begin,
387 struct trimmer_bound *end)
388 {
389 int ret = 0;
390
391 BT_ASSERT(begin->is_set);
392 BT_ASSERT(end->is_set);
393
394 if (!begin->is_infinite && !end->is_infinite &&
395 begin->ns_from_origin > end->ns_from_origin) {
396 BT_LOGE("Trimming time range's beginning time is greater than end time: "
397 "begin-ns-from-origin=%" PRId64 ", "
398 "end-ns-from-origin=%" PRId64,
399 begin->ns_from_origin,
400 end->ns_from_origin);
401 ret = -1;
402 goto end;
403 }
404
405 if (!begin->is_infinite && begin->ns_from_origin == INT64_MIN) {
406 BT_LOGE("Invalid trimming time range's beginning time: "
407 "ns-from-origin=%" PRId64,
408 begin->ns_from_origin);
409 ret = -1;
410 goto end;
411 }
412
413 if (!end->is_infinite && end->ns_from_origin == INT64_MIN) {
414 BT_LOGE("Invalid trimming time range's end time: "
415 "ns-from-origin=%" PRId64,
416 end->ns_from_origin);
417 ret = -1;
418 goto end;
419 }
420
421 end:
422 return ret;
423 }
424
425 static
426 int init_trimmer_comp_from_params(struct trimmer_comp *trimmer_comp,
427 const bt_value *params)
428 {
429 const bt_value *value;
430 int ret = 0;
431
432 BT_ASSERT(params);
433 value = bt_value_map_borrow_entry_value_const(params, "gmt");
434 if (value) {
435 trimmer_comp->is_gmt = (bool) bt_value_bool_get(value);
436 }
437
438 value = bt_value_map_borrow_entry_value_const(params, "begin");
439 if (value) {
440 if (set_bound_from_param("begin", value,
441 &trimmer_comp->begin, trimmer_comp->is_gmt)) {
442 /* set_bound_from_param() logs errors */
443 ret = BT_SELF_COMPONENT_STATUS_ERROR;
444 goto end;
445 }
446 } else {
447 trimmer_comp->begin.is_infinite = true;
448 trimmer_comp->begin.is_set = true;
449 }
450
451 value = bt_value_map_borrow_entry_value_const(params, "end");
452 if (value) {
453 if (set_bound_from_param("end", value,
454 &trimmer_comp->end, trimmer_comp->is_gmt)) {
455 /* set_bound_from_param() logs errors */
456 ret = BT_SELF_COMPONENT_STATUS_ERROR;
457 goto end;
458 }
459 } else {
460 trimmer_comp->end.is_infinite = true;
461 trimmer_comp->end.is_set = true;
462 }
463
464 end:
465 if (trimmer_comp->begin.is_set && trimmer_comp->end.is_set) {
466 /* validate_trimmer_bounds() logs errors */
467 ret = validate_trimmer_bounds(&trimmer_comp->begin,
468 &trimmer_comp->end);
469 }
470
471 return ret;
472 }
473
474 bt_self_component_status trimmer_init(bt_self_component_filter *self_comp,
475 const bt_value *params, void *init_data)
476 {
477 int ret;
478 bt_self_component_status status;
479 struct trimmer_comp *trimmer_comp = create_trimmer_comp();
480
481 if (!trimmer_comp) {
482 status = BT_SELF_COMPONENT_STATUS_NOMEM;
483 goto error;
484 }
485
486 status = bt_self_component_filter_add_input_port(
487 self_comp, in_port_name, NULL, NULL);
488 if (status != BT_SELF_COMPONENT_STATUS_OK) {
489 goto error;
490 }
491
492 status = bt_self_component_filter_add_output_port(
493 self_comp, "out", NULL, NULL);
494 if (status != BT_SELF_COMPONENT_STATUS_OK) {
495 goto error;
496 }
497
498 ret = init_trimmer_comp_from_params(trimmer_comp, params);
499 if (ret) {
500 status = BT_SELF_COMPONENT_STATUS_ERROR;
501 goto error;
502 }
503
504 bt_self_component_set_data(
505 bt_self_component_filter_as_self_component(self_comp),
506 trimmer_comp);
507 goto end;
508
509 error:
510 if (status == BT_SELF_COMPONENT_STATUS_OK) {
511 status = BT_SELF_COMPONENT_STATUS_ERROR;
512 }
513
514 if (trimmer_comp) {
515 destroy_trimmer_comp(trimmer_comp);
516 }
517
518 end:
519 return status;
520 }
521
522 static
523 void destroy_trimmer_iterator(struct trimmer_iterator *trimmer_it)
524 {
525 BT_ASSERT(trimmer_it);
526 bt_self_component_port_input_message_iterator_put_ref(
527 trimmer_it->upstream_iter);
528
529 if (trimmer_it->output_messages) {
530 g_queue_free(trimmer_it->output_messages);
531 }
532
533 if (trimmer_it->stream_states) {
534 g_hash_table_destroy(trimmer_it->stream_states);
535 }
536
537 g_free(trimmer_it);
538 }
539
540 static
541 void destroy_trimmer_iterator_stream_state(
542 struct trimmer_iterator_stream_state *sstate)
543 {
544 BT_ASSERT(sstate);
545 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
546 BT_MESSAGE_PUT_REF_AND_RESET(sstate->stream_beginning_msg);
547 g_free(sstate);
548 }
549
550 BT_HIDDEN
551 bt_self_message_iterator_status trimmer_msg_iter_init(
552 bt_self_message_iterator *self_msg_iter,
553 bt_self_component_filter *self_comp,
554 bt_self_component_port_output *port)
555 {
556 bt_self_message_iterator_status status =
557 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
558 struct trimmer_iterator *trimmer_it;
559
560 trimmer_it = g_new0(struct trimmer_iterator, 1);
561 if (!trimmer_it) {
562 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
563 goto end;
564 }
565
566 trimmer_it->trimmer_comp = bt_self_component_get_data(
567 bt_self_component_filter_as_self_component(self_comp));
568 BT_ASSERT(trimmer_it->trimmer_comp);
569
570 if (trimmer_it->trimmer_comp->begin.is_set &&
571 trimmer_it->trimmer_comp->end.is_set) {
572 /*
573 * Both trimming time range's bounds are set, so skip
574 * the
575 * `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
576 * phase.
577 */
578 trimmer_it->state = TRIMMER_ITERATOR_STATE_SEEK_INITIALLY;
579 }
580
581 trimmer_it->begin = trimmer_it->trimmer_comp->begin;
582 trimmer_it->end = trimmer_it->trimmer_comp->end;
583 trimmer_it->upstream_iter =
584 bt_self_component_port_input_message_iterator_create(
585 bt_self_component_filter_borrow_input_port_by_name(
586 self_comp, in_port_name));
587 if (!trimmer_it->upstream_iter) {
588 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
589 goto end;
590 }
591
592 trimmer_it->output_messages = g_queue_new();
593 if (!trimmer_it->output_messages) {
594 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
595 goto end;
596 }
597
598 trimmer_it->stream_states = g_hash_table_new_full(g_direct_hash,
599 g_direct_equal, NULL,
600 (GDestroyNotify) destroy_trimmer_iterator_stream_state);
601 if (!trimmer_it->stream_states) {
602 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
603 goto end;
604 }
605
606 trimmer_it->self_msg_iter = self_msg_iter;
607 bt_self_message_iterator_set_data(self_msg_iter, trimmer_it);
608
609 end:
610 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK && trimmer_it) {
611 destroy_trimmer_iterator(trimmer_it);
612 }
613
614 return status;
615 }
616
617 static inline
618 int get_msg_ns_from_origin(const bt_message *msg, int64_t *ns_from_origin,
619 bool *skip)
620 {
621 const bt_clock_class *clock_class = NULL;
622 const bt_clock_snapshot *clock_snapshot = NULL;
623 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
624 int ret = 0;
625
626 BT_ASSERT(msg);
627 BT_ASSERT(ns_from_origin);
628 BT_ASSERT(skip);
629
630 switch (bt_message_get_type(msg)) {
631 case BT_MESSAGE_TYPE_EVENT:
632 clock_class =
633 bt_message_event_borrow_stream_class_default_clock_class_const(
634 msg);
635 if (unlikely(!clock_class)) {
636 goto error;
637 }
638
639 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
640 msg);
641 break;
642 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
643 clock_class =
644 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
645 msg);
646 if (unlikely(!clock_class)) {
647 goto error;
648 }
649
650 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
651 msg);
652 break;
653 case BT_MESSAGE_TYPE_PACKET_END:
654 clock_class =
655 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
656 msg);
657 if (unlikely(!clock_class)) {
658 goto error;
659 }
660
661 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
662 msg);
663 break;
664 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
665 clock_class =
666 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
667 msg);
668 if (unlikely(!clock_class)) {
669 goto error;
670 }
671
672 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
673 msg);
674 break;
675 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
676 clock_class =
677 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
678 msg);
679 if (unlikely(!clock_class)) {
680 goto error;
681 }
682
683 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
684 msg);
685 break;
686 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
687 clock_class =
688 bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
689 msg);
690 if (unlikely(!clock_class)) {
691 goto error;
692 }
693
694 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
695 msg, &clock_snapshot);
696 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN ||
697 sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
698 /* Lowest possible time to always include them */
699 *ns_from_origin = INT64_MIN;
700 goto no_clock_snapshot;
701 }
702
703 break;
704 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
705 clock_class =
706 bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
707 msg);
708 if (unlikely(!clock_class)) {
709 goto error;
710 }
711
712 sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
713 msg, &clock_snapshot);
714 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
715 /* Lowest time to always include it */
716 *ns_from_origin = INT64_MIN;
717 goto no_clock_snapshot;
718 } else if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
719 /* Greatest time to always exclude it */
720 *ns_from_origin = INT64_MAX;
721 goto no_clock_snapshot;
722 }
723
724 break;
725 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
726 clock_snapshot =
727 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
728 msg);
729 break;
730 default:
731 goto no_clock_snapshot;
732 }
733
734 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot,
735 ns_from_origin);
736 if (unlikely(ret)) {
737 goto error;
738 }
739
740 goto end;
741
742 no_clock_snapshot:
743 *skip = true;
744 goto end;
745
746 error:
747 ret = -1;
748
749 end:
750 return ret;
751 }
752
753 static inline
754 void put_messages(bt_message_array_const msgs, uint64_t count)
755 {
756 uint64_t i;
757
758 for (i = 0; i < count; i++) {
759 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
760 }
761 }
762
763 static inline
764 int set_trimmer_iterator_bound(struct trimmer_bound *bound,
765 int64_t ns_from_origin, bool is_gmt)
766 {
767 struct tm tm;
768 time_t time_seconds = (time_t) (ns_from_origin / NS_PER_S);
769 int ret = 0;
770
771 BT_ASSERT(!bound->is_set);
772 errno = 0;
773
774 /* We only need to extract the date from this time */
775 if (is_gmt) {
776 bt_gmtime_r(&time_seconds, &tm);
777 } else {
778 bt_localtime_r(&time_seconds, &tm);
779 }
780
781 if (errno) {
782 BT_LOGE_ERRNO("Cannot convert timestamp to date and time",
783 "ts=%" PRId64, (int64_t) time_seconds);
784 ret = -1;
785 goto end;
786 }
787
788 ret = set_bound_ns_from_origin(bound, tm.tm_year + 1900, tm.tm_mon + 1,
789 tm.tm_mday, bound->time.hour, bound->time.minute,
790 bound->time.second, bound->time.ns, is_gmt);
791
792 end:
793 return ret;
794 }
795
796 static
797 bt_self_message_iterator_status state_set_trimmer_iterator_bounds(
798 struct trimmer_iterator *trimmer_it)
799 {
800 bt_message_iterator_status upstream_iter_status =
801 BT_MESSAGE_ITERATOR_STATUS_OK;
802 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
803 bt_message_array_const msgs;
804 uint64_t count = 0;
805 int64_t ns_from_origin = INT64_MIN;
806 uint64_t i;
807 int ret;
808
809 BT_ASSERT(!trimmer_it->begin.is_set ||
810 !trimmer_it->end.is_set);
811
812 while (true) {
813 upstream_iter_status =
814 bt_self_component_port_input_message_iterator_next(
815 trimmer_it->upstream_iter, &msgs, &count);
816 if (upstream_iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) {
817 goto end;
818 }
819
820 for (i = 0; i < count; i++) {
821 const bt_message *msg = msgs[i];
822 bool skip = false;
823 int ret;
824
825 ret = get_msg_ns_from_origin(msg, &ns_from_origin,
826 &skip);
827 if (ret) {
828 goto error;
829 }
830
831 if (skip) {
832 continue;
833 }
834
835 BT_ASSERT(ns_from_origin != INT64_MIN &&
836 ns_from_origin != INT64_MAX);
837 put_messages(msgs, count);
838 goto found;
839 }
840
841 put_messages(msgs, count);
842 }
843
844 found:
845 if (!trimmer_it->begin.is_set) {
846 BT_ASSERT(!trimmer_it->begin.is_infinite);
847 ret = set_trimmer_iterator_bound(&trimmer_it->begin,
848 ns_from_origin, trimmer_comp->is_gmt);
849 if (ret) {
850 goto error;
851 }
852 }
853
854 if (!trimmer_it->end.is_set) {
855 BT_ASSERT(!trimmer_it->end.is_infinite);
856 ret = set_trimmer_iterator_bound(&trimmer_it->end,
857 ns_from_origin, trimmer_comp->is_gmt);
858 if (ret) {
859 goto error;
860 }
861 }
862
863 ret = validate_trimmer_bounds(&trimmer_it->begin,
864 &trimmer_it->end);
865 if (ret) {
866 goto error;
867 }
868
869 goto end;
870
871 error:
872 put_messages(msgs, count);
873 upstream_iter_status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
874
875 end:
876 return (int) upstream_iter_status;
877 }
878
879 static
880 bt_self_message_iterator_status state_seek_initially(
881 struct trimmer_iterator *trimmer_it)
882 {
883 bt_self_message_iterator_status status =
884 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
885
886 BT_ASSERT(trimmer_it->begin.is_set);
887
888 if (trimmer_it->begin.is_infinite) {
889 if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
890 trimmer_it->upstream_iter)) {
891 BT_LOGE_STR("Cannot make upstream message iterator initially seek its beginning.");
892 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
893 goto end;
894 }
895
896 status = (int) bt_self_component_port_input_message_iterator_seek_beginning(
897 trimmer_it->upstream_iter);
898 } else {
899 if (!bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
900 trimmer_it->upstream_iter,
901 trimmer_it->begin.ns_from_origin)) {
902 BT_LOGE("Cannot make upstream message iterator initially seek: "
903 "seek-ns-from-origin=%" PRId64,
904 trimmer_it->begin.ns_from_origin);
905 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
906 goto end;
907 }
908
909 status = (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
910 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin);
911 }
912
913 if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
914 trimmer_it->state = TRIMMER_ITERATOR_STATE_TRIM;
915 }
916
917 end:
918 return status;
919 }
920
921 static inline
922 void push_message(struct trimmer_iterator *trimmer_it, const bt_message *msg)
923 {
924 g_queue_push_head(trimmer_it->output_messages, (void *) msg);
925 }
926
927 static inline
928 const bt_message *pop_message(struct trimmer_iterator *trimmer_it)
929 {
930 return g_queue_pop_tail(trimmer_it->output_messages);
931 }
932
933 static inline
934 int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
935 int64_t ns_from_origin, uint64_t *raw_value)
936 {
937
938 int64_t cc_offset_s;
939 uint64_t cc_offset_cycles;
940 uint64_t cc_freq;
941
942 bt_clock_class_get_offset(clock_class, &cc_offset_s, &cc_offset_cycles);
943 cc_freq = bt_clock_class_get_frequency(clock_class);
944 return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
945 cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
946 }
947
948 static inline
949 bt_self_message_iterator_status end_stream(struct trimmer_iterator *trimmer_it,
950 struct trimmer_iterator_stream_state *sstate)
951 {
952 bt_self_message_iterator_status status =
953 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
954 uint64_t raw_value;
955 const bt_clock_class *clock_class;
956 int ret;
957 bt_message *msg = NULL;
958
959 BT_ASSERT(!trimmer_it->end.is_infinite);
960
961 if (!sstate->stream) {
962 goto end;
963 }
964
965 if (sstate->cur_packet) {
966 /*
967 * The last message could not have been a stream
968 * activity end message if we have a current packet.
969 */
970 BT_ASSERT(!sstate->last_msg_is_stream_activity_end);
971
972 /*
973 * Create and push a packet end message, making its time
974 * the trimming range's end time.
975 */
976 clock_class = bt_stream_class_borrow_default_clock_class_const(
977 bt_stream_borrow_class_const(sstate->stream));
978 BT_ASSERT(clock_class);
979 ret = clock_raw_value_from_ns_from_origin(clock_class,
980 trimmer_it->end.ns_from_origin, &raw_value);
981 if (ret) {
982 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
983 goto end;
984 }
985
986 msg = bt_message_packet_end_create_with_default_clock_snapshot(
987 trimmer_it->self_msg_iter, sstate->cur_packet,
988 raw_value);
989 if (!msg) {
990 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
991 goto end;
992 }
993
994 push_message(trimmer_it, msg);
995 msg = NULL;
996 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
997
998 /*
999 * Because we generated a packet end message, set the
1000 * stream activity end message's time to use to the
1001 * trimming range's end time (this packet end message's
1002 * time).
1003 */
1004 sstate->stream_act_end_ns_from_origin =
1005 trimmer_it->end.ns_from_origin;
1006 }
1007
1008 if (!sstate->last_msg_is_stream_activity_end) {
1009 /* Create and push a stream activity end message */
1010 msg = bt_message_stream_activity_end_create(
1011 trimmer_it->self_msg_iter, sstate->stream);
1012 if (!msg) {
1013 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1014 goto end;
1015 }
1016
1017 clock_class = bt_stream_class_borrow_default_clock_class_const(
1018 bt_stream_borrow_class_const(sstate->stream));
1019 BT_ASSERT(clock_class);
1020
1021 if (sstate->stream_act_end_ns_from_origin == INT64_MIN) {
1022 /*
1023 * We received at least what is necessary to
1024 * have a stream state (stream beginning and
1025 * stream activity beginning messages), but
1026 * nothing else: use the trimmer range's end
1027 * time.
1028 */
1029 sstate->stream_act_end_ns_from_origin =
1030 trimmer_it->end.ns_from_origin;
1031 }
1032
1033 ret = clock_raw_value_from_ns_from_origin(clock_class,
1034 sstate->stream_act_end_ns_from_origin, &raw_value);
1035 if (ret) {
1036 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1037 goto end;
1038 }
1039
1040 bt_message_stream_activity_end_set_default_clock_snapshot(
1041 msg, raw_value);
1042 push_message(trimmer_it, msg);
1043 msg = NULL;
1044 }
1045
1046 /* Create and push a stream end message */
1047 msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
1048 sstate->stream);
1049 if (!msg) {
1050 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1051 goto end;
1052 }
1053
1054 push_message(trimmer_it, msg);
1055 msg = NULL;
1056
1057 /*
1058 * Just to make sure that we don't use this stream state again
1059 * in the future without an obvious error.
1060 */
1061 sstate->stream = NULL;
1062
1063 end:
1064 bt_message_put_ref(msg);
1065 return status;
1066 }
1067
1068 static inline
1069 bt_self_message_iterator_status end_iterator_streams(
1070 struct trimmer_iterator *trimmer_it)
1071 {
1072 bt_self_message_iterator_status status =
1073 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1074 GHashTableIter iter;
1075 gpointer key, sstate;
1076
1077 if (trimmer_it->end.is_infinite) {
1078 /*
1079 * An infinite trimming range's end time guarantees that
1080 * we received (and pushed) all the appropriate end
1081 * messages.
1082 */
1083 goto remove_all;
1084 }
1085
1086 /*
1087 * End each stream and then remove them from the hash table of
1088 * stream states to release unneeded references.
1089 */
1090 g_hash_table_iter_init(&iter, trimmer_it->stream_states);
1091
1092 while (g_hash_table_iter_next(&iter, &key, &sstate)) {
1093 status = end_stream(trimmer_it, sstate);
1094 if (status) {
1095 goto end;
1096 }
1097 }
1098
1099 remove_all:
1100 g_hash_table_remove_all(trimmer_it->stream_states);
1101
1102 end:
1103 return status;
1104 }
1105
1106 static inline
1107 bt_self_message_iterator_status create_stream_beginning_activity_message(
1108 struct trimmer_iterator *trimmer_it,
1109 const bt_stream *stream,
1110 const bt_clock_class *clock_class, bt_message **msg)
1111 {
1112 bt_message *local_msg;
1113 bt_self_message_iterator_status status =
1114 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1115
1116 BT_ASSERT(msg);
1117 BT_ASSERT(!trimmer_it->begin.is_infinite);
1118
1119 local_msg = bt_message_stream_activity_beginning_create(
1120 trimmer_it->self_msg_iter, stream);
1121 if (!local_msg) {
1122 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1123 goto end;
1124 }
1125
1126 if (clock_class) {
1127 int ret;
1128 uint64_t raw_value;
1129
1130 ret = clock_raw_value_from_ns_from_origin(clock_class,
1131 trimmer_it->begin.ns_from_origin, &raw_value);
1132 if (ret) {
1133 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1134 bt_message_put_ref(local_msg);
1135 goto end;
1136 }
1137
1138 bt_message_stream_activity_beginning_set_default_clock_snapshot(
1139 local_msg, raw_value);
1140 }
1141
1142 BT_MESSAGE_MOVE_REF(*msg, local_msg);
1143
1144 end:
1145 return status;
1146 }
1147
1148 /*
1149 * Makes sure to initialize a stream state, pushing the appropriate
1150 * initial messages.
1151 *
1152 * `stream_act_beginning_msg` is an initial stream activity beginning
1153 * message to potentially use, depending on its clock snapshot state.
1154 * This function consumes `stream_act_beginning_msg` unconditionally.
1155 */
1156 static inline
1157 bt_self_message_iterator_status ensure_stream_state_is_inited(
1158 struct trimmer_iterator *trimmer_it,
1159 struct trimmer_iterator_stream_state *sstate,
1160 const bt_message *stream_act_beginning_msg)
1161 {
1162 bt_self_message_iterator_status status =
1163 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1164 bt_message *new_msg = NULL;
1165 const bt_clock_class *clock_class =
1166 bt_stream_class_borrow_default_clock_class_const(
1167 bt_stream_borrow_class_const(sstate->stream));
1168
1169 BT_ASSERT(!sstate->inited);
1170
1171 if (!sstate->stream_beginning_msg) {
1172 /* No initial stream beginning message: create one */
1173 sstate->stream_beginning_msg =
1174 bt_message_stream_beginning_create(
1175 trimmer_it->self_msg_iter, sstate->stream);
1176 if (!sstate->stream_beginning_msg) {
1177 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1178 goto end;
1179 }
1180 }
1181
1182 /* Push initial stream beginning message */
1183 BT_ASSERT(sstate->stream_beginning_msg);
1184 push_message(trimmer_it, sstate->stream_beginning_msg);
1185 sstate->stream_beginning_msg = NULL;
1186
1187 if (stream_act_beginning_msg) {
1188 /*
1189 * Initial stream activity beginning message exists: if
1190 * its time is -inf, then create and push a new one
1191 * having the trimming range's beginning time. Otherwise
1192 * push it as is (known and unknown).
1193 */
1194 const bt_clock_snapshot *cs;
1195 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
1196
1197 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
1198 stream_act_beginning_msg, &cs);
1199 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE &&
1200 !trimmer_it->begin.is_infinite) {
1201 /*
1202 * -inf time: use trimming range's beginning
1203 * time (which is not -inf).
1204 */
1205 status = create_stream_beginning_activity_message(
1206 trimmer_it, sstate->stream, clock_class,
1207 &new_msg);
1208 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1209 goto end;
1210 }
1211
1212 push_message(trimmer_it, new_msg);
1213 new_msg = NULL;
1214 } else {
1215 /* Known/unknown: push as is */
1216 push_message(trimmer_it, stream_act_beginning_msg);
1217 stream_act_beginning_msg = NULL;
1218 }
1219 } else {
1220 BT_ASSERT(!trimmer_it->begin.is_infinite);
1221
1222 /*
1223 * No stream beginning activity message: create and push
1224 * a new message.
1225 */
1226 status = create_stream_beginning_activity_message(
1227 trimmer_it, sstate->stream, clock_class, &new_msg);
1228 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1229 goto end;
1230 }
1231
1232 push_message(trimmer_it, new_msg);
1233 new_msg = NULL;
1234 }
1235
1236 sstate->inited = true;
1237
1238 end:
1239 bt_message_put_ref(new_msg);
1240 bt_message_put_ref(stream_act_beginning_msg);
1241 return status;
1242 }
1243
1244 static inline
1245 bt_self_message_iterator_status ensure_cur_packet_exists(
1246 struct trimmer_iterator *trimmer_it,
1247 struct trimmer_iterator_stream_state *sstate,
1248 const bt_packet *packet)
1249 {
1250 bt_self_message_iterator_status status =
1251 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1252 int ret;
1253 const bt_clock_class *clock_class =
1254 bt_stream_class_borrow_default_clock_class_const(
1255 bt_stream_borrow_class_const(sstate->stream));
1256 bt_message *msg = NULL;
1257 uint64_t raw_value;
1258
1259 BT_ASSERT(!trimmer_it->begin.is_infinite);
1260 BT_ASSERT(!sstate->cur_packet);
1261
1262 /*
1263 * Create and push an initial packet beginning message,
1264 * making its time the trimming range's beginning time.
1265 */
1266 ret = clock_raw_value_from_ns_from_origin(clock_class,
1267 trimmer_it->begin.ns_from_origin, &raw_value);
1268 if (ret) {
1269 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1270 goto end;
1271 }
1272
1273 msg = bt_message_packet_beginning_create_with_default_clock_snapshot(
1274 trimmer_it->self_msg_iter, packet, raw_value);
1275 if (!msg) {
1276 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1277 goto end;
1278 }
1279
1280 push_message(trimmer_it, msg);
1281 msg = NULL;
1282
1283 /* Set packet as this stream's current packet */
1284 sstate->cur_packet = packet;
1285 bt_packet_get_ref(sstate->cur_packet);
1286
1287 end:
1288 bt_message_put_ref(msg);
1289 return status;
1290 }
1291
1292 /*
1293 * Handles a message which is associated to a given stream state. This
1294 * _could_ make the iterator's output message queue grow; this could
1295 * also consume the message without pushing anything to this queue, only
1296 * modifying the stream state.
1297 *
1298 * This function consumes the `msg` reference, _whatever the outcome_.
1299 *
1300 * `ns_from_origin` is the message's time, as given by
1301 * get_msg_ns_from_origin().
1302 *
1303 * This function sets `reached_end` if handling this message made the
1304 * iterator reach the end of the trimming range. Note that the output
1305 * message queue could contain messages even if this function sets
1306 * `reached_end`.
1307 */
1308 static inline
1309 bt_self_message_iterator_status handle_message_with_stream_state(
1310 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1311 struct trimmer_iterator_stream_state *sstate,
1312 int64_t ns_from_origin, bool *reached_end)
1313 {
1314 bt_self_message_iterator_status status =
1315 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1316 bt_message_type msg_type = bt_message_get_type(msg);
1317 int ret;
1318
1319 switch (msg_type) {
1320 case BT_MESSAGE_TYPE_EVENT:
1321 if (unlikely(!trimmer_it->end.is_infinite &&
1322 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1323 status = end_iterator_streams(trimmer_it);
1324 *reached_end = true;
1325 break;
1326 }
1327
1328 if (unlikely(!sstate->inited)) {
1329 status = ensure_stream_state_is_inited(trimmer_it,
1330 sstate, NULL);
1331 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1332 goto end;
1333 }
1334 }
1335
1336 if (unlikely(!sstate->cur_packet)) {
1337 const bt_event *event =
1338 bt_message_event_borrow_event_const(msg);
1339 const bt_packet *packet = bt_event_borrow_packet_const(
1340 event);
1341
1342 status = ensure_cur_packet_exists(trimmer_it, sstate,
1343 packet);
1344 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1345 goto end;
1346 }
1347 }
1348
1349 BT_ASSERT(sstate->cur_packet);
1350 push_message(trimmer_it, msg);
1351 msg = NULL;
1352 break;
1353 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1354 if (unlikely(!trimmer_it->end.is_infinite &&
1355 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1356 status = end_iterator_streams(trimmer_it);
1357 *reached_end = true;
1358 break;
1359 }
1360
1361 if (unlikely(!sstate->inited)) {
1362 status = ensure_stream_state_is_inited(trimmer_it,
1363 sstate, NULL);
1364 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1365 goto end;
1366 }
1367 }
1368
1369 BT_ASSERT(!sstate->cur_packet);
1370 sstate->cur_packet =
1371 bt_message_packet_beginning_borrow_packet_const(msg);
1372 bt_packet_get_ref(sstate->cur_packet);
1373 push_message(trimmer_it, msg);
1374 msg = NULL;
1375 break;
1376 case BT_MESSAGE_TYPE_PACKET_END:
1377 sstate->stream_act_end_ns_from_origin = ns_from_origin;
1378
1379 if (unlikely(!trimmer_it->end.is_infinite &&
1380 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1381 status = end_iterator_streams(trimmer_it);
1382 *reached_end = true;
1383 break;
1384 }
1385
1386 if (unlikely(!sstate->inited)) {
1387 status = ensure_stream_state_is_inited(trimmer_it,
1388 sstate, NULL);
1389 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1390 goto end;
1391 }
1392 }
1393
1394 if (unlikely(!sstate->cur_packet)) {
1395 const bt_packet *packet =
1396 bt_message_packet_end_borrow_packet_const(msg);
1397
1398 status = ensure_cur_packet_exists(trimmer_it, sstate,
1399 packet);
1400 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1401 goto end;
1402 }
1403 }
1404
1405 BT_ASSERT(sstate->cur_packet);
1406 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1407 push_message(trimmer_it, msg);
1408 msg = NULL;
1409 break;
1410 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1411 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1412 {
1413 /*
1414 * `ns_from_origin` is the message's time range's
1415 * beginning time here.
1416 */
1417 int64_t end_ns_from_origin;
1418 const bt_clock_snapshot *end_cs;
1419
1420 if (bt_message_get_type(msg) ==
1421 BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1422 /*
1423 * Safe to ignore the return value because we
1424 * know there's a default clock and it's always
1425 * known.
1426 */
1427 end_cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
1428 msg);
1429 } else {
1430 /*
1431 * Safe to ignore the return value because we
1432 * know there's a default clock and it's always
1433 * known.
1434 */
1435 end_cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
1436 msg);
1437 }
1438
1439 if (bt_clock_snapshot_get_ns_from_origin(end_cs,
1440 &end_ns_from_origin)) {
1441 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1442 goto end;
1443 }
1444
1445 sstate->stream_act_end_ns_from_origin = end_ns_from_origin;
1446
1447 if (!trimmer_it->end.is_infinite &&
1448 ns_from_origin > trimmer_it->end.ns_from_origin) {
1449 status = end_iterator_streams(trimmer_it);
1450 *reached_end = true;
1451 break;
1452 }
1453
1454 if (!trimmer_it->end.is_infinite &&
1455 end_ns_from_origin > trimmer_it->end.ns_from_origin) {
1456 /*
1457 * This message's end time is outside the
1458 * trimming time range: replace it with a new
1459 * message having an end time equal to the
1460 * trimming time range's end and without a
1461 * count.
1462 */
1463 const bt_clock_class *clock_class =
1464 bt_clock_snapshot_borrow_clock_class_const(
1465 end_cs);
1466 const bt_clock_snapshot *begin_cs;
1467 bt_message *new_msg;
1468 uint64_t end_raw_value;
1469
1470 ret = clock_raw_value_from_ns_from_origin(clock_class,
1471 trimmer_it->end.ns_from_origin, &end_raw_value);
1472 if (ret) {
1473 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1474 goto end;
1475 }
1476
1477 if (msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1478 begin_cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
1479 msg);
1480 new_msg = bt_message_discarded_events_create_with_default_clock_snapshots(
1481 trimmer_it->self_msg_iter,
1482 sstate->stream,
1483 bt_clock_snapshot_get_value(begin_cs),
1484 end_raw_value);
1485 } else {
1486 begin_cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
1487 msg);
1488 new_msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
1489 trimmer_it->self_msg_iter,
1490 sstate->stream,
1491 bt_clock_snapshot_get_value(begin_cs),
1492 end_raw_value);
1493 }
1494
1495 if (!new_msg) {
1496 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1497 goto end;
1498 }
1499
1500 /* Replace the original message */
1501 BT_MESSAGE_MOVE_REF(msg, new_msg);
1502 }
1503
1504 if (unlikely(!sstate->inited)) {
1505 status = ensure_stream_state_is_inited(trimmer_it,
1506 sstate, NULL);
1507 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1508 goto end;
1509 }
1510 }
1511
1512 push_message(trimmer_it, msg);
1513 msg = NULL;
1514 break;
1515 }
1516 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
1517 if (!trimmer_it->end.is_infinite &&
1518 ns_from_origin > trimmer_it->end.ns_from_origin) {
1519 /*
1520 * This only happens when the message's time is
1521 * known and is greater than the trimming
1522 * range's end time. Unknown and -inf times are
1523 * always less than
1524 * `trimmer_it->end.ns_from_origin`.
1525 */
1526 status = end_iterator_streams(trimmer_it);
1527 *reached_end = true;
1528 break;
1529 }
1530
1531 if (!sstate->inited) {
1532 status = ensure_stream_state_is_inited(trimmer_it,
1533 sstate, msg);
1534 msg = NULL;
1535 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1536 goto end;
1537 }
1538 } else {
1539 push_message(trimmer_it, msg);
1540 msg = NULL;
1541 }
1542
1543 break;
1544 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
1545 if (trimmer_it->end.is_infinite) {
1546 push_message(trimmer_it, msg);
1547 msg = NULL;
1548 break;
1549 }
1550
1551 if (ns_from_origin == INT64_MIN) {
1552 /* Unknown: push as is if stream state is inited */
1553 if (sstate->inited) {
1554 push_message(trimmer_it, msg);
1555 msg = NULL;
1556 sstate->last_msg_is_stream_activity_end = true;
1557 }
1558 } else if (ns_from_origin == INT64_MAX) {
1559 /* Infinite: use trimming range's end time */
1560 sstate->stream_act_end_ns_from_origin =
1561 trimmer_it->end.ns_from_origin;
1562 } else {
1563 /* Known: check if outside of trimming range */
1564 if (ns_from_origin > trimmer_it->end.ns_from_origin) {
1565 sstate->stream_act_end_ns_from_origin =
1566 trimmer_it->end.ns_from_origin;
1567 status = end_iterator_streams(trimmer_it);
1568 *reached_end = true;
1569 break;
1570 }
1571
1572 if (!sstate->inited) {
1573 /*
1574 * First message for this stream is a
1575 * stream activity end: we can't deduce
1576 * anything about the stream activity
1577 * beginning's time, and using this
1578 * message's time would make a useless
1579 * pair of stream activity beginning/end
1580 * with the same time. Just skip this
1581 * message and wait for something
1582 * useful.
1583 */
1584 break;
1585 }
1586
1587 push_message(trimmer_it, msg);
1588 msg = NULL;
1589 sstate->last_msg_is_stream_activity_end = true;
1590 sstate->stream_act_end_ns_from_origin = ns_from_origin;
1591 }
1592
1593 break;
1594 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1595 /*
1596 * We don't know what follows at this point, so just
1597 * keep this message until we know what to do with it
1598 * (it will be used in ensure_stream_state_is_inited()).
1599 */
1600 BT_ASSERT(!sstate->inited);
1601 BT_MESSAGE_MOVE_REF(sstate->stream_beginning_msg, msg);
1602 break;
1603 case BT_MESSAGE_TYPE_STREAM_END:
1604 if (sstate->inited) {
1605 /*
1606 * This is the end of an inited stream: end this
1607 * stream if its stream activity end message
1608 * time is not the trimming range's end time
1609 * (which means the final stream activity end
1610 * message had an infinite time). end_stream()
1611 * will generate its own stream end message.
1612 */
1613 if (trimmer_it->end.is_infinite) {
1614 push_message(trimmer_it, msg);
1615 msg = NULL;
1616 g_hash_table_remove(trimmer_it->stream_states,
1617 sstate->stream);
1618 } else if (sstate->stream_act_end_ns_from_origin <
1619 trimmer_it->end.ns_from_origin) {
1620 status = end_stream(trimmer_it, sstate);
1621 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1622 goto end;
1623 }
1624
1625 /* We won't need this stream state again */
1626 g_hash_table_remove(trimmer_it->stream_states,
1627 sstate->stream);
1628 }
1629 } else {
1630 /* We dont't need this stream state anymore */
1631 g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
1632 }
1633
1634 break;
1635 default:
1636 break;
1637 }
1638
1639 end:
1640 /* We release the message's reference whatever the outcome */
1641 bt_message_put_ref(msg);
1642 return BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1643 }
1644
1645 /*
1646 * Handles an input message. This _could_ make the iterator's output
1647 * message queue grow; this could also consume the message without
1648 * pushing anything to this queue, only modifying the stream state.
1649 *
1650 * This function consumes the `msg` reference, _whatever the outcome_.
1651 *
1652 * This function sets `reached_end` if handling this message made the
1653 * iterator reach the end of the trimming range. Note that the output
1654 * message queue could contain messages even if this function sets
1655 * `reached_end`.
1656 */
1657 static inline
1658 bt_self_message_iterator_status handle_message(
1659 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1660 bool *reached_end)
1661 {
1662 bt_self_message_iterator_status status;
1663 const bt_stream *stream = NULL;
1664 int64_t ns_from_origin = INT64_MIN;
1665 bool skip;
1666 int ret;
1667 struct trimmer_iterator_stream_state *sstate = NULL;
1668
1669 /* Find message's associated stream */
1670 switch (bt_message_get_type(msg)) {
1671 case BT_MESSAGE_TYPE_EVENT:
1672 stream = bt_event_borrow_stream_const(
1673 bt_message_event_borrow_event_const(msg));
1674 break;
1675 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1676 stream = bt_packet_borrow_stream_const(
1677 bt_message_packet_beginning_borrow_packet_const(msg));
1678 break;
1679 case BT_MESSAGE_TYPE_PACKET_END:
1680 stream = bt_packet_borrow_stream_const(
1681 bt_message_packet_end_borrow_packet_const(msg));
1682 break;
1683 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1684 stream = bt_message_discarded_events_borrow_stream_const(msg);
1685 break;
1686 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1687 stream = bt_message_discarded_packets_borrow_stream_const(msg);
1688 break;
1689 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
1690 stream = bt_message_stream_activity_beginning_borrow_stream_const(msg);
1691 break;
1692 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
1693 stream = bt_message_stream_activity_end_borrow_stream_const(msg);
1694 break;
1695 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1696 stream = bt_message_stream_beginning_borrow_stream_const(msg);
1697 break;
1698 case BT_MESSAGE_TYPE_STREAM_END:
1699 stream = bt_message_stream_end_borrow_stream_const(msg);
1700 break;
1701 default:
1702 break;
1703 }
1704
1705 if (likely(stream)) {
1706 /* Find stream state */
1707 sstate = g_hash_table_lookup(trimmer_it->stream_states,
1708 stream);
1709 if (unlikely(!sstate)) {
1710 /* No stream state yet: create one now */
1711 const bt_stream_class *sc;
1712
1713 /*
1714 * Validate right now that the stream's class
1715 * has a registered default clock class so that
1716 * an existing stream state guarantees existing
1717 * default clock snapshots for its associated
1718 * messages.
1719 *
1720 * Also check that clock snapshots are always
1721 * known.
1722 */
1723 sc = bt_stream_borrow_class_const(stream);
1724 if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
1725 BT_LOGE("Unsupported stream: stream class does "
1726 "not have a default clock class: "
1727 "stream-addr=%p, "
1728 "stream-id=%" PRIu64 ", "
1729 "stream-name=\"%s\"",
1730 stream, bt_stream_get_id(stream),
1731 bt_stream_get_name(stream));
1732 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1733 goto end;
1734 }
1735
1736 /*
1737 * Temporary: make sure packet beginning, packet
1738 * end, discarded events, and discarded packets
1739 * messages have default clock snapshots until
1740 * the support for not having them is
1741 * implemented.
1742 */
1743 if (!bt_stream_class_packets_have_beginning_default_clock_snapshot(
1744 sc)) {
1745 BT_LOGE("Unsupported stream: packets have "
1746 "no beginning clock snapshot: "
1747 "stream-addr=%p, "
1748 "stream-id=%" PRIu64 ", "
1749 "stream-name=\"%s\"",
1750 stream, bt_stream_get_id(stream),
1751 bt_stream_get_name(stream));
1752 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1753 goto end;
1754 }
1755
1756 if (!bt_stream_class_packets_have_end_default_clock_snapshot(
1757 sc)) {
1758 BT_LOGE("Unsupported stream: packets have "
1759 "no end clock snapshot: "
1760 "stream-addr=%p, "
1761 "stream-id=%" PRIu64 ", "
1762 "stream-name=\"%s\"",
1763 stream, bt_stream_get_id(stream),
1764 bt_stream_get_name(stream));
1765 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1766 goto end;
1767 }
1768
1769 if (bt_stream_class_supports_discarded_events(sc) &&
1770 !bt_stream_class_discarded_events_have_default_clock_snapshots(sc)) {
1771 BT_LOGE("Unsupported stream: discarded events "
1772 "have no clock snapshots: "
1773 "stream-addr=%p, "
1774 "stream-id=%" PRIu64 ", "
1775 "stream-name=\"%s\"",
1776 stream, bt_stream_get_id(stream),
1777 bt_stream_get_name(stream));
1778 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1779 goto end;
1780 }
1781
1782 if (bt_stream_class_supports_discarded_packets(sc) &&
1783 !bt_stream_class_discarded_packets_have_default_clock_snapshots(sc)) {
1784 BT_LOGE("Unsupported stream: discarded packets "
1785 "have no clock snapshots: "
1786 "stream-addr=%p, "
1787 "stream-id=%" PRIu64 ", "
1788 "stream-name=\"%s\"",
1789 stream, bt_stream_get_id(stream),
1790 bt_stream_get_name(stream));
1791 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1792 goto end;
1793 }
1794
1795 sstate = g_new0(struct trimmer_iterator_stream_state,
1796 1);
1797 if (!sstate) {
1798 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1799 goto end;
1800 }
1801
1802 sstate->stream = stream;
1803 sstate->stream_act_end_ns_from_origin = INT64_MIN;
1804 g_hash_table_insert(trimmer_it->stream_states,
1805 (void *) stream, sstate);
1806 }
1807 }
1808
1809 /* Retrieve the message's time */
1810 ret = get_msg_ns_from_origin(msg, &ns_from_origin, &skip);
1811 if (unlikely(ret)) {
1812 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1813 goto end;
1814 }
1815
1816 if (likely(sstate)) {
1817 /* Message associated to a stream */
1818 status = handle_message_with_stream_state(trimmer_it, msg,
1819 sstate, ns_from_origin, reached_end);
1820
1821 /*
1822 * handle_message_with_stream_state() unconditionally
1823 * consumes `msg`.
1824 */
1825 msg = NULL;
1826 } else {
1827 /*
1828 * Message not associated to a stream (message iterator
1829 * inactivity).
1830 */
1831 if (unlikely(ns_from_origin > trimmer_it->end.ns_from_origin)) {
1832 BT_MESSAGE_PUT_REF_AND_RESET(msg);
1833 status = end_iterator_streams(trimmer_it);
1834 *reached_end = true;
1835 } else {
1836 push_message(trimmer_it, msg);
1837 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1838 msg = NULL;
1839 }
1840 }
1841
1842 end:
1843 /* We release the message's reference whatever the outcome */
1844 bt_message_put_ref(msg);
1845 return status;
1846 }
1847
1848 static inline
1849 void fill_message_array_from_output_messages(
1850 struct trimmer_iterator *trimmer_it,
1851 bt_message_array_const msgs, uint64_t capacity, uint64_t *count)
1852 {
1853 *count = 0;
1854
1855 /*
1856 * Move auto-seek messages to the output array (which is this
1857 * iterator's base message array).
1858 */
1859 while (capacity > 0 && !g_queue_is_empty(trimmer_it->output_messages)) {
1860 msgs[*count] = pop_message(trimmer_it);
1861 capacity--;
1862 (*count)++;
1863 }
1864
1865 BT_ASSERT(*count > 0);
1866 }
1867
1868 static inline
1869 bt_self_message_iterator_status state_ending(
1870 struct trimmer_iterator *trimmer_it,
1871 bt_message_array_const msgs, uint64_t capacity,
1872 uint64_t *count)
1873 {
1874 bt_self_message_iterator_status status =
1875 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1876
1877 if (g_queue_is_empty(trimmer_it->output_messages)) {
1878 trimmer_it->state = TRIMMER_ITERATOR_STATE_ENDED;
1879 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1880 goto end;
1881 }
1882
1883 fill_message_array_from_output_messages(trimmer_it, msgs,
1884 capacity, count);
1885
1886 end:
1887 return status;
1888 }
1889
1890 static inline
1891 bt_self_message_iterator_status state_trim(struct trimmer_iterator *trimmer_it,
1892 bt_message_array_const msgs, uint64_t capacity,
1893 uint64_t *count)
1894 {
1895 bt_self_message_iterator_status status =
1896 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1897 bt_message_array_const my_msgs;
1898 uint64_t my_count;
1899 uint64_t i;
1900 bool reached_end = false;
1901
1902 while (g_queue_is_empty(trimmer_it->output_messages)) {
1903 status = (int) bt_self_component_port_input_message_iterator_next(
1904 trimmer_it->upstream_iter, &my_msgs, &my_count);
1905 if (unlikely(status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
1906 if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) {
1907 status = end_iterator_streams(trimmer_it);
1908 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1909 goto end;
1910 }
1911
1912 trimmer_it->state =
1913 TRIMMER_ITERATOR_STATE_ENDING;
1914 status = state_ending(trimmer_it, msgs,
1915 capacity, count);
1916 }
1917
1918 goto end;
1919 }
1920
1921 BT_ASSERT(my_count > 0);
1922
1923 for (i = 0; i < my_count; i++) {
1924 status = handle_message(trimmer_it, my_msgs[i],
1925 &reached_end);
1926
1927 /*
1928 * handle_message() unconditionally consumes the
1929 * message reference.
1930 */
1931 my_msgs[i] = NULL;
1932
1933 if (unlikely(status !=
1934 BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
1935 put_messages(my_msgs, my_count);
1936 goto end;
1937 }
1938
1939 if (unlikely(reached_end)) {
1940 /*
1941 * This message's time was passed the
1942 * trimming time range's end time: we
1943 * are done. Their might still be
1944 * messages in the output message queue,
1945 * so move to the "ending" state and
1946 * apply it immediately since
1947 * state_trim() is called within the
1948 * "next" method.
1949 */
1950 put_messages(my_msgs, my_count);
1951 trimmer_it->state =
1952 TRIMMER_ITERATOR_STATE_ENDING;
1953 status = state_ending(trimmer_it, msgs,
1954 capacity, count);
1955 goto end;
1956 }
1957 }
1958 }
1959
1960 /*
1961 * There's at least one message in the output message queue:
1962 * move the messages to the output message array.
1963 */
1964 BT_ASSERT(!g_queue_is_empty(trimmer_it->output_messages));
1965 fill_message_array_from_output_messages(trimmer_it, msgs,
1966 capacity, count);
1967
1968 end:
1969 return status;
1970 }
1971
1972 BT_HIDDEN
1973 bt_self_message_iterator_status trimmer_msg_iter_next(
1974 bt_self_message_iterator *self_msg_iter,
1975 bt_message_array_const msgs, uint64_t capacity,
1976 uint64_t *count)
1977 {
1978 struct trimmer_iterator *trimmer_it =
1979 bt_self_message_iterator_get_data(self_msg_iter);
1980 bt_self_message_iterator_status status =
1981 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1982
1983 BT_ASSERT(trimmer_it);
1984
1985 if (likely(trimmer_it->state == TRIMMER_ITERATOR_STATE_TRIM)) {
1986 status = state_trim(trimmer_it, msgs, capacity, count);
1987 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1988 goto end;
1989 }
1990 } else {
1991 switch (trimmer_it->state) {
1992 case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN:
1993 status = state_set_trimmer_iterator_bounds(trimmer_it);
1994 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1995 goto end;
1996 }
1997
1998 status = state_seek_initially(trimmer_it);
1999 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
2000 goto end;
2001 }
2002
2003 status = state_trim(trimmer_it, msgs, capacity, count);
2004 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
2005 goto end;
2006 }
2007
2008 break;
2009 case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY:
2010 status = state_seek_initially(trimmer_it);
2011 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
2012 goto end;
2013 }
2014
2015 status = state_trim(trimmer_it, msgs, capacity, count);
2016 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
2017 goto end;
2018 }
2019
2020 break;
2021 case TRIMMER_ITERATOR_STATE_ENDING:
2022 status = state_ending(trimmer_it, msgs, capacity,
2023 count);
2024 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
2025 goto end;
2026 }
2027
2028 break;
2029 case TRIMMER_ITERATOR_STATE_ENDED:
2030 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
2031 break;
2032 default:
2033 abort();
2034 }
2035 }
2036
2037 end:
2038 return status;
2039 }
2040
2041 BT_HIDDEN
2042 void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
2043 {
2044 struct trimmer_iterator *trimmer_it =
2045 bt_self_message_iterator_get_data(self_msg_iter);
2046
2047 BT_ASSERT(trimmer_it);
2048 destroy_trimmer_iterator(trimmer_it);
2049 }
This page took 0.087749 seconds and 4 git commands to generate.