218f7788ee8159f10e050f0bd4459208573ec35f
[babeltrace.git] / src / plugins / utils / trimmer / trimmer.c
1 /*
2 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #define BT_COMP_LOG_SELF_COMP (trimmer_comp->self_comp)
25 #define BT_LOG_OUTPUT_LEVEL (trimmer_comp->log_level)
26 #define BT_LOG_TAG "PLUGIN/FLT.UTILS.TRIMMER"
27 #include "plugins/comp-logging.h"
28
29 #include "compat/utc.h"
30 #include "compat/time.h"
31 #include <babeltrace2/babeltrace.h>
32 #include "common/common.h"
33 #include "common/assert.h"
34 #include <stdint.h>
35 #include <inttypes.h>
36 #include <glib.h>
37
38 #include "trimmer.h"
39
/* Number of nanoseconds in one second */
#define NS_PER_S INT64_C(1000000000)

/* Name of the input port which trimmer_init() adds to the component */
static const char * const in_port_name = "in";
43
/* Time of day (no date), as parsed from a `hh:mm:ss[.ns]` parameter */
struct trimmer_time {
	unsigned int hour;
	unsigned int minute;
	unsigned int second;
	unsigned int ns;
};
47
/* One bound (beginning or end) of the trimming time range */
struct trimmer_bound {
	/*
	 * Nanoseconds from origin; only meaningful when `is_set` is
	 * true and `is_infinite` is false.
	 */
	int64_t ns_from_origin;

	/* True if this bound's full time (`ns_from_origin`) is set */
	bool is_set;

	/*
	 * True if this bound represents infinity (negative or positive
	 * depending on which bound it is). If this is true, then
	 * `ns_from_origin` above is not used.
	 */
	bool is_infinite;

	/*
	 * This bound's time without the date; this time is used to set
	 * `ns_from_origin` once the date is known.
	 */
	struct trimmer_time time;
};
71
72 struct trimmer_comp {
73 struct trimmer_bound begin, end;
74 bool is_gmt;
75 bt_logging_level log_level;
76 bt_self_component *self_comp;
77 };
78
/*
 * States of a trimmer message iterator.
 *
 * The first state is explicitly zero: a trimmer iterator is allocated
 * zero-filled, so this is the state it starts in unless
 * trimmer_msg_iter_init() selects another one.
 */
enum trimmer_iterator_state {
	/*
	 * Find the date of the first message and use it to complete the
	 * bounds which were given as a time of day only.
	 */
	TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN = 0,

	/* Make the upstream iterator seek the trimming range's beginning */
	TRIMMER_ITERATOR_STATE_SEEK_INITIALLY,

	/*
	 * Push received input messages to the output message queue for
	 * as long as they are within the trimming time range.
	 */
	TRIMMER_ITERATOR_STATE_TRIM,

	/* Flush what remains in the output message queue */
	TRIMMER_ITERATOR_STATE_ENDING,

	/* Trimming is done and the message iterator is ended */
	TRIMMER_ITERATOR_STATE_ENDED,
};
103
104 struct trimmer_iterator {
105 /* Weak */
106 struct trimmer_comp *trimmer_comp;
107
108 /* Weak */
109 bt_self_message_iterator *self_msg_iter;
110
111 enum trimmer_iterator_state state;
112
113 /* Owned by this */
114 bt_self_component_port_input_message_iterator *upstream_iter;
115 struct trimmer_bound begin, end;
116
117 /*
118 * Queue of `const bt_message *` (owned by the queue).
119 *
120 * This is where the trimming operation pushes the messages to
121 * output by this message iterator.
122 */
123 GQueue *output_messages;
124
125 /*
126 * Hash table of `bt_stream *` (weak) to
127 * `struct trimmer_iterator_stream_state *` (owned by the HT).
128 */
129 GHashTable *stream_states;
130 };
131
/* What a trimmer message iterator tracks for one stream */
struct trimmer_iterator_stream_state {
	/*
	 * True if the last pushed message for this stream was a stream
	 * activity end message.
	 */
	bool last_msg_is_stream_activity_end;

	/*
	 * Time to use for a generated stream activity end message when
	 * ending the stream.
	 */
	int64_t stream_act_end_ns_from_origin;

	/* Weak (end_stream() resets it to `NULL` once the stream is ended) */
	const bt_stream *stream;

	/* Owned by this (`NULL` initially and between packets) */
	const bt_packet *cur_packet;
};
151
/*
 * Frees the trimmer component data `trimmer_comp`, which must not be
 * `NULL`.
 */
