trimmer: make filter.utils.trimmer append error causes
[babeltrace.git] / src / plugins / utils / trimmer / trimmer.c
1 /*
2 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #define BT_LOG_OUTPUT_LEVEL (trimmer_comp->log_level)
25 #define BT_LOG_TAG "PLUGIN/FLT.UTILS.TRIMMER"
26 #include "logging/comp-logging.h"
27
28 #include "compat/utc.h"
29 #include "compat/time.h"
30 #include <babeltrace2/babeltrace.h>
31 #include "common/common.h"
32 #include "common/assert.h"
33 #include <stdint.h>
34 #include <inttypes.h>
35 #include <glib.h>
36 #include "compat/glib.h"
37
38 #include "trimmer.h"
39
40 #define NS_PER_S INT64_C(1000000000)
41
42 static const char * const in_port_name = "in";
43
44 struct trimmer_time {
45 unsigned int hour, minute, second, ns;
46 };
47
48 struct trimmer_bound {
49 /*
50 * Nanoseconds from origin, valid if `is_set` is set and
51 * `is_infinite` is false.
52 */
53 int64_t ns_from_origin;
54
55 /* True if this bound's full time (`ns_from_origin`) is set */
56 bool is_set;
57
58 /*
59 * True if this bound represents the infinity (negative or
60 * positive depending on which bound it is). If this is true,
61 * then we don't care about `ns_from_origin` above.
62 */
63 bool is_infinite;
64
65 /*
66 * This bound's time without the date; this time is used to set
67 * `ns_from_origin` once we know the date.
68 */
69 struct trimmer_time time;
70 };
71
72 struct trimmer_comp {
73 struct trimmer_bound begin, end;
74 bool is_gmt;
75 bt_logging_level log_level;
76 bt_self_component *self_comp;
77 };
78
79 enum trimmer_iterator_state {
80 /*
81 * Find the first message's date and set the bounds's times
82 * accordingly.
83 */
84 TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN,
85
86 /*
87 * Initially seek to the trimming range's beginning time.
88 */
89 TRIMMER_ITERATOR_STATE_SEEK_INITIALLY,
90
91 /*
92 * Fill the output message queue for as long as received input
93 * messages are within the trimming time range.
94 */
95 TRIMMER_ITERATOR_STATE_TRIM,
96
97 /* Flush the remaining messages in the output message queue */
98 TRIMMER_ITERATOR_STATE_ENDING,
99
100 /* Trimming operation and message iterator is ended */
101 TRIMMER_ITERATOR_STATE_ENDED,
102 };
103
104 struct trimmer_iterator {
105 /* Weak */
106 struct trimmer_comp *trimmer_comp;
107
108 /* Weak */
109 bt_self_message_iterator *self_msg_iter;
110
111 enum trimmer_iterator_state state;
112
113 /* Owned by this */
114 bt_self_component_port_input_message_iterator *upstream_iter;
115 struct trimmer_bound begin, end;
116
117 /*
118 * Queue of `const bt_message *` (owned by the queue).
119 *
120 * This is where the trimming operation pushes the messages to
121 * output by this message iterator.
122 */
123 GQueue *output_messages;
124
125 /*
126 * Hash table of `bt_stream *` (weak) to
127 * `struct trimmer_iterator_stream_state *` (owned by the HT).
128 */
129 GHashTable *stream_states;
130 };
131
132 struct trimmer_iterator_stream_state {
133 /* Weak */
134 const bt_stream *stream;
135
136 /* Have we seen a message with clock_snapshot going through this stream? */
137 bool seen_clock_snapshot;
138
139 /* Owned by this (`NULL` initially and between packets) */
140 const bt_packet *cur_packet;
141 };
142
143 static
144 void destroy_trimmer_comp(struct trimmer_comp *trimmer_comp)
145 {
146 BT_ASSERT(trimmer_comp);
147 g_free(trimmer_comp);
148 }
149
150 static
151 struct trimmer_comp *create_trimmer_comp(void)
152 {
153 return g_new0(struct trimmer_comp, 1);
154 }
155
156 BT_HIDDEN
157 void trimmer_finalize(bt_self_component_filter *self_comp)
158 {
159 struct trimmer_comp *trimmer_comp =
160 bt_self_component_get_data(
161 bt_self_component_filter_as_self_component(self_comp));
162
163 if (trimmer_comp) {
164 destroy_trimmer_comp(trimmer_comp);
165 }
166 }
167
168 /*
169 * Compile regex in `pattern`, and try to match `string`. If there's a match,
170 * return true and set `*match_info` to the list of matches. The list of
171 * matches must be freed by the caller. If there's no match, return false and
172 * set `*match_info` to NULL;
173 */
174 static
175 bool compile_and_match(const char *pattern, const char *string, GMatchInfo **match_info) {
176 bool matches = false;
177 GError *regex_error = NULL;
178 GRegex *regex;
179
180 regex = g_regex_new(pattern, 0, 0, &regex_error);
181 if (!regex) {
182 goto end;
183 }
184
185 matches = g_regex_match(regex, string, 0, match_info);
186 if (!matches) {
187 /*
188 * g_regex_match allocates `*match_info` even if it returns
189 * FALSE. If there's no match, we have no use for it, so free
190 * it immediatly and don't return it to the caller.
191 */
192 g_match_info_free(*match_info);
193 *match_info = NULL;
194 }
195
196 g_regex_unref(regex);
197
198 end:
199
200 if (regex_error) {
201 g_error_free(regex_error);
202 }
203
204 return matches;
205 }
206
207 /*
208 * Convert the captured text in match number `match_num` in `match_info`
209 * to an unsigned integer.
210 */
211 static
212 guint64 match_to_uint(const GMatchInfo *match_info, gint match_num) {
213 gchar *text, *endptr;
214 guint64 result;
215
216 text = g_match_info_fetch(match_info, match_num);
217 BT_ASSERT(text);
218
219 /*
220 * Because the input is carefully sanitized with regexes by the caller,
221 * we assume that g_ascii_strtoull cannot fail.
222 */
223 errno = 0;
224 result = g_ascii_strtoull(text, &endptr, 10);
225 BT_ASSERT(endptr > text);
226 BT_ASSERT(errno == 0);
227
228 g_free(text);
229
230 return result;
231 }
232
233 /*
234 * When parsing the nanoseconds part, .512 means .512000000, not .000000512.
235 * This function is like match_to_uint, but multiplies the parsed number to get
236 * the expected result.
237 */
238 static
239 guint64 match_to_uint_ns(const GMatchInfo *match_info, gint match_num) {
240 guint64 nanoseconds;
241 gboolean ret;
242 gint start_pos, end_pos, power;
243 static int pow10[] = {
244 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000,
245 };
246
247 nanoseconds = match_to_uint(match_info, match_num);
248
249 /* Multiply by 10 as many times as there are omitted digits. */
250 ret = g_match_info_fetch_pos(match_info, match_num, &start_pos, &end_pos);
251 BT_ASSERT(ret);
252
253 power = 9 - (end_pos - start_pos);
254 BT_ASSERT(power >= 0 && power <= 8);
255
256 nanoseconds *= pow10[power];
257
258 return nanoseconds;
259 }
260
261 /*
262 * Sets the time (in ns from origin) of a trimmer bound from date and
263 * time components.
264 *
265 * Returns a negative value if anything goes wrong.
266 */
267 static
268 int set_bound_ns_from_origin(struct trimmer_bound *bound,
269 unsigned int year, unsigned int month, unsigned int day,
270 unsigned int hour, unsigned int minute, unsigned int second,
271 unsigned int ns, bool is_gmt)
272 {
273 int ret = 0;
274 time_t result;
275 struct tm tm = {
276 .tm_sec = second,
277 .tm_min = minute,
278 .tm_hour = hour,
279 .tm_mday = day,
280 .tm_mon = month - 1,
281 .tm_year = year - 1900,
282 .tm_isdst = -1,
283 };
284
285 if (is_gmt) {
286 result = bt_timegm(&tm);
287 } else {
288 result = mktime(&tm);
289 }
290
291 if (result < 0) {
292 ret = -1;
293 goto end;
294 }
295
296 BT_ASSERT(bound);
297 bound->ns_from_origin = (int64_t) result;
298 bound->ns_from_origin *= NS_PER_S;
299 bound->ns_from_origin += ns;
300 bound->is_set = true;
301
302 end:
303 return ret;
304 }
305
306 /*
307 * Parses a timestamp, figuring out its format.
308 *
309 * Returns a negative value if anything goes wrong.
310 *
311 * Expected formats:
312 *
313 * YYYY-MM-DD hh:mm[:ss[.ns]]
314 * [hh:mm:]ss[.ns]
315 * [-]s[.ns]
316 *
317 * TODO: Check overflows.
318 */
319 static
320 int set_bound_from_str(struct trimmer_comp *trimmer_comp,
321 const char *str, struct trimmer_bound *bound, bool is_gmt)
322 {
323 /* Matches YYYY-MM-DD */
324 #define DATE_RE "([0-9]{4})-([0-9]{2})-([0-9]{2})"
325
326 /* Matches HH:MM[:SS[.NS]] */
327 #define TIME_RE "([0-9]{2}):([0-9]{2})(?::([0-9]{2})(?:\\.([0-9]{1,9}))?)?"
328
329 /* Matches [-]SS[.NS] */
330 #define S_NS_RE "^(-?)([0-9]+)(?:\\.([0-9]{1,9}))?$"
331
332 GMatchInfo *match_info;
333 int ret = 0;
334
335 /* Try `YYYY-MM-DD hh:mm[:ss[.ns]]` format */
336 if (compile_and_match("^" DATE_RE " " TIME_RE "$", str, &match_info)) {
337 unsigned int year = 0, month = 0, day = 0, hours = 0, minutes = 0, seconds = 0, nanoseconds = 0;
338 gint match_count = g_match_info_get_match_count(match_info);
339
340 BT_ASSERT(match_count >= 6 && match_count <= 8);
341
342 year = match_to_uint(match_info, 1);
343 month = match_to_uint(match_info, 2);
344 day = match_to_uint(match_info, 3);
345 hours = match_to_uint(match_info, 4);
346 minutes = match_to_uint(match_info, 5);
347
348 if (match_count >= 7) {
349 seconds = match_to_uint(match_info, 6);
350 }
351
352 if (match_count >= 8) {
353 nanoseconds = match_to_uint_ns(match_info, 7);
354 }
355
356 set_bound_ns_from_origin(bound, year, month, day, hours, minutes, seconds, nanoseconds, is_gmt);
357
358 goto end;
359 }
360
361 if (compile_and_match("^" DATE_RE "$", str, &match_info)) {
362 unsigned int year = 0, month = 0, day = 0;
363
364 BT_ASSERT(g_match_info_get_match_count(match_info) == 4);
365
366 year = match_to_uint(match_info, 1);
367 month = match_to_uint(match_info, 2);
368 day = match_to_uint(match_info, 3);
369
370 set_bound_ns_from_origin(bound, year, month, day, 0, 0, 0, 0, is_gmt);
371
372 goto end;
373 }
374
375 /* Try `hh:mm[:ss[.ns]]` format */
376 if (compile_and_match("^" TIME_RE "$", str, &match_info)) {
377 gint match_count = g_match_info_get_match_count(match_info);
378 BT_ASSERT(match_count >= 3 && match_count <= 5);
379 bound->time.hour = match_to_uint(match_info, 1);
380 bound->time.minute = match_to_uint(match_info, 2);
381
382 if (match_count >= 4) {
383 bound->time.second = match_to_uint(match_info, 3);
384 }
385
386 if (match_count >= 5) {
387 bound->time.ns = match_to_uint_ns(match_info, 4);
388 }
389
390 goto end;
391 }
392
393 /* Try `[-]s[.ns]` format */
394 if (compile_and_match("^" S_NS_RE "$", str, &match_info)) {
395 gboolean is_neg, fetch_pos_ret;
396 gint start_pos, end_pos, match_count;
397 guint64 seconds, nanoseconds = 0;
398
399 match_count = g_match_info_get_match_count(match_info);
400 BT_ASSERT(match_count >= 3 && match_count <= 4);
401
402 /* Check for presence of negation sign. */
403 fetch_pos_ret = g_match_info_fetch_pos(match_info, 1, &start_pos, &end_pos);
404 BT_ASSERT(fetch_pos_ret);
405 is_neg = (end_pos - start_pos) > 0;
406
407 seconds = match_to_uint(match_info, 2);
408
409 if (match_count >= 4) {
410 nanoseconds = match_to_uint_ns(match_info, 3);
411 }
412
413 bound->ns_from_origin = seconds * NS_PER_S + nanoseconds;
414
415 if (is_neg) {
416 bound->ns_from_origin = -bound->ns_from_origin;
417 }
418
419 bound->is_set = true;
420
421 goto end;
422 }
423
424 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
425 "Invalid date/time format: param=\"%s\"", str);
426 ret = -1;
427
428 end:
429 return ret;
430 }
431
432 /*
433 * Sets a trimmer bound's properties from a parameter string/integer
434 * value.
435 *
436 * Returns a negative value if anything goes wrong.
437 */
438 static
439 int set_bound_from_param(struct trimmer_comp *trimmer_comp,
440 const char *param_name, const bt_value *param,
441 struct trimmer_bound *bound, bool is_gmt)
442 {
443 int ret;
444 const char *arg;
445 char tmp_arg[64];
446
447 if (bt_value_is_signed_integer(param)) {
448 int64_t value = bt_value_integer_signed_get(param);
449
450 /*
451 * Just convert it to a temporary string to handle
452 * everything the same way.
453 */
454 sprintf(tmp_arg, "%" PRId64, value);
455 arg = tmp_arg;
456 } else if (bt_value_is_string(param)) {
457 arg = bt_value_string_get(param);
458 } else {
459 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
460 "`%s` parameter must be an integer or a string value.",
461 param_name);
462 ret = -1;
463 goto end;
464 }
465
466 ret = set_bound_from_str(trimmer_comp, arg, bound, is_gmt);
467
468 end:
469 return ret;
470 }
471
472 static
473 int validate_trimmer_bounds(struct trimmer_comp *trimmer_comp,
474 struct trimmer_bound *begin, struct trimmer_bound *end)
475 {
476 int ret = 0;
477
478 BT_ASSERT(begin->is_set);
479 BT_ASSERT(end->is_set);
480
481 if (!begin->is_infinite && !end->is_infinite &&
482 begin->ns_from_origin > end->ns_from_origin) {
483 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
484 "Trimming time range's beginning time is greater than end time: "
485 "begin-ns-from-origin=%" PRId64 ", "
486 "end-ns-from-origin=%" PRId64,
487 begin->ns_from_origin,
488 end->ns_from_origin);
489 ret = -1;
490 goto end;
491 }
492
493 if (!begin->is_infinite && begin->ns_from_origin == INT64_MIN) {
494 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
495 "Invalid trimming time range's beginning time: "
496 "ns-from-origin=%" PRId64,
497 begin->ns_from_origin);
498 ret = -1;
499 goto end;
500 }
501
502 if (!end->is_infinite && end->ns_from_origin == INT64_MIN) {
503 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
504 "Invalid trimming time range's end time: "
505 "ns-from-origin=%" PRId64,
506 end->ns_from_origin);
507 ret = -1;
508 goto end;
509 }
510
511 end:
512 return ret;
513 }
514
515 static
516 int init_trimmer_comp_from_params(struct trimmer_comp *trimmer_comp,
517 const bt_value *params)
518 {
519 const bt_value *value;
520 int ret = 0;
521
522 BT_ASSERT(params);
523 value = bt_value_map_borrow_entry_value_const(params, "gmt");
524 if (value) {
525 trimmer_comp->is_gmt = (bool) bt_value_bool_get(value);
526 }
527
528 value = bt_value_map_borrow_entry_value_const(params, "begin");
529 if (value) {
530 if (set_bound_from_param(trimmer_comp, "begin", value,
531 &trimmer_comp->begin, trimmer_comp->is_gmt)) {
532 /* set_bound_from_param() logs errors */
533 ret = -1;
534 goto end;
535 }
536 } else {
537 trimmer_comp->begin.is_infinite = true;
538 trimmer_comp->begin.is_set = true;
539 }
540
541 value = bt_value_map_borrow_entry_value_const(params, "end");
542 if (value) {
543 if (set_bound_from_param(trimmer_comp, "end", value,
544 &trimmer_comp->end, trimmer_comp->is_gmt)) {
545 /* set_bound_from_param() logs errors */
546 ret = -1;
547 goto end;
548 }
549 } else {
550 trimmer_comp->end.is_infinite = true;
551 trimmer_comp->end.is_set = true;
552 }
553
554 end:
555 if (trimmer_comp->begin.is_set && trimmer_comp->end.is_set) {
556 /* validate_trimmer_bounds() logs errors */
557 ret = validate_trimmer_bounds(trimmer_comp,
558 &trimmer_comp->begin, &trimmer_comp->end);
559 }
560
561 return ret;
562 }
563
564 bt_component_class_init_method_status trimmer_init(
565 bt_self_component_filter *self_comp_flt,
566 const bt_value *params, void *init_data)
567 {
568 int ret;
569 bt_component_class_init_method_status status =
570 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK;
571 bt_self_component_add_port_status add_port_status;
572 struct trimmer_comp *trimmer_comp = create_trimmer_comp();
573 bt_self_component *self_comp =
574 bt_self_component_filter_as_self_component(self_comp_flt);
575 if (!trimmer_comp) {
576 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR;
577 goto error;
578 }
579
580 trimmer_comp->log_level = bt_component_get_logging_level(
581 bt_self_component_as_component(self_comp));
582 trimmer_comp->self_comp = self_comp;
583 add_port_status = bt_self_component_filter_add_input_port(
584 self_comp_flt, in_port_name, NULL, NULL);
585 switch (add_port_status) {
586 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR:
587 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
588 goto error;
589 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR:
590 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR;
591 goto error;
592 default:
593 break;
594 }
595
596 add_port_status = bt_self_component_filter_add_output_port(
597 self_comp_flt, "out", NULL, NULL);
598 switch (add_port_status) {
599 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR:
600 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
601 goto error;
602 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR:
603 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR;
604 goto error;
605 default:
606 break;
607 }
608
609 ret = init_trimmer_comp_from_params(trimmer_comp, params);
610 if (ret) {
611 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
612 goto error;
613 }
614
615 bt_self_component_set_data(self_comp, trimmer_comp);
616 goto end;
617
618 error:
619 if (status == BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK) {
620 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
621 }
622
623 if (trimmer_comp) {
624 destroy_trimmer_comp(trimmer_comp);
625 }
626
627 end:
628 return status;
629 }
630
631 static
632 void destroy_trimmer_iterator(struct trimmer_iterator *trimmer_it)
633 {
634 BT_ASSERT(trimmer_it);
635 bt_self_component_port_input_message_iterator_put_ref(
636 trimmer_it->upstream_iter);
637
638 if (trimmer_it->output_messages) {
639 g_queue_free(trimmer_it->output_messages);
640 }
641
642 if (trimmer_it->stream_states) {
643 g_hash_table_destroy(trimmer_it->stream_states);
644 }
645
646 g_free(trimmer_it);
647 }
648
649 static
650 void destroy_trimmer_iterator_stream_state(
651 struct trimmer_iterator_stream_state *sstate)
652 {
653 BT_ASSERT(sstate);
654 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
655 g_free(sstate);
656 }
657
658 BT_HIDDEN
659 bt_component_class_message_iterator_init_method_status trimmer_msg_iter_init(
660 bt_self_message_iterator *self_msg_iter,
661 bt_self_component_filter *self_comp,
662 bt_self_component_port_output *port)
663 {
664 bt_component_class_message_iterator_init_method_status status =
665 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK;
666 struct trimmer_iterator *trimmer_it;
667
668 trimmer_it = g_new0(struct trimmer_iterator, 1);
669 if (!trimmer_it) {
670 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR;
671 goto end;
672 }
673
674 trimmer_it->trimmer_comp = bt_self_component_get_data(
675 bt_self_component_filter_as_self_component(self_comp));
676 BT_ASSERT(trimmer_it->trimmer_comp);
677
678 if (trimmer_it->trimmer_comp->begin.is_set &&
679 trimmer_it->trimmer_comp->end.is_set) {
680 /*
681 * Both trimming time range's bounds are set, so skip
682 * the
683 * `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
684 * phase.
685 */
686 trimmer_it->state = TRIMMER_ITERATOR_STATE_SEEK_INITIALLY;
687 }
688
689 trimmer_it->begin = trimmer_it->trimmer_comp->begin;
690 trimmer_it->end = trimmer_it->trimmer_comp->end;
691 trimmer_it->upstream_iter =
692 bt_self_component_port_input_message_iterator_create_from_message_iterator(
693 self_msg_iter,
694 bt_self_component_filter_borrow_input_port_by_name(
695 self_comp, in_port_name));
696 if (!trimmer_it->upstream_iter) {
697 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_ERROR;
698 goto end;
699 }
700
701 trimmer_it->output_messages = g_queue_new();
702 if (!trimmer_it->output_messages) {
703 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR;
704 goto end;
705 }
706
707 trimmer_it->stream_states = g_hash_table_new_full(g_direct_hash,
708 g_direct_equal, NULL,
709 (GDestroyNotify) destroy_trimmer_iterator_stream_state);
710 if (!trimmer_it->stream_states) {
711 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR;
712 goto end;
713 }
714
715 trimmer_it->self_msg_iter = self_msg_iter;
716 bt_self_message_iterator_set_data(self_msg_iter, trimmer_it);
717
718 end:
719 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK && trimmer_it) {
720 destroy_trimmer_iterator(trimmer_it);
721 }
722
723 return status;
724 }
725
726 static inline
727 int get_msg_ns_from_origin(const bt_message *msg, int64_t *ns_from_origin,
728 bool *has_clock_snapshot)
729 {
730 const bt_clock_class *clock_class = NULL;
731 const bt_clock_snapshot *clock_snapshot = NULL;
732 int ret = 0;
733
734 BT_ASSERT(msg);
735 BT_ASSERT(ns_from_origin);
736 BT_ASSERT(has_clock_snapshot);
737
738 switch (bt_message_get_type(msg)) {
739 case BT_MESSAGE_TYPE_EVENT:
740 clock_class =
741 bt_message_event_borrow_stream_class_default_clock_class_const(
742 msg);
743 if (G_UNLIKELY(!clock_class)) {
744 goto error;
745 }
746
747 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
748 msg);
749 break;
750 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
751 clock_class =
752 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
753 msg);
754 if (G_UNLIKELY(!clock_class)) {
755 goto error;
756 }
757
758 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
759 msg);
760 break;
761 case BT_MESSAGE_TYPE_PACKET_END:
762 clock_class =
763 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
764 msg);
765 if (G_UNLIKELY(!clock_class)) {
766 goto error;
767 }
768
769 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
770 msg);
771 break;
772 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
773 {
774 enum bt_message_stream_clock_snapshot_state cs_state;
775
776 clock_class =
777 bt_message_stream_beginning_borrow_stream_class_default_clock_class_const(msg);
778 if (G_UNLIKELY(!clock_class)) {
779 goto error;
780 }
781
782 cs_state = bt_message_stream_beginning_borrow_default_clock_snapshot_const(msg, &clock_snapshot);
783 if (cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
784 goto no_clock_snapshot;
785 }
786
787 break;
788 }
789 case BT_MESSAGE_TYPE_STREAM_END:
790 {
791 enum bt_message_stream_clock_snapshot_state cs_state;
792
793 clock_class =
794 bt_message_stream_end_borrow_stream_class_default_clock_class_const(msg);
795 if (G_UNLIKELY(!clock_class)) {
796 goto error;
797 }
798
799 cs_state = bt_message_stream_end_borrow_default_clock_snapshot_const(msg, &clock_snapshot);
800 if (cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
801 goto no_clock_snapshot;
802 }
803
804 break;
805 }
806 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
807 clock_class =
808 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
809 msg);
810 if (G_UNLIKELY(!clock_class)) {
811 goto error;
812 }
813
814 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
815 msg);
816 break;
817 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
818 clock_class =
819 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
820 msg);
821 if (G_UNLIKELY(!clock_class)) {
822 goto error;
823 }
824
825 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
826 msg);
827 break;
828 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
829 clock_snapshot =
830 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
831 msg);
832 break;
833 default:
834 goto no_clock_snapshot;
835 }
836
837 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot,
838 ns_from_origin);
839 if (G_UNLIKELY(ret)) {
840 goto error;
841 }
842
843 *has_clock_snapshot = true;
844 goto end;
845
846 no_clock_snapshot:
847 *has_clock_snapshot = false;
848 goto end;
849
850 error:
851 ret = -1;
852
853 end:
854 return ret;
855 }
856
857 static inline
858 void put_messages(bt_message_array_const msgs, uint64_t count)
859 {
860 uint64_t i;
861
862 for (i = 0; i < count; i++) {
863 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
864 }
865 }
866
867 static inline
868 int set_trimmer_iterator_bound(struct trimmer_iterator *trimmer_it,
869 struct trimmer_bound *bound, int64_t ns_from_origin,
870 bool is_gmt)
871 {
872 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
873 struct tm tm;
874 struct tm *res;
875 time_t time_seconds = (time_t) (ns_from_origin / NS_PER_S);
876 int ret = 0;
877
878 BT_ASSERT(!bound->is_set);
879 errno = 0;
880
881 /* We only need to extract the date from this time */
882 if (is_gmt) {
883 res = bt_gmtime_r(&time_seconds, &tm);
884 } else {
885 res = bt_localtime_r(&time_seconds, &tm);
886 }
887
888 if (!res) {
889 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(trimmer_comp->self_comp,
890 "Cannot convert timestamp to date and time",
891 ": ts=%" PRId64, (int64_t) time_seconds);
892 ret = -1;
893 goto end;
894 }
895
896 ret = set_bound_ns_from_origin(bound, tm.tm_year + 1900, tm.tm_mon + 1,
897 tm.tm_mday, bound->time.hour, bound->time.minute,
898 bound->time.second, bound->time.ns, is_gmt);
899
900 end:
901 return ret;
902 }
903
904 static
905 bt_component_class_message_iterator_next_method_status
906 state_set_trimmer_iterator_bounds(
907 struct trimmer_iterator *trimmer_it)
908 {
909 bt_message_iterator_next_status upstream_iter_status =
910 BT_MESSAGE_ITERATOR_NEXT_STATUS_OK;
911 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
912 bt_message_array_const msgs;
913 uint64_t count = 0;
914 int64_t ns_from_origin = INT64_MIN;
915 uint64_t i;
916 int ret;
917
918 BT_ASSERT(!trimmer_it->begin.is_set ||
919 !trimmer_it->end.is_set);
920
921 while (true) {
922 upstream_iter_status =
923 bt_self_component_port_input_message_iterator_next(
924 trimmer_it->upstream_iter, &msgs, &count);
925 if (upstream_iter_status != BT_MESSAGE_ITERATOR_NEXT_STATUS_OK) {
926 goto end;
927 }
928
929 for (i = 0; i < count; i++) {
930 const bt_message *msg = msgs[i];
931 bool has_ns_from_origin;
932 int ret;
933
934 ret = get_msg_ns_from_origin(msg, &ns_from_origin,
935 &has_ns_from_origin);
936 if (ret) {
937 goto error;
938 }
939
940 if (!has_ns_from_origin) {
941 continue;
942 }
943
944 BT_ASSERT(ns_from_origin != INT64_MIN &&
945 ns_from_origin != INT64_MAX);
946 put_messages(msgs, count);
947 goto found;
948 }
949
950 put_messages(msgs, count);
951 }
952
953 found:
954 if (!trimmer_it->begin.is_set) {
955 BT_ASSERT(!trimmer_it->begin.is_infinite);
956 ret = set_trimmer_iterator_bound(trimmer_it, &trimmer_it->begin,
957 ns_from_origin, trimmer_comp->is_gmt);
958 if (ret) {
959 goto error;
960 }
961 }
962
963 if (!trimmer_it->end.is_set) {
964 BT_ASSERT(!trimmer_it->end.is_infinite);
965 ret = set_trimmer_iterator_bound(trimmer_it, &trimmer_it->end,
966 ns_from_origin, trimmer_comp->is_gmt);
967 if (ret) {
968 goto error;
969 }
970 }
971
972 ret = validate_trimmer_bounds(trimmer_it->trimmer_comp,
973 &trimmer_it->begin, &trimmer_it->end);
974 if (ret) {
975 goto error;
976 }
977
978 goto end;
979
980 error:
981 put_messages(msgs, count);
982 upstream_iter_status = BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR;
983
984 end:
985 return (int) upstream_iter_status;
986 }
987
988 static
989 bt_component_class_message_iterator_next_method_status state_seek_initially(
990 struct trimmer_iterator *trimmer_it)
991 {
992 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
993 bt_component_class_message_iterator_next_method_status status =
994 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
995
996 BT_ASSERT(trimmer_it->begin.is_set);
997
998 if (trimmer_it->begin.is_infinite) {
999 if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
1000 trimmer_it->upstream_iter)) {
1001 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1002 "Cannot make upstream message iterator initially seek its beginning.");
1003 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1004 goto end;
1005 }
1006
1007 status = (int) bt_self_component_port_input_message_iterator_seek_beginning(
1008 trimmer_it->upstream_iter);
1009 } else {
1010 if (!bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
1011 trimmer_it->upstream_iter,
1012 trimmer_it->begin.ns_from_origin)) {
1013 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1014 "Cannot make upstream message iterator initially seek: seek-ns-from-origin=%" PRId64,
1015 trimmer_it->begin.ns_from_origin);
1016 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1017 goto end;
1018 }
1019
1020 status = (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
1021 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin);
1022 }
1023
1024 if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1025 trimmer_it->state = TRIMMER_ITERATOR_STATE_TRIM;
1026 }
1027
1028 end:
1029 return status;
1030 }
1031
1032 static inline
1033 void push_message(struct trimmer_iterator *trimmer_it, const bt_message *msg)
1034 {
1035 g_queue_push_head(trimmer_it->output_messages, (void *) msg);
1036 }
1037
1038 static inline
1039 const bt_message *pop_message(struct trimmer_iterator *trimmer_it)
1040 {
1041 return g_queue_pop_tail(trimmer_it->output_messages);
1042 }
1043
1044 static inline
1045 int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
1046 int64_t ns_from_origin, uint64_t *raw_value)
1047 {
1048
1049 int64_t cc_offset_s;
1050 uint64_t cc_offset_cycles;
1051 uint64_t cc_freq;
1052
1053 bt_clock_class_get_offset(clock_class, &cc_offset_s, &cc_offset_cycles);
1054 cc_freq = bt_clock_class_get_frequency(clock_class);
1055 return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
1056 cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
1057 }
1058
1059 static inline
1060 bt_component_class_message_iterator_next_method_status
1061 end_stream(struct trimmer_iterator *trimmer_it,
1062 struct trimmer_iterator_stream_state *sstate)
1063 {
1064 bt_component_class_message_iterator_next_method_status status =
1065 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1066 /* Initialize to silence maybe-uninitialized warning. */
1067 uint64_t raw_value = 0;
1068 bt_message *msg = NULL;
1069
1070 BT_ASSERT(!trimmer_it->end.is_infinite);
1071 BT_ASSERT(sstate->stream);
1072
1073 /*
1074 * If we haven't seen a message with a clock snapshot, we don't know if the trimmer's end bound is within
1075 * the clock's range, so it wouldn't be safe to try to convert ns_from_origin to a clock value.
1076 *
1077 * Also, it would be a bit of a lie to generate a stream end message with the end bound as its
1078 * clock snapshot, because we don't really know if the stream existed at that time. If we have
1079 * seen a message with a clock snapshot and the stream is cut short by another message with a
1080 * clock snapshot, then we are sure that the the end bound time is not below the clock range,
1081 * and we know the stream was active at that time (and that we cut it short).
1082 */
1083 if (sstate->seen_clock_snapshot) {
1084 const bt_clock_class *clock_class;
1085 int ret;
1086
1087 clock_class = bt_stream_class_borrow_default_clock_class_const(
1088 bt_stream_borrow_class_const(sstate->stream));
1089 BT_ASSERT(clock_class);
1090 ret = clock_raw_value_from_ns_from_origin(clock_class,
1091 trimmer_it->end.ns_from_origin, &raw_value);
1092 if (ret) {
1093 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1094 goto end;
1095 }
1096 }
1097
1098 if (sstate->cur_packet) {
1099 /*
1100 * Create and push a packet end message, making its time
1101 * the trimming range's end time.
1102 *
1103 * We know that we must have seen a clock snapshot, the one in
1104 * the packet beginning message, since trimmer currently
1105 * requires packet messages to have clock snapshots (see comment
1106 * in create_stream_state_entry).
1107 */
1108 BT_ASSERT(sstate->seen_clock_snapshot);
1109
1110 msg = bt_message_packet_end_create_with_default_clock_snapshot(
1111 trimmer_it->self_msg_iter, sstate->cur_packet,
1112 raw_value);
1113 if (!msg) {
1114 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
1115 goto end;
1116 }
1117
1118 push_message(trimmer_it, msg);
1119 msg = NULL;
1120 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1121 }
1122
1123 /* Create and push a stream end message. */
1124 msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
1125 sstate->stream);
1126 if (!msg) {
1127 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
1128 goto end;
1129 }
1130
1131 if (sstate->seen_clock_snapshot) {
1132 bt_message_stream_end_set_default_clock_snapshot(msg, raw_value);
1133 }
1134
1135 push_message(trimmer_it, msg);
1136 msg = NULL;
1137
1138 /*
1139 * Just to make sure that we don't use this stream state again
1140 * in the future without an obvious error.
1141 */
1142 sstate->stream = NULL;
1143
1144 end:
1145 bt_message_put_ref(msg);
1146 return status;
1147 }
1148
1149 static inline
1150 bt_component_class_message_iterator_next_method_status end_iterator_streams(
1151 struct trimmer_iterator *trimmer_it)
1152 {
1153 bt_component_class_message_iterator_next_method_status status =
1154 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1155 GHashTableIter iter;
1156 gpointer key, sstate;
1157
1158 if (trimmer_it->end.is_infinite) {
1159 /*
1160 * An infinite trimming range's end time guarantees that
1161 * we received (and pushed) all the appropriate end
1162 * messages.
1163 */
1164 goto remove_all;
1165 }
1166
1167 /*
1168 * End each stream and then remove them from the hash table of
1169 * stream states to release unneeded references.
1170 */
1171 g_hash_table_iter_init(&iter, trimmer_it->stream_states);
1172
1173 while (g_hash_table_iter_next(&iter, &key, &sstate)) {
1174 status = end_stream(trimmer_it, sstate);
1175 if (status) {
1176 goto end;
1177 }
1178 }
1179
1180 remove_all:
1181 g_hash_table_remove_all(trimmer_it->stream_states);
1182
1183 end:
1184 return status;
1185 }
1186
1187 static
1188 bt_component_class_message_iterator_next_method_status
1189 create_stream_state_entry(
1190 struct trimmer_iterator *trimmer_it,
1191 const struct bt_stream *stream,
1192 struct trimmer_iterator_stream_state **stream_state)
1193 {
1194 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
1195 bt_component_class_message_iterator_next_method_status status;
1196 struct trimmer_iterator_stream_state *sstate;
1197 const bt_stream_class *sc;
1198
1199 BT_ASSERT(!bt_g_hash_table_contains(trimmer_it->stream_states, stream));
1200
1201 /*
1202 * Validate right now that the stream's class
1203 * has a registered default clock class so that
1204 * an existing stream state guarantees existing
1205 * default clock snapshots for its associated
1206 * messages.
1207 *
1208 * Also check that clock snapshots are always
1209 * known.
1210 */
1211 sc = bt_stream_borrow_class_const(stream);
1212 if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
1213 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1214 "Unsupported stream: stream class does "
1215 "not have a default clock class: "
1216 "stream-addr=%p, "
1217 "stream-id=%" PRIu64 ", "
1218 "stream-name=\"%s\"",
1219 stream, bt_stream_get_id(stream),
1220 bt_stream_get_name(stream));
1221 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1222 goto end;
1223 }
1224
1225 /*
1226 * Temporary: make sure packet beginning, packet
1227 * end, discarded events, and discarded packets
1228 * messages have default clock snapshots until
1229 * the support for not having them is
1230 * implemented.
1231 */
1232 if (!bt_stream_class_packets_have_beginning_default_clock_snapshot(
1233 sc)) {
1234 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1235 "Unsupported stream: packets have no beginning clock snapshot: "
1236 "stream-addr=%p, "
1237 "stream-id=%" PRIu64 ", "
1238 "stream-name=\"%s\"",
1239 stream, bt_stream_get_id(stream),
1240 bt_stream_get_name(stream));
1241 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1242 goto end;
1243 }
1244
1245 if (!bt_stream_class_packets_have_end_default_clock_snapshot(
1246 sc)) {
1247 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1248 "Unsupported stream: packets have no end clock snapshot: "
1249 "stream-addr=%p, "
1250 "stream-id=%" PRIu64 ", "
1251 "stream-name=\"%s\"",
1252 stream, bt_stream_get_id(stream),
1253 bt_stream_get_name(stream));
1254 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1255 goto end;
1256 }
1257
1258 if (bt_stream_class_supports_discarded_events(sc) &&
1259 !bt_stream_class_discarded_events_have_default_clock_snapshots(sc)) {
1260 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1261 "Unsupported stream: discarded events have no clock snapshots: "
1262 "stream-addr=%p, "
1263 "stream-id=%" PRIu64 ", "
1264 "stream-name=\"%s\"",
1265 stream, bt_stream_get_id(stream),
1266 bt_stream_get_name(stream));
1267 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1268 goto end;
1269 }
1270
1271 if (bt_stream_class_supports_discarded_packets(sc) &&
1272 !bt_stream_class_discarded_packets_have_default_clock_snapshots(sc)) {
1273 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1274 "Unsupported stream: discarded packets "
1275 "have no clock snapshots: "
1276 "stream-addr=%p, "
1277 "stream-id=%" PRIu64 ", "
1278 "stream-name=\"%s\"",
1279 stream, bt_stream_get_id(stream),
1280 bt_stream_get_name(stream));
1281 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1282 goto end;
1283 }
1284
1285 sstate = g_new0(struct trimmer_iterator_stream_state, 1);
1286 if (!sstate) {
1287 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
1288 goto end;
1289 }
1290
1291 sstate->stream = stream;
1292
1293 g_hash_table_insert(trimmer_it->stream_states, (void *) stream, sstate);
1294
1295 *stream_state = sstate;
1296
1297 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1298
1299 end:
1300 return status;
1301 }
1302
1303 static
1304 struct trimmer_iterator_stream_state *get_stream_state_entry(
1305 struct trimmer_iterator *trimmer_it,
1306 const struct bt_stream *stream)
1307 {
1308 struct trimmer_iterator_stream_state *sstate;
1309
1310 BT_ASSERT(stream);
1311 sstate = g_hash_table_lookup(trimmer_it->stream_states, stream);
1312 BT_ASSERT(sstate);
1313
1314 return sstate;
1315 }
1316
1317 /*
1318 * Handles a message which is associated to a given stream state. This
1319 * _could_ make the iterator's output message queue grow; this could
1320 * also consume the message without pushing anything to this queue, only
1321 * modifying the stream state.
1322 *
1323 * This function consumes the `msg` reference, _whatever the outcome_.
1324 *
1325 * If non-NULL, `ns_from_origin` is the message's time, as given by
1326 * get_msg_ns_from_origin(). If NULL, the message doesn't have a time.
1327 *
1328 * This function sets `reached_end` if handling this message made the
1329 * iterator reach the end of the trimming range. Note that the output
1330 * message queue could contain messages even if this function sets
1331 * `reached_end`.
1332 */
1333 static
1334 bt_component_class_message_iterator_next_method_status
1335 handle_message_with_stream(
1336 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1337 const struct bt_stream *stream, const int64_t *ns_from_origin,
1338 bool *reached_end)
1339 {
1340 bt_component_class_message_iterator_next_method_status status =
1341 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1342 bt_message_type msg_type = bt_message_get_type(msg);
1343 int ret;
1344 struct trimmer_iterator_stream_state *sstate = NULL;
1345
1346 /*
1347 * Retrieve the stream's state - except if the message is stream
1348 * beginning, in which case we don't know about about this stream yet.
1349 */
1350 if (msg_type != BT_MESSAGE_TYPE_STREAM_BEGINNING) {
1351 sstate = get_stream_state_entry(trimmer_it, stream);
1352 }
1353
1354 switch (msg_type) {
1355 case BT_MESSAGE_TYPE_EVENT:
1356 /*
1357 * Event messages always have a clock snapshot if the stream
1358 * class has a clock class. And we know it has, otherwise we
1359 * couldn't be using the trimmer component.
1360 */
1361 BT_ASSERT(ns_from_origin);
1362
1363 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1364 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1365 status = end_iterator_streams(trimmer_it);
1366 *reached_end = true;
1367 break;
1368 }
1369
1370 sstate->seen_clock_snapshot = true;
1371
1372 push_message(trimmer_it, msg);
1373 msg = NULL;
1374 break;
1375
1376 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1377 /*
1378 * Packet beginning messages won't have a clock snapshot if
1379 * stream_class->packets_have_beginning_default_clock_snapshot
1380 * is false. But for now, assume they always do.
1381 */
1382 BT_ASSERT(ns_from_origin);
1383 BT_ASSERT(!sstate->cur_packet);
1384
1385 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1386 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1387 status = end_iterator_streams(trimmer_it);
1388 *reached_end = true;
1389 break;
1390 }
1391
1392 sstate->cur_packet =
1393 bt_message_packet_beginning_borrow_packet_const(msg);
1394 bt_packet_get_ref(sstate->cur_packet);
1395
1396 sstate->seen_clock_snapshot = true;
1397
1398 push_message(trimmer_it, msg);
1399 msg = NULL;
1400 break;
1401
1402 case BT_MESSAGE_TYPE_PACKET_END:
1403 /*
1404 * Packet end messages won't have a clock snapshot if
1405 * stream_class->packets_have_end_default_clock_snapshot
1406 * is false. But for now, assume they always do.
1407 */
1408 BT_ASSERT(ns_from_origin);
1409 BT_ASSERT(sstate->cur_packet);
1410
1411 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1412 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1413 status = end_iterator_streams(trimmer_it);
1414 *reached_end = true;
1415 break;
1416 }
1417
1418 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1419
1420 sstate->seen_clock_snapshot = true;
1421
1422 push_message(trimmer_it, msg);
1423 msg = NULL;
1424 break;
1425
1426 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1427 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1428 {
1429 /*
1430 * `ns_from_origin` is the message's time range's
1431 * beginning time here.
1432 */
1433 int64_t end_ns_from_origin;
1434 const bt_clock_snapshot *end_cs;
1435
1436 BT_ASSERT(ns_from_origin);
1437
1438 sstate->seen_clock_snapshot = true;
1439
1440 if (bt_message_get_type(msg) ==
1441 BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1442 /*
1443 * Safe to ignore the return value because we
1444 * know there's a default clock and it's always
1445 * known.
1446 */
1447 end_cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
1448 msg);
1449 } else {
1450 /*
1451 * Safe to ignore the return value because we
1452 * know there's a default clock and it's always
1453 * known.
1454 */
1455 end_cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
1456 msg);
1457 }
1458
1459 if (bt_clock_snapshot_get_ns_from_origin(end_cs,
1460 &end_ns_from_origin)) {
1461 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1462 goto end;
1463 }
1464
1465 if (!trimmer_it->end.is_infinite &&
1466 *ns_from_origin > trimmer_it->end.ns_from_origin) {
1467 status = end_iterator_streams(trimmer_it);
1468 *reached_end = true;
1469 break;
1470 }
1471
1472 if (!trimmer_it->end.is_infinite &&
1473 end_ns_from_origin > trimmer_it->end.ns_from_origin) {
1474 /*
1475 * This message's end time is outside the
1476 * trimming time range: replace it with a new
1477 * message having an end time equal to the
1478 * trimming time range's end and without a
1479 * count.
1480 */
1481 const bt_clock_class *clock_class =
1482 bt_clock_snapshot_borrow_clock_class_const(
1483 end_cs);
1484 const bt_clock_snapshot *begin_cs;
1485 bt_message *new_msg;
1486 uint64_t end_raw_value;
1487
1488 ret = clock_raw_value_from_ns_from_origin(clock_class,
1489 trimmer_it->end.ns_from_origin, &end_raw_value);
1490 if (ret) {
1491 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1492 goto end;
1493 }
1494
1495 if (msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1496 begin_cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
1497 msg);
1498 new_msg = bt_message_discarded_events_create_with_default_clock_snapshots(
1499 trimmer_it->self_msg_iter,
1500 sstate->stream,
1501 bt_clock_snapshot_get_value(begin_cs),
1502 end_raw_value);
1503 } else {
1504 begin_cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
1505 msg);
1506 new_msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
1507 trimmer_it->self_msg_iter,
1508 sstate->stream,
1509 bt_clock_snapshot_get_value(begin_cs),
1510 end_raw_value);
1511 }
1512
1513 if (!new_msg) {
1514 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
1515 goto end;
1516 }
1517
1518 /* Replace the original message */
1519 BT_MESSAGE_MOVE_REF(msg, new_msg);
1520 }
1521
1522 push_message(trimmer_it, msg);
1523 msg = NULL;
1524 break;
1525 }
1526
1527 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1528 /*
1529 * If this message has a time and this time is greater than the
1530 * trimmer's end bound, it triggers the end of the trim window.
1531 */
1532 if (G_UNLIKELY(ns_from_origin && !trimmer_it->end.is_infinite &&
1533 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1534 status = end_iterator_streams(trimmer_it);
1535 *reached_end = true;
1536 break;
1537 }
1538
1539 /* Learn about this stream. */
1540 status = create_stream_state_entry(trimmer_it, stream, &sstate);
1541 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1542 goto end;
1543 }
1544
1545 if (ns_from_origin) {
1546 sstate->seen_clock_snapshot = true;
1547 }
1548
1549 push_message(trimmer_it, msg);
1550 msg = NULL;
1551 break;
1552 case BT_MESSAGE_TYPE_STREAM_END:
1553 {
1554 gboolean removed;
1555
1556 /*
1557 * If this message has a time and this time is greater than the
1558 * trimmer's end bound, it triggers the end of the trim window.
1559 */
1560 if (G_UNLIKELY(ns_from_origin && !trimmer_it->end.is_infinite &&
1561 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1562 status = end_iterator_streams(trimmer_it);
1563 *reached_end = true;
1564 break;
1565 }
1566
1567 /*
1568 * Either the stream end message's time is within the trimmer's
1569 * bounds, or it doesn't have a time. In both cases, pass
1570 * the message unmodified.
1571 */
1572 push_message(trimmer_it, msg);
1573 msg = NULL;
1574
1575 /* Forget about this stream. */
1576 removed = g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
1577 BT_ASSERT(removed);
1578 break;
1579 }
1580 default:
1581 break;
1582 }
1583
1584 end:
1585 /* We release the message's reference whatever the outcome */
1586 bt_message_put_ref(msg);
1587 return status;
1588 }
1589
1590 /*
1591 * Handles an input message. This _could_ make the iterator's output
1592 * message queue grow; this could also consume the message without
1593 * pushing anything to this queue, only modifying the stream state.
1594 *
1595 * This function consumes the `msg` reference, _whatever the outcome_.
1596 *
1597 * This function sets `reached_end` if handling this message made the
1598 * iterator reach the end of the trimming range. Note that the output
1599 * message queue could contain messages even if this function sets
1600 * `reached_end`.
1601 */
1602 static inline
1603 bt_component_class_message_iterator_next_method_status handle_message(
1604 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1605 bool *reached_end)
1606 {
1607 bt_component_class_message_iterator_next_method_status status;
1608 const bt_stream *stream = NULL;
1609 int64_t ns_from_origin = INT64_MIN;
1610 bool has_ns_from_origin = false;
1611 int ret;
1612
1613 /* Find message's associated stream */
1614 switch (bt_message_get_type(msg)) {
1615 case BT_MESSAGE_TYPE_EVENT:
1616 stream = bt_event_borrow_stream_const(
1617 bt_message_event_borrow_event_const(msg));
1618 break;
1619 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1620 stream = bt_packet_borrow_stream_const(
1621 bt_message_packet_beginning_borrow_packet_const(msg));
1622 break;
1623 case BT_MESSAGE_TYPE_PACKET_END:
1624 stream = bt_packet_borrow_stream_const(
1625 bt_message_packet_end_borrow_packet_const(msg));
1626 break;
1627 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1628 stream = bt_message_discarded_events_borrow_stream_const(msg);
1629 break;
1630 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1631 stream = bt_message_discarded_packets_borrow_stream_const(msg);
1632 break;
1633 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1634 stream = bt_message_stream_beginning_borrow_stream_const(msg);
1635 break;
1636 case BT_MESSAGE_TYPE_STREAM_END:
1637 stream = bt_message_stream_end_borrow_stream_const(msg);
1638 break;
1639 default:
1640 break;
1641 }
1642
1643 /* Retrieve the message's time */
1644 ret = get_msg_ns_from_origin(msg, &ns_from_origin, &has_ns_from_origin);
1645 if (G_UNLIKELY(ret)) {
1646 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1647 goto end;
1648 }
1649
1650 if (G_LIKELY(stream)) {
1651 /* Message associated to a stream */
1652 status = handle_message_with_stream(trimmer_it, msg,
1653 stream, has_ns_from_origin ? &ns_from_origin : NULL, reached_end);
1654
1655 /*
1656 * handle_message_with_stream_state() unconditionally
1657 * consumes `msg`.
1658 */
1659 msg = NULL;
1660 } else {
1661 /*
1662 * Message not associated to a stream (message iterator
1663 * inactivity).
1664 */
1665 if (G_UNLIKELY(ns_from_origin > trimmer_it->end.ns_from_origin)) {
1666 BT_MESSAGE_PUT_REF_AND_RESET(msg);
1667 status = end_iterator_streams(trimmer_it);
1668 *reached_end = true;
1669 } else {
1670 push_message(trimmer_it, msg);
1671 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1672 msg = NULL;
1673 }
1674 }
1675
1676 end:
1677 /* We release the message's reference whatever the outcome */
1678 bt_message_put_ref(msg);
1679 return status;
1680 }
1681
1682 static inline
1683 void fill_message_array_from_output_messages(
1684 struct trimmer_iterator *trimmer_it,
1685 bt_message_array_const msgs, uint64_t capacity, uint64_t *count)
1686 {
1687 *count = 0;
1688
1689 /*
1690 * Move auto-seek messages to the output array (which is this
1691 * iterator's base message array).
1692 */
1693 while (capacity > 0 && !g_queue_is_empty(trimmer_it->output_messages)) {
1694 msgs[*count] = pop_message(trimmer_it);
1695 capacity--;
1696 (*count)++;
1697 }
1698
1699 BT_ASSERT(*count > 0);
1700 }
1701
1702 static inline
1703 bt_component_class_message_iterator_next_method_status state_ending(
1704 struct trimmer_iterator *trimmer_it,
1705 bt_message_array_const msgs, uint64_t capacity,
1706 uint64_t *count)
1707 {
1708 bt_component_class_message_iterator_next_method_status status =
1709 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1710
1711 if (g_queue_is_empty(trimmer_it->output_messages)) {
1712 trimmer_it->state = TRIMMER_ITERATOR_STATE_ENDED;
1713 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
1714 goto end;
1715 }
1716
1717 fill_message_array_from_output_messages(trimmer_it, msgs,
1718 capacity, count);
1719
1720 end:
1721 return status;
1722 }
1723
1724 static inline
1725 bt_component_class_message_iterator_next_method_status
1726 state_trim(struct trimmer_iterator *trimmer_it,
1727 bt_message_array_const msgs, uint64_t capacity,
1728 uint64_t *count)
1729 {
1730 bt_component_class_message_iterator_next_method_status status =
1731 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1732 bt_message_array_const my_msgs;
1733 uint64_t my_count;
1734 uint64_t i;
1735 bool reached_end = false;
1736
1737 while (g_queue_is_empty(trimmer_it->output_messages)) {
1738 status = (int) bt_self_component_port_input_message_iterator_next(
1739 trimmer_it->upstream_iter, &my_msgs, &my_count);
1740 if (G_UNLIKELY(status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK)) {
1741 if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END) {
1742 status = end_iterator_streams(trimmer_it);
1743 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1744 goto end;
1745 }
1746
1747 trimmer_it->state =
1748 TRIMMER_ITERATOR_STATE_ENDING;
1749 status = state_ending(trimmer_it, msgs,
1750 capacity, count);
1751 }
1752
1753 goto end;
1754 }
1755
1756 BT_ASSERT(my_count > 0);
1757
1758 for (i = 0; i < my_count; i++) {
1759 status = handle_message(trimmer_it, my_msgs[i],
1760 &reached_end);
1761
1762 /*
1763 * handle_message() unconditionally consumes the
1764 * message reference.
1765 */
1766 my_msgs[i] = NULL;
1767
1768 if (G_UNLIKELY(status !=
1769 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK)) {
1770 put_messages(my_msgs, my_count);
1771 goto end;
1772 }
1773
1774 if (G_UNLIKELY(reached_end)) {
1775 /*
1776 * This message's time was passed the
1777 * trimming time range's end time: we
1778 * are done. Their might still be
1779 * messages in the output message queue,
1780 * so move to the "ending" state and
1781 * apply it immediately since
1782 * state_trim() is called within the
1783 * "next" method.
1784 */
1785 put_messages(my_msgs, my_count);
1786 trimmer_it->state =
1787 TRIMMER_ITERATOR_STATE_ENDING;
1788 status = state_ending(trimmer_it, msgs,
1789 capacity, count);
1790 goto end;
1791 }
1792 }
1793 }
1794
1795 /*
1796 * There's at least one message in the output message queue:
1797 * move the messages to the output message array.
1798 */
1799 BT_ASSERT(!g_queue_is_empty(trimmer_it->output_messages));
1800 fill_message_array_from_output_messages(trimmer_it, msgs,
1801 capacity, count);
1802
1803 end:
1804 return status;
1805 }
1806
1807 BT_HIDDEN
1808 bt_component_class_message_iterator_next_method_status trimmer_msg_iter_next(
1809 bt_self_message_iterator *self_msg_iter,
1810 bt_message_array_const msgs, uint64_t capacity,
1811 uint64_t *count)
1812 {
1813 struct trimmer_iterator *trimmer_it =
1814 bt_self_message_iterator_get_data(self_msg_iter);
1815 bt_component_class_message_iterator_next_method_status status =
1816 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1817
1818 BT_ASSERT(trimmer_it);
1819
1820 if (G_LIKELY(trimmer_it->state == TRIMMER_ITERATOR_STATE_TRIM)) {
1821 status = state_trim(trimmer_it, msgs, capacity, count);
1822 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1823 goto end;
1824 }
1825 } else {
1826 switch (trimmer_it->state) {
1827 case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN:
1828 status = state_set_trimmer_iterator_bounds(trimmer_it);
1829 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1830 goto end;
1831 }
1832
1833 status = state_seek_initially(trimmer_it);
1834 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1835 goto end;
1836 }
1837
1838 status = state_trim(trimmer_it, msgs, capacity, count);
1839 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1840 goto end;
1841 }
1842
1843 break;
1844 case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY:
1845 status = state_seek_initially(trimmer_it);
1846 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1847 goto end;
1848 }
1849
1850 status = state_trim(trimmer_it, msgs, capacity, count);
1851 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1852 goto end;
1853 }
1854
1855 break;
1856 case TRIMMER_ITERATOR_STATE_ENDING:
1857 status = state_ending(trimmer_it, msgs, capacity,
1858 count);
1859 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1860 goto end;
1861 }
1862
1863 break;
1864 case TRIMMER_ITERATOR_STATE_ENDED:
1865 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
1866 break;
1867 default:
1868 abort();
1869 }
1870 }
1871
1872 end:
1873 return status;
1874 }
1875
1876 BT_HIDDEN
1877 void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
1878 {
1879 struct trimmer_iterator *trimmer_it =
1880 bt_self_message_iterator_get_data(self_msg_iter);
1881
1882 BT_ASSERT(trimmer_it);
1883 destroy_trimmer_iterator(trimmer_it);
1884 }
This page took 0.068241 seconds and 5 git commands to generate.