static
void destroy_trimmer_comp(struct trimmer_comp *trimmer_comp)
{
	BT_ASSERT(trimmer_comp);
	g_free(trimmer_comp);
}
158
159 static
160 struct trimmer_comp *create_trimmer_comp(void)
161 {
162 return g_new0(struct trimmer_comp, 1);
163 }
164
165 BT_HIDDEN
166 void trimmer_finalize(bt_self_component_filter *self_comp)
167 {
168 struct trimmer_comp *trimmer_comp =
169 bt_self_component_get_data(
170 bt_self_component_filter_as_self_component(self_comp));
171
172 if (trimmer_comp) {
173 destroy_trimmer_comp(trimmer_comp);
174 }
175 }
176
177 /*
178 * Sets the time (in ns from origin) of a trimmer bound from date and
179 * time components.
180 *
181 * Returns a negative value if anything goes wrong.
182 */
183 static
184 int set_bound_ns_from_origin(struct trimmer_bound *bound,
185 unsigned int year, unsigned int month, unsigned int day,
186 unsigned int hour, unsigned int minute, unsigned int second,
187 unsigned int ns, bool is_gmt)
188 {
189 int ret = 0;
190 time_t result;
191 struct tm tm = {
192 .tm_sec = second,
193 .tm_min = minute,
194 .tm_hour = hour,
195 .tm_mday = day,
196 .tm_mon = month - 1,
197 .tm_year = year - 1900,
198 .tm_isdst = -1,
199 };
200
201 if (is_gmt) {
202 result = bt_timegm(&tm);
203 } else {
204 result = mktime(&tm);
205 }
206
207 if (result < 0) {
208 ret = -1;
209 goto end;
210 }
211
212 BT_ASSERT(bound);
213 bound->ns_from_origin = (int64_t) result;
214 bound->ns_from_origin *= NS_PER_S;
215 bound->ns_from_origin += ns;
216 bound->is_set = true;
217
218 end:
219 return ret;
220 }
221
222 /*
223 * Parses a timestamp, figuring out its format.
224 *
225 * Returns a negative value if anything goes wrong.
226 *
227 * Expected formats:
228 *
229 * YYYY-MM-DD hh:mm[:ss[.ns]]
230 * [hh:mm:]ss[.ns]
231 * [-]s[.ns]
232 *
233 * TODO: Check overflows.
234 */
235 static
236 int set_bound_from_str(struct trimmer_comp *trimmer_comp,
237 const char *str, struct trimmer_bound *bound, bool is_gmt)
238 {
239 int ret = 0;
240 int s_ret;
241 unsigned int year, month, day, hour, minute, second, ns;
242 char dummy;
243
244 /* Try `YYYY-MM-DD hh:mm:ss.ns` format */
245 s_ret = sscanf(str, "%u-%u-%u %u:%u:%u.%u%c", &year, &month, &day,
246 &hour, &minute, &second, &ns, &dummy);
247 if (s_ret == 7) {
248 ret = set_bound_ns_from_origin(bound, year, month, day,
249 hour, minute, second, ns, is_gmt);
250 goto end;
251 }
252
253 /* Try `YYYY-MM-DD hh:mm:ss` format */
254 s_ret = sscanf(str, "%u-%u-%u %u:%u:%u%c", &year, &month, &day,
255 &hour, &minute, &second, &dummy);
256 if (s_ret == 6) {
257 ret = set_bound_ns_from_origin(bound, year, month, day,
258 hour, minute, second, 0, is_gmt);
259 goto end;
260 }
261
262 /* Try `YYYY-MM-DD hh:mm` format */
263 s_ret = sscanf(str, "%u-%u-%u %u:%u%c", &year, &month, &day,
264 &hour, &minute, &dummy);
265 if (s_ret == 5) {
266 ret = set_bound_ns_from_origin(bound, year, month, day,
267 hour, minute, 0, 0, is_gmt);
268 goto end;
269 }
270
271 /* Try `YYYY-MM-DD` format */
272 s_ret = sscanf(str, "%u-%u-%u%c", &year, &month, &day, &dummy);
273 if (s_ret == 3) {
274 ret = set_bound_ns_from_origin(bound, year, month, day,
275 0, 0, 0, 0, is_gmt);
276 goto end;
277 }
278
279 /* Try `hh:mm:ss.ns` format */
280 s_ret = sscanf(str, "%u:%u:%u.%u%c", &hour, &minute, &second, &ns,
281 &dummy);
282 if (s_ret == 4) {
283 bound->time.hour = hour;
284 bound->time.minute = minute;
285 bound->time.second = second;
286 bound->time.ns = ns;
287 goto end;
288 }
289
290 /* Try `hh:mm:ss` format */
291 s_ret = sscanf(str, "%u:%u:%u%c", &hour, &minute, &second, &dummy);
292 if (s_ret == 3) {
293 bound->time.hour = hour;
294 bound->time.minute = minute;
295 bound->time.second = second;
296 bound->time.ns = 0;
297 goto end;
298 }
299
300 /* Try `-s.ns` format */
301 s_ret = sscanf(str, "-%u.%u%c", &second, &ns, &dummy);
302 if (s_ret == 2) {
303 bound->ns_from_origin = -((int64_t) second) * NS_PER_S;
304 bound->ns_from_origin -= (int64_t) ns;
305 bound->is_set = true;
306 goto end;
307 }
308
309 /* Try `s.ns` format */
310 s_ret = sscanf(str, "%u.%u%c", &second, &ns, &dummy);
311 if (s_ret == 2) {
312 bound->ns_from_origin = ((int64_t) second) * NS_PER_S;
313 bound->ns_from_origin += (int64_t) ns;
314 bound->is_set = true;
315 goto end;
316 }
317
318 /* Try `-s` format */
319 s_ret = sscanf(str, "-%u%c", &second, &dummy);
320 if (s_ret == 1) {
321 bound->ns_from_origin = -((int64_t) second) * NS_PER_S;
322 bound->is_set = true;
323 goto end;
324 }
325
326 /* Try `s` format */
327 s_ret = sscanf(str, "%u%c", &second, &dummy);
328 if (s_ret == 1) {
329 bound->ns_from_origin = (int64_t) second * NS_PER_S;
330 bound->is_set = true;
331 goto end;
332 }
333
334 BT_COMP_LOGE("Invalid date/time format: param=\"%s\"", str);
335 ret = -1;
336
337 end:
338 return ret;
339 }
340
341 /*
342 * Sets a trimmer bound's properties from a parameter string/integer
343 * value.
344 *
345 * Returns a negative value if anything goes wrong.
346 */
347 static
348 int set_bound_from_param(struct trimmer_comp *trimmer_comp,
349 const char *param_name, const bt_value *param,
350 struct trimmer_bound *bound, bool is_gmt)
351 {
352 int ret;
353 const char *arg;
354 char tmp_arg[64];
355
356 if (bt_value_is_signed_integer(param)) {
357 int64_t value = bt_value_signed_integer_get(param);
358
359 /*
360 * Just convert it to a temporary string to handle
361 * everything the same way.
362 */
363 sprintf(tmp_arg, "%" PRId64, value);
364 arg = tmp_arg;
365 } else if (bt_value_is_string(param)) {
366 arg = bt_value_string_get(param);
367 } else {
368 BT_COMP_LOGE("`%s` parameter must be an integer or a string value.",
369 param_name);
370 ret = -1;
371 goto end;
372 }
373
374 ret = set_bound_from_str(trimmer_comp, arg, bound, is_gmt);
375
376 end:
377 return ret;
378 }
379
380 static
381 int validate_trimmer_bounds(struct trimmer_comp *trimmer_comp,
382 struct trimmer_bound *begin, struct trimmer_bound *end)
383 {
384 int ret = 0;
385
386 BT_ASSERT(begin->is_set);
387 BT_ASSERT(end->is_set);
388
389 if (!begin->is_infinite && !end->is_infinite &&
390 begin->ns_from_origin > end->ns_from_origin) {
391 BT_COMP_LOGE("Trimming time range's beginning time is greater than end time: "
392 "begin-ns-from-origin=%" PRId64 ", "
393 "end-ns-from-origin=%" PRId64,
394 begin->ns_from_origin,
395 end->ns_from_origin);
396 ret = -1;
397 goto end;
398 }
399
400 if (!begin->is_infinite && begin->ns_from_origin == INT64_MIN) {
401 BT_COMP_LOGE("Invalid trimming time range's beginning time: "
402 "ns-from-origin=%" PRId64,
403 begin->ns_from_origin);
404 ret = -1;
405 goto end;
406 }
407
408 if (!end->is_infinite && end->ns_from_origin == INT64_MIN) {
409 BT_COMP_LOGE("Invalid trimming time range's end time: "
410 "ns-from-origin=%" PRId64,
411 end->ns_from_origin);
412 ret = -1;
413 goto end;
414 }
415
416 end:
417 return ret;
418 }
419
420 static
421 int init_trimmer_comp_from_params(struct trimmer_comp *trimmer_comp,
422 const bt_value *params)
423 {
424 const bt_value *value;
425 int ret = 0;
426
427 BT_ASSERT(params);
428 value = bt_value_map_borrow_entry_value_const(params, "gmt");
429 if (value) {
430 trimmer_comp->is_gmt = (bool) bt_value_bool_get(value);
431 }
432
433 value = bt_value_map_borrow_entry_value_const(params, "begin");
434 if (value) {
435 if (set_bound_from_param(trimmer_comp, "begin", value,
436 &trimmer_comp->begin, trimmer_comp->is_gmt)) {
437 /* set_bound_from_param() logs errors */
438 ret = BT_SELF_COMPONENT_STATUS_ERROR;
439 goto end;
440 }
441 } else {
442 trimmer_comp->begin.is_infinite = true;
443 trimmer_comp->begin.is_set = true;
444 }
445
446 value = bt_value_map_borrow_entry_value_const(params, "end");
447 if (value) {
448 if (set_bound_from_param(trimmer_comp, "end", value,
449 &trimmer_comp->end, trimmer_comp->is_gmt)) {
450 /* set_bound_from_param() logs errors */
451 ret = BT_SELF_COMPONENT_STATUS_ERROR;
452 goto end;
453 }
454 } else {
455 trimmer_comp->end.is_infinite = true;
456 trimmer_comp->end.is_set = true;
457 }
458
459 end:
460 if (trimmer_comp->begin.is_set && trimmer_comp->end.is_set) {
461 /* validate_trimmer_bounds() logs errors */
462 ret = validate_trimmer_bounds(trimmer_comp,
463 &trimmer_comp->begin, &trimmer_comp->end);
464 }
465
466 return ret;
467 }
468
469 bt_self_component_status trimmer_init(bt_self_component_filter *self_comp_flt,
470 const bt_value *params, void *init_data)
471 {
472 int ret;
473 bt_self_component_status status;
474 struct trimmer_comp *trimmer_comp = create_trimmer_comp();
475 bt_self_component *self_comp =
476 bt_self_component_filter_as_self_component(self_comp_flt);
477 if (!trimmer_comp) {
478 status = BT_SELF_COMPONENT_STATUS_NOMEM;
479 goto error;
480 }
481
482 trimmer_comp->log_level = bt_component_get_logging_level(
483 bt_self_component_as_component(self_comp));
484 trimmer_comp->self_comp = self_comp;
485 status = bt_self_component_filter_add_input_port(
486 self_comp_flt, in_port_name, NULL, NULL);
487 if (status != BT_SELF_COMPONENT_STATUS_OK) {
488 goto error;
489 }
490
491 status = bt_self_component_filter_add_output_port(
492 self_comp_flt, "out", NULL, NULL);
493 if (status != BT_SELF_COMPONENT_STATUS_OK) {
494 goto error;
495 }
496
497 ret = init_trimmer_comp_from_params(trimmer_comp, params);
498 if (ret) {
499 status = BT_SELF_COMPONENT_STATUS_ERROR;
500 goto error;
501 }
502
503 bt_self_component_set_data(self_comp, trimmer_comp);
504 goto end;
505
506 error:
507 if (status == BT_SELF_COMPONENT_STATUS_OK) {
508 status = BT_SELF_COMPONENT_STATUS_ERROR;
509 }
510
511 if (trimmer_comp) {
512 destroy_trimmer_comp(trimmer_comp);
513 }
514
515 end:
516 return status;
517 }
518
519 static
520 void destroy_trimmer_iterator(struct trimmer_iterator *trimmer_it)
521 {
522 BT_ASSERT(trimmer_it);
523 bt_self_component_port_input_message_iterator_put_ref(
524 trimmer_it->upstream_iter);
525
526 if (trimmer_it->output_messages) {
527 g_queue_free(trimmer_it->output_messages);
528 }
529
530 if (trimmer_it->stream_states) {
531 g_hash_table_destroy(trimmer_it->stream_states);
532 }
533
534 g_free(trimmer_it);
535 }
536
/*
 * Destroys the stream state `sstate` (which must not be `NULL`),
 * releasing its reference on the current packet, if any.
 *
 * trimmer_msg_iter_init() installs this function as the value destroy
 * function of the iterator's hash table of stream states.
 */
static
void destroy_trimmer_iterator_stream_state(
		struct trimmer_iterator_stream_state *sstate)
{
	BT_ASSERT(sstate);
	BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
	g_free(sstate);
}
545
546 BT_HIDDEN
547 bt_self_message_iterator_status trimmer_msg_iter_init(
548 bt_self_message_iterator *self_msg_iter,
549 bt_self_component_filter *self_comp,
550 bt_self_component_port_output *port)
551 {
552 bt_self_message_iterator_status status =
553 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
554 struct trimmer_iterator *trimmer_it;
555
556 trimmer_it = g_new0(struct trimmer_iterator, 1);
557 if (!trimmer_it) {
558 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
559 goto end;
560 }
561
562 trimmer_it->trimmer_comp = bt_self_component_get_data(
563 bt_self_component_filter_as_self_component(self_comp));
564 BT_ASSERT(trimmer_it->trimmer_comp);
565
566 if (trimmer_it->trimmer_comp->begin.is_set &&
567 trimmer_it->trimmer_comp->end.is_set) {
568 /*
569 * Both trimming time range's bounds are set, so skip
570 * the
571 * `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
572 * phase.
573 */
574 trimmer_it->state = TRIMMER_ITERATOR_STATE_SEEK_INITIALLY;
575 }
576
577 trimmer_it->begin = trimmer_it->trimmer_comp->begin;
578 trimmer_it->end = trimmer_it->trimmer_comp->end;
579 trimmer_it->upstream_iter =
580 bt_self_component_port_input_message_iterator_create(
581 bt_self_component_filter_borrow_input_port_by_name(
582 self_comp, in_port_name));
583 if (!trimmer_it->upstream_iter) {
584 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
585 goto end;
586 }
587
588 trimmer_it->output_messages = g_queue_new();
589 if (!trimmer_it->output_messages) {
590 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
591 goto end;
592 }
593
594 trimmer_it->stream_states = g_hash_table_new_full(g_direct_hash,
595 g_direct_equal, NULL,
596 (GDestroyNotify) destroy_trimmer_iterator_stream_state);
597 if (!trimmer_it->stream_states) {
598 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
599 goto end;
600 }
601
602 trimmer_it->self_msg_iter = self_msg_iter;
603 bt_self_message_iterator_set_data(self_msg_iter, trimmer_it);
604
605 end:
606 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK && trimmer_it) {
607 destroy_trimmer_iterator(trimmer_it);
608 }
609
610 return status;
611 }
612
613 static inline
614 int get_msg_ns_from_origin(const bt_message *msg, int64_t *ns_from_origin,
615 bool *skip)
616 {
617 const bt_clock_class *clock_class = NULL;
618 const bt_clock_snapshot *clock_snapshot = NULL;
619 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
620 int ret = 0;
621
622 BT_ASSERT(msg);
623 BT_ASSERT(ns_from_origin);
624 BT_ASSERT(skip);
625
626 switch (bt_message_get_type(msg)) {
627 case BT_MESSAGE_TYPE_EVENT:
628 clock_class =
629 bt_message_event_borrow_stream_class_default_clock_class_const(
630 msg);
631 if (G_UNLIKELY(!clock_class)) {
632 goto error;
633 }
634
635 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
636 msg);
637 break;
638 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
639 clock_class =
640 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
641 msg);
642 if (G_UNLIKELY(!clock_class)) {
643 goto error;
644 }
645
646 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
647 msg);
648 break;
649 case BT_MESSAGE_TYPE_PACKET_END:
650 clock_class =
651 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
652 msg);
653 if (G_UNLIKELY(!clock_class)) {
654 goto error;
655 }
656
657 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
658 msg);
659 break;
660 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
661 clock_class =
662 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
663 msg);
664 if (G_UNLIKELY(!clock_class)) {
665 goto error;
666 }
667
668 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
669 msg);
670 break;
671 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
672 clock_class =
673 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
674 msg);
675 if (G_UNLIKELY(!clock_class)) {
676 goto error;
677 }
678
679 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
680 msg);
681 break;
682 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
683 clock_class =
684 bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
685 msg);
686 if (G_UNLIKELY(!clock_class)) {
687 goto error;
688 }
689
690 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
691 msg, &clock_snapshot);
692 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN ||
693 sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
694 /* Lowest possible time to always include them */
695 *ns_from_origin = INT64_MIN;
696 goto no_clock_snapshot;
697 }
698
699 break;
700 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
701 clock_class =
702 bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
703 msg);
704 if (G_UNLIKELY(!clock_class)) {
705 goto error;
706 }
707
708 sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
709 msg, &clock_snapshot);
710 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
711 /* Lowest time to always include it */
712 *ns_from_origin = INT64_MIN;
713 goto no_clock_snapshot;
714 } else if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
715 /* Greatest time to always exclude it */
716 *ns_from_origin = INT64_MAX;
717 goto no_clock_snapshot;
718 }
719
720 break;
721 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
722 clock_snapshot =
723 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
724 msg);
725 break;
726 default:
727 goto no_clock_snapshot;
728 }
729
730 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot,
731 ns_from_origin);
732 if (G_UNLIKELY(ret)) {
733 goto error;
734 }
735
736 goto end;
737
738 no_clock_snapshot:
739 *skip = true;
740 goto end;
741
742 error:
743 ret = -1;
744
745 end:
746 return ret;
747 }
748
749 static inline
750 void put_messages(bt_message_array_const msgs, uint64_t count)
751 {
752 uint64_t i;
753
754 for (i = 0; i < count; i++) {
755 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
756 }
757 }
758
759 static inline
760 int set_trimmer_iterator_bound(struct trimmer_iterator *trimmer_it,
761 struct trimmer_bound *bound, int64_t ns_from_origin,
762 bool is_gmt)
763 {
764 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
765 struct tm tm;
766 time_t time_seconds = (time_t) (ns_from_origin / NS_PER_S);
767 int ret = 0;
768
769 BT_ASSERT(!bound->is_set);
770 errno = 0;
771
772 /* We only need to extract the date from this time */
773 if (is_gmt) {
774 bt_gmtime_r(&time_seconds, &tm);
775 } else {
776 bt_localtime_r(&time_seconds, &tm);
777 }
778
779 if (errno) {
780 BT_COMP_LOGE_ERRNO("Cannot convert timestamp to date and time",
781 "ts=%" PRId64, (int64_t) time_seconds);
782 ret = -1;
783 goto end;
784 }
785
786 ret = set_bound_ns_from_origin(bound, tm.tm_year + 1900, tm.tm_mon + 1,
787 tm.tm_mday, bound->time.hour, bound->time.minute,
788 bound->time.second, bound->time.ns, is_gmt);
789
790 end:
791 return ret;
792 }
793
794 static
795 bt_self_message_iterator_status state_set_trimmer_iterator_bounds(
796 struct trimmer_iterator *trimmer_it)
797 {
798 bt_message_iterator_status upstream_iter_status =
799 BT_MESSAGE_ITERATOR_STATUS_OK;
800 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
801 bt_message_array_const msgs;
802 uint64_t count = 0;
803 int64_t ns_from_origin = INT64_MIN;
804 uint64_t i;
805 int ret;
806
807 BT_ASSERT(!trimmer_it->begin.is_set ||
808 !trimmer_it->end.is_set);
809
810 while (true) {
811 upstream_iter_status =
812 bt_self_component_port_input_message_iterator_next(
813 trimmer_it->upstream_iter, &msgs, &count);
814 if (upstream_iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) {
815 goto end;
816 }
817
818 for (i = 0; i < count; i++) {
819 const bt_message *msg = msgs[i];
820 bool skip = false;
821 int ret;
822
823 ret = get_msg_ns_from_origin(msg, &ns_from_origin,
824 &skip);
825 if (ret) {
826 goto error;
827 }
828
829 if (skip) {
830 continue;
831 }
832
833 BT_ASSERT(ns_from_origin != INT64_MIN &&
834 ns_from_origin != INT64_MAX);
835 put_messages(msgs, count);
836 goto found;
837 }
838
839 put_messages(msgs, count);
840 }
841
842 found:
843 if (!trimmer_it->begin.is_set) {
844 BT_ASSERT(!trimmer_it->begin.is_infinite);
845 ret = set_trimmer_iterator_bound(trimmer_it, &trimmer_it->begin,
846 ns_from_origin, trimmer_comp->is_gmt);
847 if (ret) {
848 goto error;
849 }
850 }
851
852 if (!trimmer_it->end.is_set) {
853 BT_ASSERT(!trimmer_it->end.is_infinite);
854 ret = set_trimmer_iterator_bound(trimmer_it, &trimmer_it->end,
855 ns_from_origin, trimmer_comp->is_gmt);
856 if (ret) {
857 goto error;
858 }
859 }
860
861 ret = validate_trimmer_bounds(trimmer_it->trimmer_comp,
862 &trimmer_it->begin, &trimmer_it->end);
863 if (ret) {
864 goto error;
865 }
866
867 goto end;
868
869 error:
870 put_messages(msgs, count);
871 upstream_iter_status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
872
873 end:
874 return (int) upstream_iter_status;
875 }
876
877 static
878 bt_self_message_iterator_status state_seek_initially(
879 struct trimmer_iterator *trimmer_it)
880 {
881 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
882 bt_self_message_iterator_status status =
883 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
884
885 BT_ASSERT(trimmer_it->begin.is_set);
886
887 if (trimmer_it->begin.is_infinite) {
888 if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
889 trimmer_it->upstream_iter)) {
890 BT_COMP_LOGE_STR("Cannot make upstream message iterator initially seek its beginning.");
891 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
892 goto end;
893 }
894
895 status = (int) bt_self_component_port_input_message_iterator_seek_beginning(
896 trimmer_it->upstream_iter);
897 } else {
898 if (!bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
899 trimmer_it->upstream_iter,
900 trimmer_it->begin.ns_from_origin)) {
901 BT_COMP_LOGE("Cannot make upstream message iterator initially seek: "
902 "seek-ns-from-origin=%" PRId64,
903 trimmer_it->begin.ns_from_origin);
904 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
905 goto end;
906 }
907
908 status = (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
909 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin);
910 }
911
912 if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
913 trimmer_it->state = TRIMMER_ITERATOR_STATE_TRIM;
914 }
915
916 end:
917 return status;
918 }
919
920 static inline
921 void push_message(struct trimmer_iterator *trimmer_it, const bt_message *msg)
922 {
923 g_queue_push_head(trimmer_it->output_messages, (void *) msg);
924 }
925
926 static inline
927 const bt_message *pop_message(struct trimmer_iterator *trimmer_it)
928 {
929 return g_queue_pop_tail(trimmer_it->output_messages);
930 }
931
932 static inline
933 int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
934 int64_t ns_from_origin, uint64_t *raw_value)
935 {
936
937 int64_t cc_offset_s;
938 uint64_t cc_offset_cycles;
939 uint64_t cc_freq;
940
941 bt_clock_class_get_offset(clock_class, &cc_offset_s, &cc_offset_cycles);
942 cc_freq = bt_clock_class_get_frequency(clock_class);
943 return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
944 cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
945 }
946
947 static inline
948 bt_self_message_iterator_status end_stream(struct trimmer_iterator *trimmer_it,
949 struct trimmer_iterator_stream_state *sstate)
950 {
951 bt_self_message_iterator_status status =
952 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
953 uint64_t raw_value;
954 const bt_clock_class *clock_class;
955 int ret;
956 bt_message *msg = NULL;
957
958 BT_ASSERT(!trimmer_it->end.is_infinite);
959
960 if (!sstate->stream) {
961 goto end;
962 }
963
964 if (sstate->cur_packet) {
965 /*
966 * The last message could not have been a stream
967 * activity end message if we have a current packet.
968 */
969 BT_ASSERT(!sstate->last_msg_is_stream_activity_end);
970
971 /*
972 * Create and push a packet end message, making its time
973 * the trimming range's end time.
974 */
975 clock_class = bt_stream_class_borrow_default_clock_class_const(
976 bt_stream_borrow_class_const(sstate->stream));
977 BT_ASSERT(clock_class);
978 ret = clock_raw_value_from_ns_from_origin(clock_class,
979 trimmer_it->end.ns_from_origin, &raw_value);
980 if (ret) {
981 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
982 goto end;
983 }
984
985 msg = bt_message_packet_end_create_with_default_clock_snapshot(
986 trimmer_it->self_msg_iter, sstate->cur_packet,
987 raw_value);
988 if (!msg) {
989 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
990 goto end;
991 }
992
993 push_message(trimmer_it, msg);
994 msg = NULL;
995 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
996
997 /*
998 * Because we generated a packet end message, set the
999 * stream activity end message's time to use to the
1000 * trimming range's end time (this packet end message's
1001 * time).
1002 */
1003 sstate->stream_act_end_ns_from_origin =
1004 trimmer_it->end.ns_from_origin;
1005 }
1006
1007 if (!sstate->last_msg_is_stream_activity_end) {
1008 /* Create and push a stream activity end message */
1009 msg = bt_message_stream_activity_end_create(
1010 trimmer_it->self_msg_iter, sstate->stream);
1011 if (!msg) {
1012 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1013 goto end;
1014 }
1015
1016 clock_class = bt_stream_class_borrow_default_clock_class_const(
1017 bt_stream_borrow_class_const(sstate->stream));
1018 BT_ASSERT(clock_class);
1019
1020 if (sstate->stream_act_end_ns_from_origin == INT64_MIN) {
1021 /*
1022 * We received at least what is necessary to
1023 * have a stream state (stream beginning and
1024 * stream activity beginning messages), but
1025 * nothing else: use the trimmer range's end
1026 * time.
1027 */
1028 sstate->stream_act_end_ns_from_origin =
1029 trimmer_it->end.ns_from_origin;
1030 }
1031
1032 ret = clock_raw_value_from_ns_from_origin(clock_class,
1033 sstate->stream_act_end_ns_from_origin, &raw_value);
1034 if (ret) {
1035 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1036 goto end;
1037 }
1038
1039 bt_message_stream_activity_end_set_default_clock_snapshot(
1040 msg, raw_value);
1041 push_message(trimmer_it, msg);
1042 msg = NULL;
1043 }
1044
1045 /* Create and push a stream end message */
1046 msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
1047 sstate->stream);
1048 if (!msg) {
1049 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1050 goto end;
1051 }
1052
1053 push_message(trimmer_it, msg);
1054 msg = NULL;
1055
1056 /*
1057 * Just to make sure that we don't use this stream state again
1058 * in the future without an obvious error.
1059 */
1060 sstate->stream = NULL;
1061
1062 end:
1063 bt_message_put_ref(msg);
1064 return status;
1065 }
1066
1067 static inline
1068 bt_self_message_iterator_status end_iterator_streams(
1069 struct trimmer_iterator *trimmer_it)
1070 {
1071 bt_self_message_iterator_status status =
1072 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1073 GHashTableIter iter;
1074 gpointer key, sstate;
1075
1076 if (trimmer_it->end.is_infinite) {
1077 /*
1078 * An infinite trimming range's end time guarantees that
1079 * we received (and pushed) all the appropriate end
1080 * messages.
1081 */
1082 goto remove_all;
1083 }
1084
1085 /*
1086 * End each stream and then remove them from the hash table of
1087 * stream states to release unneeded references.
1088 */
1089 g_hash_table_iter_init(&iter, trimmer_it->stream_states);
1090
1091 while (g_hash_table_iter_next(&iter, &key, &sstate)) {
1092 status = end_stream(trimmer_it, sstate);
1093 if (status) {
1094 goto end;
1095 }
1096 }
1097
1098 remove_all:
1099 g_hash_table_remove_all(trimmer_it->stream_states);
1100
1101 end:
1102 return status;
1103 }
1104
1105 static inline
1106 bt_self_message_iterator_status create_stream_beginning_activity_message(
1107 struct trimmer_iterator *trimmer_it,
1108 const bt_stream *stream,
1109 const bt_clock_class *clock_class, bt_message **msg)
1110 {
1111 bt_message *local_msg;
1112 bt_self_message_iterator_status status =
1113 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1114
1115 BT_ASSERT(msg);
1116 BT_ASSERT(!trimmer_it->begin.is_infinite);
1117
1118 local_msg = bt_message_stream_activity_beginning_create(
1119 trimmer_it->self_msg_iter, stream);
1120 if (!local_msg) {
1121 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1122 goto end;
1123 }
1124
1125 if (clock_class) {
1126 int ret;
1127 uint64_t raw_value;
1128
1129 ret = clock_raw_value_from_ns_from_origin(clock_class,
1130 trimmer_it->begin.ns_from_origin, &raw_value);
1131 if (ret) {
1132 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1133 bt_message_put_ref(local_msg);
1134 goto end;
1135 }
1136
1137 bt_message_stream_activity_beginning_set_default_clock_snapshot(
1138 local_msg, raw_value);
1139 }
1140
1141 BT_MESSAGE_MOVE_REF(*msg, local_msg);
1142
1143 end:
1144 return status;
1145 }
1146
1147 /*
1148 * Handles a message which is associated to a given stream state. This
1149 * _could_ make the iterator's output message queue grow; this could
1150 * also consume the message without pushing anything to this queue, only
1151 * modifying the stream state.
1152 *
1153 * This function consumes the `msg` reference, _whatever the outcome_.
1154 *
1155 * `ns_from_origin` is the message's time, as given by
1156 * get_msg_ns_from_origin().
1157 *
1158 * This function sets `reached_end` if handling this message made the
1159 * iterator reach the end of the trimming range. Note that the output
1160 * message queue could contain messages even if this function sets
1161 * `reached_end`.
1162 */
1163 static inline
1164 bt_self_message_iterator_status handle_message_with_stream_state(
1165 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1166 struct trimmer_iterator_stream_state *sstate,
1167 int64_t ns_from_origin, bool *reached_end)
1168 {
1169 bt_self_message_iterator_status status =
1170 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1171 bt_message_type msg_type = bt_message_get_type(msg);
1172 int ret;
1173
1174 switch (msg_type) {
1175 case BT_MESSAGE_TYPE_EVENT:
1176 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1177 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1178 status = end_iterator_streams(trimmer_it);
1179 *reached_end = true;
1180 break;
1181 }
1182
1183 BT_ASSERT(sstate->cur_packet);
1184 push_message(trimmer_it, msg);
1185 msg = NULL;
1186 break;
1187 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1188 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1189 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1190 status = end_iterator_streams(trimmer_it);
1191 *reached_end = true;
1192 break;
1193 }
1194
1195 BT_ASSERT(!sstate->cur_packet);
1196 sstate->cur_packet =
1197 bt_message_packet_beginning_borrow_packet_const(msg);
1198 bt_packet_get_ref(sstate->cur_packet);
1199 push_message(trimmer_it, msg);
1200 msg = NULL;
1201 break;
1202 case BT_MESSAGE_TYPE_PACKET_END:
1203 sstate->stream_act_end_ns_from_origin = ns_from_origin;
1204
1205 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1206 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1207 status = end_iterator_streams(trimmer_it);
1208 *reached_end = true;
1209 break;
1210 }
1211
1212 BT_ASSERT(sstate->cur_packet);
1213 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1214 push_message(trimmer_it, msg);
1215 msg = NULL;
1216 break;
1217 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1218 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1219 {
1220 /*
1221 * `ns_from_origin` is the message's time range's
1222 * beginning time here.
1223 */
1224 int64_t end_ns_from_origin;
1225 const bt_clock_snapshot *end_cs;
1226
1227 if (bt_message_get_type(msg) ==
1228 BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1229 /*
1230 * Safe to ignore the return value because we
1231 * know there's a default clock and it's always
1232 * known.
1233 */
1234 end_cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
1235 msg);
1236 } else {
1237 /*
1238 * Safe to ignore the return value because we
1239 * know there's a default clock and it's always
1240 * known.
1241 */
1242 end_cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
1243 msg);
1244 }
1245
1246 if (bt_clock_snapshot_get_ns_from_origin(end_cs,
1247 &end_ns_from_origin)) {
1248 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1249 goto end;
1250 }
1251
1252 sstate->stream_act_end_ns_from_origin = end_ns_from_origin;
1253
1254 if (!trimmer_it->end.is_infinite &&
1255 ns_from_origin > trimmer_it->end.ns_from_origin) {
1256 status = end_iterator_streams(trimmer_it);
1257 *reached_end = true;
1258 break;
1259 }
1260
1261 if (!trimmer_it->end.is_infinite &&
1262 end_ns_from_origin > trimmer_it->end.ns_from_origin) {
1263 /*
1264 * This message's end time is outside the
1265 * trimming time range: replace it with a new
1266 * message having an end time equal to the
1267 * trimming time range's end and without a
1268 * count.
1269 */
1270 const bt_clock_class *clock_class =
1271 bt_clock_snapshot_borrow_clock_class_const(
1272 end_cs);
1273 const bt_clock_snapshot *begin_cs;
1274 bt_message *new_msg;
1275 uint64_t end_raw_value;
1276
1277 ret = clock_raw_value_from_ns_from_origin(clock_class,
1278 trimmer_it->end.ns_from_origin, &end_raw_value);
1279 if (ret) {
1280 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1281 goto end;
1282 }
1283
1284 if (msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1285 begin_cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
1286 msg);
1287 new_msg = bt_message_discarded_events_create_with_default_clock_snapshots(
1288 trimmer_it->self_msg_iter,
1289 sstate->stream,
1290 bt_clock_snapshot_get_value(begin_cs),
1291 end_raw_value);
1292 } else {
1293 begin_cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
1294 msg);
1295 new_msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
1296 trimmer_it->self_msg_iter,
1297 sstate->stream,
1298 bt_clock_snapshot_get_value(begin_cs),
1299 end_raw_value);
1300 }
1301
1302 if (!new_msg) {
1303 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1304 goto end;
1305 }
1306
1307 /* Replace the original message */
1308 BT_MESSAGE_MOVE_REF(msg, new_msg);
1309 }
1310
1311 push_message(trimmer_it, msg);
1312 msg = NULL;
1313 break;
1314 }
1315 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
1316 if (!trimmer_it->end.is_infinite &&
1317 ns_from_origin > trimmer_it->end.ns_from_origin) {
1318 /*
1319 * This only happens when the message's time is
1320 * known and is greater than the trimming
1321 * range's end time. Unknown and -inf times are
1322 * always less than
1323 * `trimmer_it->end.ns_from_origin`.
1324 */
1325 status = end_iterator_streams(trimmer_it);
1326 *reached_end = true;
1327 break;
1328 }
1329
1330 push_message(trimmer_it, msg);
1331 msg = NULL;
1332 break;
1333 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
1334 if (trimmer_it->end.is_infinite) {
1335 push_message(trimmer_it, msg);
1336 msg = NULL;
1337 break;
1338 }
1339
1340 if (ns_from_origin == INT64_MIN) {
1341 /* Unknown: consider it to be in the trimmer window. */
1342 push_message(trimmer_it, msg);
1343 msg = NULL;
1344 sstate->last_msg_is_stream_activity_end = true;
1345 } else if (ns_from_origin == INT64_MAX) {
1346 /* Infinite: use trimming range's end time */
1347 sstate->stream_act_end_ns_from_origin =
1348 trimmer_it->end.ns_from_origin;
1349 } else {
1350 /* Known: check if outside of trimming range */
1351 if (ns_from_origin > trimmer_it->end.ns_from_origin) {
1352 sstate->stream_act_end_ns_from_origin =
1353 trimmer_it->end.ns_from_origin;
1354 status = end_iterator_streams(trimmer_it);
1355 *reached_end = true;
1356 break;
1357 }
1358
1359 push_message(trimmer_it, msg);
1360 msg = NULL;
1361 sstate->last_msg_is_stream_activity_end = true;
1362 sstate->stream_act_end_ns_from_origin = ns_from_origin;
1363 }
1364
1365 break;
1366 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1367 push_message(trimmer_it, msg);
1368 msg = NULL;
1369 break;
1370 case BT_MESSAGE_TYPE_STREAM_END:
1371 /*
1372 * This is the end of a stream: end this
1373 * stream if its stream activity end message
1374 * time is not the trimming range's end time
1375 * (which means the final stream activity end
1376 * message had an infinite time). end_stream()
1377 * will generate its own stream end message.
1378 */
1379 if (trimmer_it->end.is_infinite) {
1380 push_message(trimmer_it, msg);
1381 msg = NULL;
1382
1383 /* We won't need this stream state again */
1384 g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
1385 } else if (sstate->stream_act_end_ns_from_origin <
1386 trimmer_it->end.ns_from_origin) {
1387 status = end_stream(trimmer_it, sstate);
1388 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1389 goto end;
1390 }
1391
1392 /* We won't need this stream state again */
1393 g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
1394 }
1395 break;
1396 default:
1397 break;
1398 }
1399
1400 end:
1401 /* We release the message's reference whatever the outcome */
1402 bt_message_put_ref(msg);
1403 return BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1404 }
1405
1406 /*
1407 * Handles an input message. This _could_ make the iterator's output
1408 * message queue grow; this could also consume the message without
1409 * pushing anything to this queue, only modifying the stream state.
1410 *
1411 * This function consumes the `msg` reference, _whatever the outcome_.
1412 *
1413 * This function sets `reached_end` if handling this message made the
1414 * iterator reach the end of the trimming range. Note that the output
1415 * message queue could contain messages even if this function sets
1416 * `reached_end`.
1417 */
1418 static inline
1419 bt_self_message_iterator_status handle_message(
1420 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1421 bool *reached_end)
1422 {
1423 bt_self_message_iterator_status status;
1424 const bt_stream *stream = NULL;
1425 int64_t ns_from_origin = INT64_MIN;
1426 bool skip;
1427 int ret;
1428 struct trimmer_iterator_stream_state *sstate = NULL;
1429 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
1430
1431 /* Find message's associated stream */
1432 switch (bt_message_get_type(msg)) {
1433 case BT_MESSAGE_TYPE_EVENT:
1434 stream = bt_event_borrow_stream_const(
1435 bt_message_event_borrow_event_const(msg));
1436 break;
1437 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1438 stream = bt_packet_borrow_stream_const(
1439 bt_message_packet_beginning_borrow_packet_const(msg));
1440 break;
1441 case BT_MESSAGE_TYPE_PACKET_END:
1442 stream = bt_packet_borrow_stream_const(
1443 bt_message_packet_end_borrow_packet_const(msg));
1444 break;
1445 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1446 stream = bt_message_discarded_events_borrow_stream_const(msg);
1447 break;
1448 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1449 stream = bt_message_discarded_packets_borrow_stream_const(msg);
1450 break;
1451 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
1452 stream = bt_message_stream_activity_beginning_borrow_stream_const(msg);
1453 break;
1454 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
1455 stream = bt_message_stream_activity_end_borrow_stream_const(msg);
1456 break;
1457 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1458 stream = bt_message_stream_beginning_borrow_stream_const(msg);
1459 break;
1460 case BT_MESSAGE_TYPE_STREAM_END:
1461 stream = bt_message_stream_end_borrow_stream_const(msg);
1462 break;
1463 default:
1464 break;
1465 }
1466
1467 if (G_LIKELY(stream)) {
1468 /* Find stream state */
1469 sstate = g_hash_table_lookup(trimmer_it->stream_states,
1470 stream);
1471 if (G_UNLIKELY(!sstate)) {
1472 /* No stream state yet: create one now */
1473 const bt_stream_class *sc;
1474
1475 /*
1476 * Validate right now that the stream's class
1477 * has a registered default clock class so that
1478 * an existing stream state guarantees existing
1479 * default clock snapshots for its associated
1480 * messages.
1481 *
1482 * Also check that clock snapshots are always
1483 * known.
1484 */
1485 sc = bt_stream_borrow_class_const(stream);
1486 if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
1487 BT_COMP_LOGE("Unsupported stream: stream class does "
1488 "not have a default clock class: "
1489 "stream-addr=%p, "
1490 "stream-id=%" PRIu64 ", "
1491 "stream-name=\"%s\"",
1492 stream, bt_stream_get_id(stream),
1493 bt_stream_get_name(stream));
1494 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1495 goto end;
1496 }
1497
1498 /*
1499 * Temporary: make sure packet beginning, packet
1500 * end, discarded events, and discarded packets
1501 * messages have default clock snapshots until
1502 * the support for not having them is
1503 * implemented.
1504 */
1505 if (!bt_stream_class_packets_have_beginning_default_clock_snapshot(
1506 sc)) {
1507 BT_COMP_LOGE("Unsupported stream: packets have "
1508 "no beginning clock snapshot: "
1509 "stream-addr=%p, "
1510 "stream-id=%" PRIu64 ", "
1511 "stream-name=\"%s\"",
1512 stream, bt_stream_get_id(stream),
1513 bt_stream_get_name(stream));
1514 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1515 goto end;
1516 }
1517
1518 if (!bt_stream_class_packets_have_end_default_clock_snapshot(
1519 sc)) {
1520 BT_COMP_LOGE("Unsupported stream: packets have "
1521 "no end clock snapshot: "
1522 "stream-addr=%p, "
1523 "stream-id=%" PRIu64 ", "
1524 "stream-name=\"%s\"",
1525 stream, bt_stream_get_id(stream),
1526 bt_stream_get_name(stream));
1527 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1528 goto end;
1529 }
1530
1531 if (bt_stream_class_supports_discarded_events(sc) &&
1532 !bt_stream_class_discarded_events_have_default_clock_snapshots(sc)) {
1533 BT_COMP_LOGE("Unsupported stream: discarded events "
1534 "have no clock snapshots: "
1535 "stream-addr=%p, "
1536 "stream-id=%" PRIu64 ", "
1537 "stream-name=\"%s\"",
1538 stream, bt_stream_get_id(stream),
1539 bt_stream_get_name(stream));
1540 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1541 goto end;
1542 }
1543
1544 if (bt_stream_class_supports_discarded_packets(sc) &&
1545 !bt_stream_class_discarded_packets_have_default_clock_snapshots(sc)) {
1546 BT_COMP_LOGE("Unsupported stream: discarded packets "
1547 "have no clock snapshots: "
1548 "stream-addr=%p, "
1549 "stream-id=%" PRIu64 ", "
1550 "stream-name=\"%s\"",
1551 stream, bt_stream_get_id(stream),
1552 bt_stream_get_name(stream));
1553 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1554 goto end;
1555 }
1556
1557 sstate = g_new0(struct trimmer_iterator_stream_state,
1558 1);
1559 if (!sstate) {
1560 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1561 goto end;
1562 }
1563
1564 sstate->stream = stream;
1565 sstate->stream_act_end_ns_from_origin = INT64_MIN;
1566 g_hash_table_insert(trimmer_it->stream_states,
1567 (void *) stream, sstate);
1568 }
1569 }
1570
1571 /* Retrieve the message's time */
1572 ret = get_msg_ns_from_origin(msg, &ns_from_origin, &skip);
1573 if (G_UNLIKELY(ret)) {
1574 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1575 goto end;
1576 }
1577
1578 if (G_LIKELY(sstate)) {
1579 /* Message associated to a stream */
1580 status = handle_message_with_stream_state(trimmer_it, msg,
1581 sstate, ns_from_origin, reached_end);
1582
1583 /*
1584 * handle_message_with_stream_state() unconditionally
1585 * consumes `msg`.
1586 */
1587 msg = NULL;
1588 } else {
1589 /*
1590 * Message not associated to a stream (message iterator
1591 * inactivity).
1592 */
1593 if (G_UNLIKELY(ns_from_origin > trimmer_it->end.ns_from_origin)) {
1594 BT_MESSAGE_PUT_REF_AND_RESET(msg);
1595 status = end_iterator_streams(trimmer_it);
1596 *reached_end = true;
1597 } else {
1598 push_message(trimmer_it, msg);
1599 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1600 msg = NULL;
1601 }
1602 }
1603
1604 end:
1605 /* We release the message's reference whatever the outcome */
1606 bt_message_put_ref(msg);
1607 return status;
1608 }
1609
1610 static inline
1611 void fill_message_array_from_output_messages(
1612 struct trimmer_iterator *trimmer_it,
1613 bt_message_array_const msgs, uint64_t capacity, uint64_t *count)
1614 {
1615 *count = 0;
1616
1617 /*
1618 * Move auto-seek messages to the output array (which is this
1619 * iterator's base message array).
1620 */
1621 while (capacity > 0 && !g_queue_is_empty(trimmer_it->output_messages)) {
1622 msgs[*count] = pop_message(trimmer_it);
1623 capacity--;
1624 (*count)++;
1625 }
1626
1627 BT_ASSERT(*count > 0);
1628 }
1629
1630 static inline
1631 bt_self_message_iterator_status state_ending(
1632 struct trimmer_iterator *trimmer_it,
1633 bt_message_array_const msgs, uint64_t capacity,
1634 uint64_t *count)
1635 {
1636 bt_self_message_iterator_status status =
1637 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1638
1639 if (g_queue_is_empty(trimmer_it->output_messages)) {
1640 trimmer_it->state = TRIMMER_ITERATOR_STATE_ENDED;
1641 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1642 goto end;
1643 }
1644
1645 fill_message_array_from_output_messages(trimmer_it, msgs,
1646 capacity, count);
1647
1648 end:
1649 return status;
1650 }
1651
1652 static inline
1653 bt_self_message_iterator_status state_trim(struct trimmer_iterator *trimmer_it,
1654 bt_message_array_const msgs, uint64_t capacity,
1655 uint64_t *count)
1656 {
1657 bt_self_message_iterator_status status =
1658 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1659 bt_message_array_const my_msgs;
1660 uint64_t my_count;
1661 uint64_t i;
1662 bool reached_end = false;
1663
1664 while (g_queue_is_empty(trimmer_it->output_messages)) {
1665 status = (int) bt_self_component_port_input_message_iterator_next(
1666 trimmer_it->upstream_iter, &my_msgs, &my_count);
1667 if (G_UNLIKELY(status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
1668 if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) {
1669 status = end_iterator_streams(trimmer_it);
1670 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1671 goto end;
1672 }
1673
1674 trimmer_it->state =
1675 TRIMMER_ITERATOR_STATE_ENDING;
1676 status = state_ending(trimmer_it, msgs,
1677 capacity, count);
1678 }
1679
1680 goto end;
1681 }
1682
1683 BT_ASSERT(my_count > 0);
1684
1685 for (i = 0; i < my_count; i++) {
1686 status = handle_message(trimmer_it, my_msgs[i],
1687 &reached_end);
1688
1689 /*
1690 * handle_message() unconditionally consumes the
1691 * message reference.
1692 */
1693 my_msgs[i] = NULL;
1694
1695 if (G_UNLIKELY(status !=
1696 BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
1697 put_messages(my_msgs, my_count);
1698 goto end;
1699 }
1700
1701 if (G_UNLIKELY(reached_end)) {
1702 /*
1703 * This message's time was passed the
1704 * trimming time range's end time: we
1705 * are done. Their might still be
1706 * messages in the output message queue,
1707 * so move to the "ending" state and
1708 * apply it immediately since
1709 * state_trim() is called within the
1710 * "next" method.
1711 */
1712 put_messages(my_msgs, my_count);
1713 trimmer_it->state =
1714 TRIMMER_ITERATOR_STATE_ENDING;
1715 status = state_ending(trimmer_it, msgs,
1716 capacity, count);
1717 goto end;
1718 }
1719 }
1720 }
1721
1722 /*
1723 * There's at least one message in the output message queue:
1724 * move the messages to the output message array.
1725 */
1726 BT_ASSERT(!g_queue_is_empty(trimmer_it->output_messages));
1727 fill_message_array_from_output_messages(trimmer_it, msgs,
1728 capacity, count);
1729
1730 end:
1731 return status;
1732 }
1733
1734 BT_HIDDEN
1735 bt_self_message_iterator_status trimmer_msg_iter_next(
1736 bt_self_message_iterator *self_msg_iter,
1737 bt_message_array_const msgs, uint64_t capacity,
1738 uint64_t *count)
1739 {
1740 struct trimmer_iterator *trimmer_it =
1741 bt_self_message_iterator_get_data(self_msg_iter);
1742 bt_self_message_iterator_status status =
1743 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1744
1745 BT_ASSERT(trimmer_it);
1746
1747 if (G_LIKELY(trimmer_it->state == TRIMMER_ITERATOR_STATE_TRIM)) {
1748 status = state_trim(trimmer_it, msgs, capacity, count);
1749 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1750 goto end;
1751 }
1752 } else {
1753 switch (trimmer_it->state) {
1754 case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN:
1755 status = state_set_trimmer_iterator_bounds(trimmer_it);
1756 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1757 goto end;
1758 }
1759
1760 status = state_seek_initially(trimmer_it);
1761 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1762 goto end;
1763 }
1764
1765 status = state_trim(trimmer_it, msgs, capacity, count);
1766 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1767 goto end;
1768 }
1769
1770 break;
1771 case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY:
1772 status = state_seek_initially(trimmer_it);
1773 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1774 goto end;
1775 }
1776
1777 status = state_trim(trimmer_it, msgs, capacity, count);
1778 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1779 goto end;
1780 }
1781
1782 break;
1783 case TRIMMER_ITERATOR_STATE_ENDING:
1784 status = state_ending(trimmer_it, msgs, capacity,
1785 count);
1786 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1787 goto end;
1788 }
1789
1790 break;
1791 case TRIMMER_ITERATOR_STATE_ENDED:
1792 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1793 break;
1794 default:
1795 abort();
1796 }
1797 }
1798
1799 end:
1800 return status;
1801 }
1802
1803 BT_HIDDEN
1804 void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
1805 {
1806 struct trimmer_iterator *trimmer_it =
1807 bt_self_message_iterator_get_data(self_msg_iter);
1808
1809 BT_ASSERT(trimmer_it);
1810 destroy_trimmer_iterator(trimmer_it);
1811 }
This page took 0.091421 seconds and 3 git commands to generate.