lib: introduce bt_message_iterator_class
[babeltrace.git] / src / plugins / utils / trimmer / trimmer.c
1 /*
2 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #define BT_LOG_OUTPUT_LEVEL (trimmer_comp->log_level)
25 #define BT_LOG_TAG "PLUGIN/FLT.UTILS.TRIMMER"
26 #include "logging/comp-logging.h"
27
28 #include "compat/utc.h"
29 #include "compat/time.h"
30 #include <babeltrace2/babeltrace.h>
31 #include "common/common.h"
32 #include "common/assert.h"
33 #include <stdbool.h>
34 #include <stdint.h>
35 #include <inttypes.h>
36 #include <glib.h>
37 #include "compat/glib.h"
38 #include "plugins/common/param-validation/param-validation.h"
39
40 #include "trimmer.h"
41
42 #define NS_PER_S INT64_C(1000000000)
43
/*
 * Name of the input port which trimmer_init() adds to the component
 * and from which trimmer_msg_iter_init() creates the upstream message
 * iterator.
 */
static const char * const in_port_name = "in";
45
/*
 * Time of day without any date: hour, minute, second, and nanosecond
 * parts, as parsed from a `hh:mm[:ss[.ns]]` parameter string.
 */
struct trimmer_time {
	unsigned int hour, minute, second, ns;
};
49
/*
 * One bound (beginning or end) of the trimming time range.
 *
 * A bound is either infinite, fully known (`ns_from_origin` is valid),
 * or known only as a time of day (`time`) which is completed with a
 * date later on.
 */
struct trimmer_bound {
	/*
	 * Nanoseconds from origin, valid if `is_set` is set and
	 * `is_infinite` is false.
	 */
	int64_t ns_from_origin;

	/* True if this bound's full time (`ns_from_origin`) is set */
	bool is_set;

	/*
	 * True if this bound represents the infinity (negative or
	 * positive depending on which bound it is). If this is true,
	 * then we don't care about `ns_from_origin` above.
	 */
	bool is_infinite;

	/*
	 * This bound's time without the date; this time is used to set
	 * `ns_from_origin` once we know the date.
	 */
	struct trimmer_time time;
};
73
/* Private data of a trimmer filter component. */
struct trimmer_comp {
	/* Trimming time range's bounds, as configured by the parameters */
	struct trimmer_bound begin, end;

	/* Value of the `gmt` parameter (false when the parameter is absent) */
	bool is_gmt;

	/* Logging level; read by this file's `BT_LOG_OUTPUT_LEVEL` macro */
	bt_logging_level log_level;

	/* Component owning this data, in its two forms (set by trimmer_init()) */
	bt_self_component *self_comp;
	bt_self_component_filter *self_comp_filter;
};
81
/*
 * States of a trimmer message iterator.
 *
 * The first enumerator has the value 0, which makes it the state of a
 * freshly zero-allocated `struct trimmer_iterator`.
 */
enum trimmer_iterator_state {
	/*
	 * Find the first message's date and set the bounds' times
	 * accordingly.
	 */
	TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN,

	/*
	 * Initially seek to the trimming range's beginning time.
	 */
	TRIMMER_ITERATOR_STATE_SEEK_INITIALLY,

	/*
	 * Fill the output message queue for as long as received input
	 * messages are within the trimming time range.
	 */
	TRIMMER_ITERATOR_STATE_TRIM,

	/* Flush the remaining messages in the output message queue */
	TRIMMER_ITERATOR_STATE_ENDING,

	/* Trimming operation and message iterator is ended */
	TRIMMER_ITERATOR_STATE_ENDED,
};
106
/* Private data of a trimmer message iterator. */
struct trimmer_iterator {
	/* Weak */
	struct trimmer_comp *trimmer_comp;

	/* Weak */
	bt_self_message_iterator *self_msg_iter;

	/* Current step of the trimming operation */
	enum trimmer_iterator_state state;

	/* Owned by this */
	bt_self_component_port_input_message_iterator *upstream_iter;

	/*
	 * This iterator's own copy of the component's bounds
	 * (copied by trimmer_msg_iter_init()).
	 */
	struct trimmer_bound begin, end;

	/*
	 * Queue of `const bt_message *` (owned by the queue).
	 *
	 * This is where the trimming operation pushes the messages to
	 * output by this message iterator.
	 */
	GQueue *output_messages;

	/*
	 * Hash table of `bt_stream *` (weak) to
	 * `struct trimmer_iterator_stream_state *` (owned by the HT).
	 */
	GHashTable *stream_states;
};
134
/*
 * Per-stream state kept by a trimmer message iterator (values of the
 * iterator's `stream_states` hash table).
 */
struct trimmer_iterator_stream_state {
	/* Weak */
	const bt_stream *stream;

	/* Have we seen a message with clock_snapshot going through this stream? */
	bool seen_clock_snapshot;

	/* Owned by this (`NULL` initially and between packets) */
	const bt_packet *cur_packet;
};
145
/*
 * Frees the trimmer component data `trimmer_comp`, which must not be
 * `NULL`.
 */
static
void destroy_trimmer_comp(struct trimmer_comp *trimmer_comp)
{
	BT_ASSERT(trimmer_comp);
	g_free(trimmer_comp);
}
152
153 static
154 struct trimmer_comp *create_trimmer_comp(void)
155 {
156 return g_new0(struct trimmer_comp, 1);
157 }
158
159 BT_HIDDEN
160 void trimmer_finalize(bt_self_component_filter *self_comp)
161 {
162 struct trimmer_comp *trimmer_comp =
163 bt_self_component_get_data(
164 bt_self_component_filter_as_self_component(self_comp));
165
166 if (trimmer_comp) {
167 destroy_trimmer_comp(trimmer_comp);
168 }
169 }
170
171 /*
172 * Compile regex in `pattern`, and try to match `string`. If there's a match,
173 * return true and set `*match_info` to the list of matches. The list of
174 * matches must be freed by the caller. If there's no match, return false and
175 * set `*match_info` to NULL;
176 */
177 static
178 bool compile_and_match(const char *pattern, const char *string, GMatchInfo **match_info) {
179 bool matches = false;
180 GError *regex_error = NULL;
181 GRegex *regex;
182
183 regex = g_regex_new(pattern, 0, 0, &regex_error);
184 if (!regex) {
185 goto end;
186 }
187
188 matches = g_regex_match(regex, string, 0, match_info);
189 if (!matches) {
190 /*
191 * g_regex_match allocates `*match_info` even if it returns
192 * FALSE. If there's no match, we have no use for it, so free
193 * it immediatly and don't return it to the caller.
194 */
195 g_match_info_free(*match_info);
196 *match_info = NULL;
197 }
198
199 g_regex_unref(regex);
200
201 end:
202
203 if (regex_error) {
204 g_error_free(regex_error);
205 }
206
207 return matches;
208 }
209
210 /*
211 * Convert the captured text in match number `match_num` in `match_info`
212 * to an unsigned integer.
213 */
214 static
215 guint64 match_to_uint(const GMatchInfo *match_info, gint match_num) {
216 gchar *text, *endptr;
217 guint64 result;
218
219 text = g_match_info_fetch(match_info, match_num);
220 BT_ASSERT(text);
221
222 /*
223 * Because the input is carefully sanitized with regexes by the caller,
224 * we assume that g_ascii_strtoull cannot fail.
225 */
226 errno = 0;
227 result = g_ascii_strtoull(text, &endptr, 10);
228 BT_ASSERT(endptr > text);
229 BT_ASSERT(errno == 0);
230
231 g_free(text);
232
233 return result;
234 }
235
236 /*
237 * When parsing the nanoseconds part, .512 means .512000000, not .000000512.
238 * This function is like match_to_uint, but multiplies the parsed number to get
239 * the expected result.
240 */
241 static
242 guint64 match_to_uint_ns(const GMatchInfo *match_info, gint match_num) {
243 guint64 nanoseconds;
244 gboolean ret;
245 gint start_pos, end_pos, power;
246 static int pow10[] = {
247 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000,
248 };
249
250 nanoseconds = match_to_uint(match_info, match_num);
251
252 /* Multiply by 10 as many times as there are omitted digits. */
253 ret = g_match_info_fetch_pos(match_info, match_num, &start_pos, &end_pos);
254 BT_ASSERT(ret);
255
256 power = 9 - (end_pos - start_pos);
257 BT_ASSERT(power >= 0 && power <= 8);
258
259 nanoseconds *= pow10[power];
260
261 return nanoseconds;
262 }
263
264 /*
265 * Sets the time (in ns from origin) of a trimmer bound from date and
266 * time components.
267 *
268 * Returns a negative value if anything goes wrong.
269 */
270 static
271 int set_bound_ns_from_origin(struct trimmer_bound *bound,
272 unsigned int year, unsigned int month, unsigned int day,
273 unsigned int hour, unsigned int minute, unsigned int second,
274 unsigned int ns, bool is_gmt)
275 {
276 int ret = 0;
277 time_t result;
278 struct tm tm = {
279 .tm_sec = second,
280 .tm_min = minute,
281 .tm_hour = hour,
282 .tm_mday = day,
283 .tm_mon = month - 1,
284 .tm_year = year - 1900,
285 .tm_isdst = -1,
286 };
287
288 if (is_gmt) {
289 result = bt_timegm(&tm);
290 } else {
291 result = mktime(&tm);
292 }
293
294 if (result < 0) {
295 ret = -1;
296 goto end;
297 }
298
299 BT_ASSERT(bound);
300 bound->ns_from_origin = (int64_t) result;
301 bound->ns_from_origin *= NS_PER_S;
302 bound->ns_from_origin += ns;
303 bound->is_set = true;
304
305 end:
306 return ret;
307 }
308
309 /*
310 * Parses a timestamp, figuring out its format.
311 *
312 * Returns a negative value if anything goes wrong.
313 *
314 * Expected formats:
315 *
316 * YYYY-MM-DD hh:mm[:ss[.ns]]
317 * [hh:mm:]ss[.ns]
318 * [-]s[.ns]
319 *
320 * TODO: Check overflows.
321 */
322 static
323 int set_bound_from_str(struct trimmer_comp *trimmer_comp,
324 const char *str, struct trimmer_bound *bound, bool is_gmt)
325 {
326 /* Matches YYYY-MM-DD */
327 #define DATE_RE "([0-9]{4})-([0-9]{2})-([0-9]{2})"
328
329 /* Matches HH:MM[:SS[.NS]] */
330 #define TIME_RE "([0-9]{2}):([0-9]{2})(?::([0-9]{2})(?:\\.([0-9]{1,9}))?)?"
331
332 /* Matches [-]SS[.NS] */
333 #define S_NS_RE "^(-?)([0-9]+)(?:\\.([0-9]{1,9}))?$"
334
335 GMatchInfo *match_info;
336 int ret = 0;
337
338 /* Try `YYYY-MM-DD hh:mm[:ss[.ns]]` format */
339 if (compile_and_match("^" DATE_RE " " TIME_RE "$", str, &match_info)) {
340 unsigned int year = 0, month = 0, day = 0, hours = 0, minutes = 0, seconds = 0, nanoseconds = 0;
341 gint match_count = g_match_info_get_match_count(match_info);
342
343 BT_ASSERT(match_count >= 6 && match_count <= 8);
344
345 year = match_to_uint(match_info, 1);
346 month = match_to_uint(match_info, 2);
347 day = match_to_uint(match_info, 3);
348 hours = match_to_uint(match_info, 4);
349 minutes = match_to_uint(match_info, 5);
350
351 if (match_count >= 7) {
352 seconds = match_to_uint(match_info, 6);
353 }
354
355 if (match_count >= 8) {
356 nanoseconds = match_to_uint_ns(match_info, 7);
357 }
358
359 set_bound_ns_from_origin(bound, year, month, day, hours, minutes, seconds, nanoseconds, is_gmt);
360
361 goto end;
362 }
363
364 if (compile_and_match("^" DATE_RE "$", str, &match_info)) {
365 unsigned int year = 0, month = 0, day = 0;
366
367 BT_ASSERT(g_match_info_get_match_count(match_info) == 4);
368
369 year = match_to_uint(match_info, 1);
370 month = match_to_uint(match_info, 2);
371 day = match_to_uint(match_info, 3);
372
373 set_bound_ns_from_origin(bound, year, month, day, 0, 0, 0, 0, is_gmt);
374
375 goto end;
376 }
377
378 /* Try `hh:mm[:ss[.ns]]` format */
379 if (compile_and_match("^" TIME_RE "$", str, &match_info)) {
380 gint match_count = g_match_info_get_match_count(match_info);
381 BT_ASSERT(match_count >= 3 && match_count <= 5);
382 bound->time.hour = match_to_uint(match_info, 1);
383 bound->time.minute = match_to_uint(match_info, 2);
384
385 if (match_count >= 4) {
386 bound->time.second = match_to_uint(match_info, 3);
387 }
388
389 if (match_count >= 5) {
390 bound->time.ns = match_to_uint_ns(match_info, 4);
391 }
392
393 goto end;
394 }
395
396 /* Try `[-]s[.ns]` format */
397 if (compile_and_match("^" S_NS_RE "$", str, &match_info)) {
398 gboolean is_neg, fetch_pos_ret;
399 gint start_pos, end_pos, match_count;
400 guint64 seconds, nanoseconds = 0;
401
402 match_count = g_match_info_get_match_count(match_info);
403 BT_ASSERT(match_count >= 3 && match_count <= 4);
404
405 /* Check for presence of negation sign. */
406 fetch_pos_ret = g_match_info_fetch_pos(match_info, 1, &start_pos, &end_pos);
407 BT_ASSERT(fetch_pos_ret);
408 is_neg = (end_pos - start_pos) > 0;
409
410 seconds = match_to_uint(match_info, 2);
411
412 if (match_count >= 4) {
413 nanoseconds = match_to_uint_ns(match_info, 3);
414 }
415
416 bound->ns_from_origin = seconds * NS_PER_S + nanoseconds;
417
418 if (is_neg) {
419 bound->ns_from_origin = -bound->ns_from_origin;
420 }
421
422 bound->is_set = true;
423
424 goto end;
425 }
426
427 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
428 "Invalid date/time format: param=\"%s\"", str);
429 ret = -1;
430
431 end:
432 g_match_info_free(match_info);
433
434 return ret;
435 }
436
437 /*
438 * Sets a trimmer bound's properties from a parameter string/integer
439 * value.
440 *
441 * Returns a negative value if anything goes wrong.
442 */
443 static
444 int set_bound_from_param(struct trimmer_comp *trimmer_comp,
445 const char *param_name, const bt_value *param,
446 struct trimmer_bound *bound, bool is_gmt)
447 {
448 int ret;
449 const char *arg;
450 char tmp_arg[64];
451
452 if (bt_value_is_signed_integer(param)) {
453 int64_t value = bt_value_integer_signed_get(param);
454
455 /*
456 * Just convert it to a temporary string to handle
457 * everything the same way.
458 */
459 sprintf(tmp_arg, "%" PRId64, value);
460 arg = tmp_arg;
461 } else {
462 BT_ASSERT(bt_value_is_string(param));
463 arg = bt_value_string_get(param);
464 }
465
466 ret = set_bound_from_str(trimmer_comp, arg, bound, is_gmt);
467
468 return ret;
469 }
470
471 static
472 int validate_trimmer_bounds(struct trimmer_comp *trimmer_comp,
473 struct trimmer_bound *begin, struct trimmer_bound *end)
474 {
475 int ret = 0;
476
477 BT_ASSERT(begin->is_set);
478 BT_ASSERT(end->is_set);
479
480 if (!begin->is_infinite && !end->is_infinite &&
481 begin->ns_from_origin > end->ns_from_origin) {
482 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
483 "Trimming time range's beginning time is greater than end time: "
484 "begin-ns-from-origin=%" PRId64 ", "
485 "end-ns-from-origin=%" PRId64,
486 begin->ns_from_origin,
487 end->ns_from_origin);
488 ret = -1;
489 goto end;
490 }
491
492 if (!begin->is_infinite && begin->ns_from_origin == INT64_MIN) {
493 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
494 "Invalid trimming time range's beginning time: "
495 "ns-from-origin=%" PRId64,
496 begin->ns_from_origin);
497 ret = -1;
498 goto end;
499 }
500
501 if (!end->is_infinite && end->ns_from_origin == INT64_MIN) {
502 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
503 "Invalid trimming time range's end time: "
504 "ns-from-origin=%" PRId64,
505 end->ns_from_origin);
506 ret = -1;
507 goto end;
508 }
509
510 end:
511 return ret;
512 }
513
514 static
515 enum bt_param_validation_status validate_bound_type(
516 const bt_value *value,
517 struct bt_param_validation_context *context)
518 {
519 enum bt_param_validation_status status = BT_PARAM_VALIDATION_STATUS_OK;
520
521 if (!bt_value_is_signed_integer(value) &&
522 !bt_value_is_string(value)) {
523 status = bt_param_validation_error(context,
524 "unexpected type: expected-types=[%s, %s], actual-type=%s",
525 bt_common_value_type_string(BT_VALUE_TYPE_SIGNED_INTEGER),
526 bt_common_value_type_string(BT_VALUE_TYPE_STRING),
527 bt_common_value_type_string(bt_value_get_type(value)));
528 }
529
530 return status;
531 }
532
/*
 * Validation descriptors of the component's initialization parameters,
 * used by init_trimmer_comp_from_params(): `gmt` is an optional
 * boolean; `begin` and `end` are optional and each is checked by
 * validate_bound_type().
 */
static
struct bt_param_validation_map_value_entry_descr trimmer_params[] = {
	{ "gmt", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
	{ "begin", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .validation_func = validate_bound_type } },
	{ "end", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .validation_func = validate_bound_type } },
	BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
};
540
541 static
542 bt_component_class_initialize_method_status init_trimmer_comp_from_params(
543 struct trimmer_comp *trimmer_comp,
544 const bt_value *params)
545 {
546 const bt_value *value;
547 bt_component_class_initialize_method_status status;
548 enum bt_param_validation_status validation_status;
549 gchar *validate_error = NULL;
550
551 validation_status = bt_param_validation_validate(params,
552 trimmer_params, &validate_error);
553 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
554 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
555 goto end;
556 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
557 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
558 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp, "%s",
559 validate_error);
560 goto end;
561 }
562
563 BT_ASSERT(params);
564 value = bt_value_map_borrow_entry_value_const(params, "gmt");
565 if (value) {
566 trimmer_comp->is_gmt = (bool) bt_value_bool_get(value);
567 }
568
569 value = bt_value_map_borrow_entry_value_const(params, "begin");
570 if (value) {
571 if (set_bound_from_param(trimmer_comp, "begin", value,
572 &trimmer_comp->begin, trimmer_comp->is_gmt)) {
573 /* set_bound_from_param() logs errors */
574 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
575 goto end;
576 }
577 } else {
578 trimmer_comp->begin.is_infinite = true;
579 trimmer_comp->begin.is_set = true;
580 }
581
582 value = bt_value_map_borrow_entry_value_const(params, "end");
583 if (value) {
584 if (set_bound_from_param(trimmer_comp, "end", value,
585 &trimmer_comp->end, trimmer_comp->is_gmt)) {
586 /* set_bound_from_param() logs errors */
587 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
588 goto end;
589 }
590 } else {
591 trimmer_comp->end.is_infinite = true;
592 trimmer_comp->end.is_set = true;
593 }
594
595 if (trimmer_comp->begin.is_set && trimmer_comp->end.is_set) {
596 /* validate_trimmer_bounds() logs errors */
597 if (validate_trimmer_bounds(trimmer_comp,
598 &trimmer_comp->begin, &trimmer_comp->end)) {
599 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
600 goto end;
601 }
602 }
603
604 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
605
606 end:
607 g_free(validate_error);
608
609 return status;
610 }
611
612 bt_component_class_initialize_method_status trimmer_init(
613 bt_self_component_filter *self_comp_flt,
614 bt_self_component_filter_configuration *config,
615 const bt_value *params, void *init_data)
616 {
617 bt_component_class_initialize_method_status status;
618 bt_self_component_add_port_status add_port_status;
619 struct trimmer_comp *trimmer_comp = create_trimmer_comp();
620 bt_self_component *self_comp =
621 bt_self_component_filter_as_self_component(self_comp_flt);
622
623 if (!trimmer_comp) {
624 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
625 goto error;
626 }
627
628 trimmer_comp->log_level = bt_component_get_logging_level(
629 bt_self_component_as_component(self_comp));
630 trimmer_comp->self_comp = self_comp;
631 trimmer_comp->self_comp_filter = self_comp_flt;
632
633 add_port_status = bt_self_component_filter_add_input_port(
634 self_comp_flt, in_port_name, NULL, NULL);
635 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
636 status = (int) add_port_status;
637 goto error;
638 }
639
640 add_port_status = bt_self_component_filter_add_output_port(
641 self_comp_flt, "out", NULL, NULL);
642 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
643 status = (int) add_port_status;
644 goto error;
645 }
646
647 status = init_trimmer_comp_from_params(trimmer_comp, params);
648 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
649 goto error;
650 }
651
652 bt_self_component_set_data(self_comp, trimmer_comp);
653
654 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
655 goto end;
656
657 error:
658 if (trimmer_comp) {
659 destroy_trimmer_comp(trimmer_comp);
660 }
661
662 end:
663 return status;
664 }
665
666 static
667 void destroy_trimmer_iterator(struct trimmer_iterator *trimmer_it)
668 {
669 if (!trimmer_it) {
670 goto end;
671 }
672
673 bt_self_component_port_input_message_iterator_put_ref(
674 trimmer_it->upstream_iter);
675
676 if (trimmer_it->output_messages) {
677 g_queue_free(trimmer_it->output_messages);
678 }
679
680 if (trimmer_it->stream_states) {
681 g_hash_table_destroy(trimmer_it->stream_states);
682 }
683
684 g_free(trimmer_it);
685 end:
686 return;
687 }
688
/*
 * Destroys the stream state `sstate` (must not be `NULL`): puts the
 * reference on its current packet, then frees it.
 *
 * This is the value destroy function of a trimmer iterator's
 * `stream_states` hash table.
 */
static
void destroy_trimmer_iterator_stream_state(
		struct trimmer_iterator_stream_state *sstate)
{
	BT_ASSERT(sstate);
	BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
	g_free(sstate);
}
697
698 BT_HIDDEN
699 bt_message_iterator_class_initialize_method_status trimmer_msg_iter_init(
700 bt_self_message_iterator *self_msg_iter,
701 bt_self_message_iterator_configuration *config,
702 bt_self_component *self_comp,
703 bt_self_component_port_output *port)
704 {
705 bt_message_iterator_class_initialize_method_status status;
706 bt_self_component_port_input_message_iterator_create_from_message_iterator_status
707 msg_iter_status;
708 struct trimmer_iterator *trimmer_it;
709
710 trimmer_it = g_new0(struct trimmer_iterator, 1);
711 if (!trimmer_it) {
712 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
713 goto error;
714 }
715
716 trimmer_it->trimmer_comp = bt_self_component_get_data(self_comp);
717 BT_ASSERT(trimmer_it->trimmer_comp);
718
719 if (trimmer_it->trimmer_comp->begin.is_set &&
720 trimmer_it->trimmer_comp->end.is_set) {
721 /*
722 * Both trimming time range's bounds are set, so skip
723 * the
724 * `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
725 * phase.
726 */
727 trimmer_it->state = TRIMMER_ITERATOR_STATE_SEEK_INITIALLY;
728 }
729
730 trimmer_it->begin = trimmer_it->trimmer_comp->begin;
731 trimmer_it->end = trimmer_it->trimmer_comp->end;
732 msg_iter_status =
733 bt_self_component_port_input_message_iterator_create_from_message_iterator(
734 self_msg_iter,
735 bt_self_component_filter_borrow_input_port_by_name(
736 trimmer_it->trimmer_comp->self_comp_filter, in_port_name),
737 &trimmer_it->upstream_iter);
738 if (msg_iter_status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) {
739 status = (int) msg_iter_status;
740 goto error;
741 }
742
743 trimmer_it->output_messages = g_queue_new();
744 if (!trimmer_it->output_messages) {
745 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
746 goto error;
747 }
748
749 trimmer_it->stream_states = g_hash_table_new_full(g_direct_hash,
750 g_direct_equal, NULL,
751 (GDestroyNotify) destroy_trimmer_iterator_stream_state);
752 if (!trimmer_it->stream_states) {
753 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
754 goto error;
755 }
756
757 /*
758 * The trimmer requires upstream messages to have times, so it can
759 * always seek forward.
760 */
761 bt_self_message_iterator_configuration_set_can_seek_forward(
762 config, BT_TRUE);
763
764 trimmer_it->self_msg_iter = self_msg_iter;
765 bt_self_message_iterator_set_data(self_msg_iter, trimmer_it);
766
767 status = BT_MESSAGE_ITERATOR_CLASS_INITIALIZE_METHOD_STATUS_OK;
768 goto end;
769
770 error:
771 destroy_trimmer_iterator(trimmer_it);
772
773 end:
774 return status;
775 }
776
777 static inline
778 int get_msg_ns_from_origin(const bt_message *msg, int64_t *ns_from_origin,
779 bool *has_clock_snapshot)
780 {
781 const bt_clock_class *clock_class = NULL;
782 const bt_clock_snapshot *clock_snapshot = NULL;
783 int ret = 0;
784
785 BT_ASSERT_DBG(msg);
786 BT_ASSERT_DBG(ns_from_origin);
787 BT_ASSERT_DBG(has_clock_snapshot);
788
789 switch (bt_message_get_type(msg)) {
790 case BT_MESSAGE_TYPE_EVENT:
791 clock_class =
792 bt_message_event_borrow_stream_class_default_clock_class_const(
793 msg);
794 if (G_UNLIKELY(!clock_class)) {
795 goto error;
796 }
797
798 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
799 msg);
800 break;
801 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
802 clock_class =
803 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
804 msg);
805 if (G_UNLIKELY(!clock_class)) {
806 goto error;
807 }
808
809 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
810 msg);
811 break;
812 case BT_MESSAGE_TYPE_PACKET_END:
813 clock_class =
814 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
815 msg);
816 if (G_UNLIKELY(!clock_class)) {
817 goto error;
818 }
819
820 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
821 msg);
822 break;
823 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
824 {
825 enum bt_message_stream_clock_snapshot_state cs_state;
826
827 clock_class =
828 bt_message_stream_beginning_borrow_stream_class_default_clock_class_const(msg);
829 if (G_UNLIKELY(!clock_class)) {
830 goto error;
831 }
832
833 cs_state = bt_message_stream_beginning_borrow_default_clock_snapshot_const(msg, &clock_snapshot);
834 if (cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
835 goto no_clock_snapshot;
836 }
837
838 break;
839 }
840 case BT_MESSAGE_TYPE_STREAM_END:
841 {
842 enum bt_message_stream_clock_snapshot_state cs_state;
843
844 clock_class =
845 bt_message_stream_end_borrow_stream_class_default_clock_class_const(msg);
846 if (G_UNLIKELY(!clock_class)) {
847 goto error;
848 }
849
850 cs_state = bt_message_stream_end_borrow_default_clock_snapshot_const(msg, &clock_snapshot);
851 if (cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
852 goto no_clock_snapshot;
853 }
854
855 break;
856 }
857 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
858 clock_class =
859 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
860 msg);
861 if (G_UNLIKELY(!clock_class)) {
862 goto error;
863 }
864
865 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
866 msg);
867 break;
868 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
869 clock_class =
870 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
871 msg);
872 if (G_UNLIKELY(!clock_class)) {
873 goto error;
874 }
875
876 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
877 msg);
878 break;
879 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
880 clock_snapshot =
881 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
882 msg);
883 break;
884 default:
885 goto no_clock_snapshot;
886 }
887
888 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot,
889 ns_from_origin);
890 if (G_UNLIKELY(ret)) {
891 goto error;
892 }
893
894 *has_clock_snapshot = true;
895 goto end;
896
897 no_clock_snapshot:
898 *has_clock_snapshot = false;
899 goto end;
900
901 error:
902 ret = -1;
903
904 end:
905 return ret;
906 }
907
908 static inline
909 void put_messages(bt_message_array_const msgs, uint64_t count)
910 {
911 uint64_t i;
912
913 for (i = 0; i < count; i++) {
914 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
915 }
916 }
917
918 static inline
919 int set_trimmer_iterator_bound(struct trimmer_iterator *trimmer_it,
920 struct trimmer_bound *bound, int64_t ns_from_origin,
921 bool is_gmt)
922 {
923 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
924 struct tm tm;
925 struct tm *res;
926 time_t time_seconds = (time_t) (ns_from_origin / NS_PER_S);
927 int ret = 0;
928
929 BT_ASSERT(!bound->is_set);
930 errno = 0;
931
932 /* We only need to extract the date from this time */
933 if (is_gmt) {
934 res = bt_gmtime_r(&time_seconds, &tm);
935 } else {
936 res = bt_localtime_r(&time_seconds, &tm);
937 }
938
939 if (!res) {
940 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(trimmer_comp->self_comp,
941 "Cannot convert timestamp to date and time",
942 ": ts=%" PRId64, (int64_t) time_seconds);
943 ret = -1;
944 goto end;
945 }
946
947 ret = set_bound_ns_from_origin(bound, tm.tm_year + 1900, tm.tm_mon + 1,
948 tm.tm_mday, bound->time.hour, bound->time.minute,
949 bound->time.second, bound->time.ns, is_gmt);
950
951 end:
952 return ret;
953 }
954
955 static
956 bt_message_iterator_class_next_method_status
957 state_set_trimmer_iterator_bounds(
958 struct trimmer_iterator *trimmer_it)
959 {
960 bt_message_iterator_next_status upstream_iter_status =
961 BT_MESSAGE_ITERATOR_NEXT_STATUS_OK;
962 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
963 bt_message_array_const msgs;
964 uint64_t count = 0;
965 int64_t ns_from_origin = INT64_MIN;
966 uint64_t i;
967 int ret;
968
969 BT_ASSERT(!trimmer_it->begin.is_set ||
970 !trimmer_it->end.is_set);
971
972 while (true) {
973 upstream_iter_status =
974 bt_self_component_port_input_message_iterator_next(
975 trimmer_it->upstream_iter, &msgs, &count);
976 if (upstream_iter_status != BT_MESSAGE_ITERATOR_NEXT_STATUS_OK) {
977 goto end;
978 }
979
980 for (i = 0; i < count; i++) {
981 const bt_message *msg = msgs[i];
982 bool has_ns_from_origin;
983 ret = get_msg_ns_from_origin(msg, &ns_from_origin,
984 &has_ns_from_origin);
985 if (ret) {
986 goto error;
987 }
988
989 if (!has_ns_from_origin) {
990 continue;
991 }
992
993 BT_ASSERT_DBG(ns_from_origin != INT64_MIN &&
994 ns_from_origin != INT64_MAX);
995 put_messages(msgs, count);
996 goto found;
997 }
998
999 put_messages(msgs, count);
1000 }
1001
1002 found:
1003 if (!trimmer_it->begin.is_set) {
1004 BT_ASSERT(!trimmer_it->begin.is_infinite);
1005 ret = set_trimmer_iterator_bound(trimmer_it, &trimmer_it->begin,
1006 ns_from_origin, trimmer_comp->is_gmt);
1007 if (ret) {
1008 goto error;
1009 }
1010 }
1011
1012 if (!trimmer_it->end.is_set) {
1013 BT_ASSERT(!trimmer_it->end.is_infinite);
1014 ret = set_trimmer_iterator_bound(trimmer_it, &trimmer_it->end,
1015 ns_from_origin, trimmer_comp->is_gmt);
1016 if (ret) {
1017 goto error;
1018 }
1019 }
1020
1021 ret = validate_trimmer_bounds(trimmer_it->trimmer_comp,
1022 &trimmer_it->begin, &trimmer_it->end);
1023 if (ret) {
1024 goto error;
1025 }
1026
1027 goto end;
1028
1029 error:
1030 put_messages(msgs, count);
1031 upstream_iter_status = BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR;
1032
1033 end:
1034 return (int) upstream_iter_status;
1035 }
1036
1037 static
1038 bt_message_iterator_class_next_method_status state_seek_initially(
1039 struct trimmer_iterator *trimmer_it)
1040 {
1041 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
1042 bt_message_iterator_class_next_method_status status;
1043
1044 BT_ASSERT(trimmer_it->begin.is_set);
1045
1046 if (trimmer_it->begin.is_infinite) {
1047 bt_bool can_seek;
1048
1049 status = (int) bt_self_component_port_input_message_iterator_can_seek_beginning(
1050 trimmer_it->upstream_iter, &can_seek);
1051 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1052 if (status < 0) {
1053 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1054 "Cannot make upstream message iterator initially seek its beginning.");
1055 }
1056
1057 goto end;
1058 }
1059
1060 if (!can_seek) {
1061 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1062 "Cannot make upstream message iterator initially seek its beginning.");
1063 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1064 goto end;
1065 }
1066
1067 status = (int) bt_self_component_port_input_message_iterator_seek_beginning(
1068 trimmer_it->upstream_iter);
1069 } else {
1070 bt_bool can_seek;
1071
1072 status = (int) bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
1073 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin,
1074 &can_seek);
1075
1076 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1077 if (status < 0) {
1078 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1079 "Cannot make upstream message iterator initially seek: seek-ns-from-origin=%" PRId64,
1080 trimmer_it->begin.ns_from_origin);
1081 }
1082
1083 goto end;
1084 }
1085
1086 if (!can_seek) {
1087 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1088 "Cannot make upstream message iterator initially seek: seek-ns-from-origin=%" PRId64,
1089 trimmer_it->begin.ns_from_origin);
1090 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1091 goto end;
1092 }
1093
1094 status = (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
1095 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin);
1096 }
1097
1098 if (status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1099 trimmer_it->state = TRIMMER_ITERATOR_STATE_TRIM;
1100 }
1101
1102 end:
1103 return status;
1104 }
1105
1106 static inline
1107 void push_message(struct trimmer_iterator *trimmer_it, const bt_message *msg)
1108 {
1109 g_queue_push_head(trimmer_it->output_messages, (void *) msg);
1110 }
1111
1112 static inline
1113 const bt_message *pop_message(struct trimmer_iterator *trimmer_it)
1114 {
1115 return g_queue_pop_tail(trimmer_it->output_messages);
1116 }
1117
1118 static inline
1119 int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
1120 int64_t ns_from_origin, uint64_t *raw_value)
1121 {
1122
1123 int64_t cc_offset_s;
1124 uint64_t cc_offset_cycles;
1125 uint64_t cc_freq;
1126
1127 bt_clock_class_get_offset(clock_class, &cc_offset_s, &cc_offset_cycles);
1128 cc_freq = bt_clock_class_get_frequency(clock_class);
1129 return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
1130 cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
1131 }
1132
1133 static inline
1134 bt_message_iterator_class_next_method_status
1135 end_stream(struct trimmer_iterator *trimmer_it,
1136 struct trimmer_iterator_stream_state *sstate)
1137 {
1138 bt_message_iterator_class_next_method_status status =
1139 BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1140 /* Initialize to silence maybe-uninitialized warning. */
1141 uint64_t raw_value = 0;
1142 bt_message *msg = NULL;
1143
1144 BT_ASSERT(!trimmer_it->end.is_infinite);
1145 BT_ASSERT(sstate->stream);
1146
1147 /*
1148 * If we haven't seen a message with a clock snapshot, we don't know if the trimmer's end bound is within
1149 * the clock's range, so it wouldn't be safe to try to convert ns_from_origin to a clock value.
1150 *
1151 * Also, it would be a bit of a lie to generate a stream end message with the end bound as its
1152 * clock snapshot, because we don't really know if the stream existed at that time. If we have
1153 * seen a message with a clock snapshot and the stream is cut short by another message with a
1154 * clock snapshot, then we are sure that the the end bound time is not below the clock range,
1155 * and we know the stream was active at that time (and that we cut it short).
1156 */
1157 if (sstate->seen_clock_snapshot) {
1158 const bt_clock_class *clock_class;
1159 int ret;
1160
1161 clock_class = bt_stream_class_borrow_default_clock_class_const(
1162 bt_stream_borrow_class_const(sstate->stream));
1163 BT_ASSERT(clock_class);
1164 ret = clock_raw_value_from_ns_from_origin(clock_class,
1165 trimmer_it->end.ns_from_origin, &raw_value);
1166 if (ret) {
1167 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1168 goto end;
1169 }
1170 }
1171
1172 if (sstate->cur_packet) {
1173 /*
1174 * Create and push a packet end message, making its time
1175 * the trimming range's end time.
1176 *
1177 * We know that we must have seen a clock snapshot, the one in
1178 * the packet beginning message, since trimmer currently
1179 * requires packet messages to have clock snapshots (see comment
1180 * in create_stream_state_entry).
1181 */
1182 BT_ASSERT(sstate->seen_clock_snapshot);
1183
1184 msg = bt_message_packet_end_create_with_default_clock_snapshot(
1185 trimmer_it->self_msg_iter, sstate->cur_packet,
1186 raw_value);
1187 if (!msg) {
1188 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1189 goto end;
1190 }
1191
1192 push_message(trimmer_it, msg);
1193 msg = NULL;
1194 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1195 }
1196
1197 /* Create and push a stream end message. */
1198 msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
1199 sstate->stream);
1200 if (!msg) {
1201 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1202 goto end;
1203 }
1204
1205 if (sstate->seen_clock_snapshot) {
1206 bt_message_stream_end_set_default_clock_snapshot(msg, raw_value);
1207 }
1208
1209 push_message(trimmer_it, msg);
1210 msg = NULL;
1211
1212 /*
1213 * Just to make sure that we don't use this stream state again
1214 * in the future without an obvious error.
1215 */
1216 sstate->stream = NULL;
1217
1218 end:
1219 bt_message_put_ref(msg);
1220 return status;
1221 }
1222
1223 static inline
1224 bt_message_iterator_class_next_method_status end_iterator_streams(
1225 struct trimmer_iterator *trimmer_it)
1226 {
1227 bt_message_iterator_class_next_method_status status =
1228 BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1229 GHashTableIter iter;
1230 gpointer key, sstate;
1231
1232 if (trimmer_it->end.is_infinite) {
1233 /*
1234 * An infinite trimming range's end time guarantees that
1235 * we received (and pushed) all the appropriate end
1236 * messages.
1237 */
1238 goto remove_all;
1239 }
1240
1241 /*
1242 * End each stream and then remove them from the hash table of
1243 * stream states to release unneeded references.
1244 */
1245 g_hash_table_iter_init(&iter, trimmer_it->stream_states);
1246
1247 while (g_hash_table_iter_next(&iter, &key, &sstate)) {
1248 status = end_stream(trimmer_it, sstate);
1249 if (status) {
1250 goto end;
1251 }
1252 }
1253
1254 remove_all:
1255 g_hash_table_remove_all(trimmer_it->stream_states);
1256
1257 end:
1258 return status;
1259 }
1260
1261 static
1262 bt_message_iterator_class_next_method_status
1263 create_stream_state_entry(
1264 struct trimmer_iterator *trimmer_it,
1265 const struct bt_stream *stream,
1266 struct trimmer_iterator_stream_state **stream_state)
1267 {
1268 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
1269 bt_message_iterator_class_next_method_status status;
1270 struct trimmer_iterator_stream_state *sstate;
1271 const bt_stream_class *sc;
1272
1273 BT_ASSERT(!bt_g_hash_table_contains(trimmer_it->stream_states, stream));
1274
1275 /*
1276 * Validate right now that the stream's class
1277 * has a registered default clock class so that
1278 * an existing stream state guarantees existing
1279 * default clock snapshots for its associated
1280 * messages.
1281 *
1282 * Also check that clock snapshots are always
1283 * known.
1284 */
1285 sc = bt_stream_borrow_class_const(stream);
1286 if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
1287 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1288 "Unsupported stream: stream class does "
1289 "not have a default clock class: "
1290 "stream-addr=%p, "
1291 "stream-id=%" PRIu64 ", "
1292 "stream-name=\"%s\"",
1293 stream, bt_stream_get_id(stream),
1294 bt_stream_get_name(stream));
1295 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1296 goto end;
1297 }
1298
1299 /*
1300 * Temporary: make sure packet beginning, packet
1301 * end, discarded events, and discarded packets
1302 * messages have default clock snapshots until
1303 * the support for not having them is
1304 * implemented.
1305 */
1306 if (!bt_stream_class_packets_have_beginning_default_clock_snapshot(
1307 sc)) {
1308 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1309 "Unsupported stream: packets have no beginning clock snapshot: "
1310 "stream-addr=%p, "
1311 "stream-id=%" PRIu64 ", "
1312 "stream-name=\"%s\"",
1313 stream, bt_stream_get_id(stream),
1314 bt_stream_get_name(stream));
1315 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1316 goto end;
1317 }
1318
1319 if (!bt_stream_class_packets_have_end_default_clock_snapshot(
1320 sc)) {
1321 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1322 "Unsupported stream: packets have no end clock snapshot: "
1323 "stream-addr=%p, "
1324 "stream-id=%" PRIu64 ", "
1325 "stream-name=\"%s\"",
1326 stream, bt_stream_get_id(stream),
1327 bt_stream_get_name(stream));
1328 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1329 goto end;
1330 }
1331
1332 if (bt_stream_class_supports_discarded_events(sc) &&
1333 !bt_stream_class_discarded_events_have_default_clock_snapshots(sc)) {
1334 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1335 "Unsupported stream: discarded events have no clock snapshots: "
1336 "stream-addr=%p, "
1337 "stream-id=%" PRIu64 ", "
1338 "stream-name=\"%s\"",
1339 stream, bt_stream_get_id(stream),
1340 bt_stream_get_name(stream));
1341 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1342 goto end;
1343 }
1344
1345 if (bt_stream_class_supports_discarded_packets(sc) &&
1346 !bt_stream_class_discarded_packets_have_default_clock_snapshots(sc)) {
1347 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1348 "Unsupported stream: discarded packets "
1349 "have no clock snapshots: "
1350 "stream-addr=%p, "
1351 "stream-id=%" PRIu64 ", "
1352 "stream-name=\"%s\"",
1353 stream, bt_stream_get_id(stream),
1354 bt_stream_get_name(stream));
1355 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1356 goto end;
1357 }
1358
1359 sstate = g_new0(struct trimmer_iterator_stream_state, 1);
1360 if (!sstate) {
1361 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1362 goto end;
1363 }
1364
1365 sstate->stream = stream;
1366
1367 g_hash_table_insert(trimmer_it->stream_states, (void *) stream, sstate);
1368
1369 *stream_state = sstate;
1370
1371 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1372
1373 end:
1374 return status;
1375 }
1376
1377 static
1378 struct trimmer_iterator_stream_state *get_stream_state_entry(
1379 struct trimmer_iterator *trimmer_it,
1380 const struct bt_stream *stream)
1381 {
1382 struct trimmer_iterator_stream_state *sstate;
1383
1384 BT_ASSERT_DBG(stream);
1385 sstate = g_hash_table_lookup(trimmer_it->stream_states, stream);
1386 BT_ASSERT_DBG(sstate);
1387
1388 return sstate;
1389 }
1390
1391 /*
1392 * Handles a message which is associated to a given stream state. This
1393 * _could_ make the iterator's output message queue grow; this could
1394 * also consume the message without pushing anything to this queue, only
1395 * modifying the stream state.
1396 *
1397 * This function consumes the `msg` reference, _whatever the outcome_.
1398 *
1399 * If non-NULL, `ns_from_origin` is the message's time, as given by
1400 * get_msg_ns_from_origin(). If NULL, the message doesn't have a time.
1401 *
1402 * This function sets `reached_end` if handling this message made the
1403 * iterator reach the end of the trimming range. Note that the output
1404 * message queue could contain messages even if this function sets
1405 * `reached_end`.
1406 */
1407 static
1408 bt_message_iterator_class_next_method_status
1409 handle_message_with_stream(
1410 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1411 const struct bt_stream *stream, const int64_t *ns_from_origin,
1412 bool *reached_end)
1413 {
1414 bt_message_iterator_class_next_method_status status =
1415 BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1416 bt_message_type msg_type = bt_message_get_type(msg);
1417 int ret;
1418 struct trimmer_iterator_stream_state *sstate = NULL;
1419
1420 /*
1421 * Retrieve the stream's state - except if the message is stream
1422 * beginning, in which case we don't know about about this stream yet.
1423 */
1424 if (msg_type != BT_MESSAGE_TYPE_STREAM_BEGINNING) {
1425 sstate = get_stream_state_entry(trimmer_it, stream);
1426 }
1427
1428 switch (msg_type) {
1429 case BT_MESSAGE_TYPE_EVENT:
1430 /*
1431 * Event messages always have a clock snapshot if the stream
1432 * class has a clock class. And we know it has, otherwise we
1433 * couldn't be using the trimmer component.
1434 */
1435 BT_ASSERT_DBG(ns_from_origin);
1436
1437 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1438 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1439 status = end_iterator_streams(trimmer_it);
1440 *reached_end = true;
1441 break;
1442 }
1443
1444 sstate->seen_clock_snapshot = true;
1445
1446 push_message(trimmer_it, msg);
1447 msg = NULL;
1448 break;
1449
1450 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1451 /*
1452 * Packet beginning messages won't have a clock snapshot if
1453 * stream_class->packets_have_beginning_default_clock_snapshot
1454 * is false. But for now, assume they always do.
1455 */
1456 BT_ASSERT(ns_from_origin);
1457 BT_ASSERT(!sstate->cur_packet);
1458
1459 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1460 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1461 status = end_iterator_streams(trimmer_it);
1462 *reached_end = true;
1463 break;
1464 }
1465
1466 sstate->cur_packet =
1467 bt_message_packet_beginning_borrow_packet_const(msg);
1468 bt_packet_get_ref(sstate->cur_packet);
1469
1470 sstate->seen_clock_snapshot = true;
1471
1472 push_message(trimmer_it, msg);
1473 msg = NULL;
1474 break;
1475
1476 case BT_MESSAGE_TYPE_PACKET_END:
1477 /*
1478 * Packet end messages won't have a clock snapshot if
1479 * stream_class->packets_have_end_default_clock_snapshot
1480 * is false. But for now, assume they always do.
1481 */
1482 BT_ASSERT(ns_from_origin);
1483 BT_ASSERT(sstate->cur_packet);
1484
1485 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1486 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1487 status = end_iterator_streams(trimmer_it);
1488 *reached_end = true;
1489 break;
1490 }
1491
1492 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1493
1494 sstate->seen_clock_snapshot = true;
1495
1496 push_message(trimmer_it, msg);
1497 msg = NULL;
1498 break;
1499
1500 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1501 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1502 {
1503 /*
1504 * `ns_from_origin` is the message's time range's
1505 * beginning time here.
1506 */
1507 int64_t end_ns_from_origin;
1508 const bt_clock_snapshot *end_cs;
1509
1510 BT_ASSERT(ns_from_origin);
1511
1512 sstate->seen_clock_snapshot = true;
1513
1514 if (bt_message_get_type(msg) ==
1515 BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1516 /*
1517 * Safe to ignore the return value because we
1518 * know there's a default clock and it's always
1519 * known.
1520 */
1521 end_cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
1522 msg);
1523 } else {
1524 /*
1525 * Safe to ignore the return value because we
1526 * know there's a default clock and it's always
1527 * known.
1528 */
1529 end_cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
1530 msg);
1531 }
1532
1533 if (bt_clock_snapshot_get_ns_from_origin(end_cs,
1534 &end_ns_from_origin)) {
1535 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1536 goto end;
1537 }
1538
1539 if (!trimmer_it->end.is_infinite &&
1540 *ns_from_origin > trimmer_it->end.ns_from_origin) {
1541 status = end_iterator_streams(trimmer_it);
1542 *reached_end = true;
1543 break;
1544 }
1545
1546 if (!trimmer_it->end.is_infinite &&
1547 end_ns_from_origin > trimmer_it->end.ns_from_origin) {
1548 /*
1549 * This message's end time is outside the
1550 * trimming time range: replace it with a new
1551 * message having an end time equal to the
1552 * trimming time range's end and without a
1553 * count.
1554 */
1555 const bt_clock_class *clock_class =
1556 bt_clock_snapshot_borrow_clock_class_const(
1557 end_cs);
1558 const bt_clock_snapshot *begin_cs;
1559 bt_message *new_msg;
1560 uint64_t end_raw_value;
1561
1562 ret = clock_raw_value_from_ns_from_origin(clock_class,
1563 trimmer_it->end.ns_from_origin, &end_raw_value);
1564 if (ret) {
1565 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1566 goto end;
1567 }
1568
1569 if (msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1570 begin_cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
1571 msg);
1572 new_msg = bt_message_discarded_events_create_with_default_clock_snapshots(
1573 trimmer_it->self_msg_iter,
1574 sstate->stream,
1575 bt_clock_snapshot_get_value(begin_cs),
1576 end_raw_value);
1577 } else {
1578 begin_cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
1579 msg);
1580 new_msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
1581 trimmer_it->self_msg_iter,
1582 sstate->stream,
1583 bt_clock_snapshot_get_value(begin_cs),
1584 end_raw_value);
1585 }
1586
1587 if (!new_msg) {
1588 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_MEMORY_ERROR;
1589 goto end;
1590 }
1591
1592 /* Replace the original message */
1593 BT_MESSAGE_MOVE_REF(msg, new_msg);
1594 }
1595
1596 push_message(trimmer_it, msg);
1597 msg = NULL;
1598 break;
1599 }
1600
1601 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1602 /*
1603 * If this message has a time and this time is greater than the
1604 * trimmer's end bound, it triggers the end of the trim window.
1605 */
1606 if (G_UNLIKELY(ns_from_origin && !trimmer_it->end.is_infinite &&
1607 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1608 status = end_iterator_streams(trimmer_it);
1609 *reached_end = true;
1610 break;
1611 }
1612
1613 /* Learn about this stream. */
1614 status = create_stream_state_entry(trimmer_it, stream, &sstate);
1615 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1616 goto end;
1617 }
1618
1619 if (ns_from_origin) {
1620 sstate->seen_clock_snapshot = true;
1621 }
1622
1623 push_message(trimmer_it, msg);
1624 msg = NULL;
1625 break;
1626 case BT_MESSAGE_TYPE_STREAM_END:
1627 {
1628 gboolean removed;
1629
1630 /*
1631 * If this message has a time and this time is greater than the
1632 * trimmer's end bound, it triggers the end of the trim window.
1633 */
1634 if (G_UNLIKELY(ns_from_origin && !trimmer_it->end.is_infinite &&
1635 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1636 status = end_iterator_streams(trimmer_it);
1637 *reached_end = true;
1638 break;
1639 }
1640
1641 /*
1642 * Either the stream end message's time is within the trimmer's
1643 * bounds, or it doesn't have a time. In both cases, pass
1644 * the message unmodified.
1645 */
1646 push_message(trimmer_it, msg);
1647 msg = NULL;
1648
1649 /* Forget about this stream. */
1650 removed = g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
1651 BT_ASSERT(removed);
1652 break;
1653 }
1654 default:
1655 break;
1656 }
1657
1658 end:
1659 /* We release the message's reference whatever the outcome */
1660 bt_message_put_ref(msg);
1661 return status;
1662 }
1663
1664 /*
1665 * Handles an input message. This _could_ make the iterator's output
1666 * message queue grow; this could also consume the message without
1667 * pushing anything to this queue, only modifying the stream state.
1668 *
1669 * This function consumes the `msg` reference, _whatever the outcome_.
1670 *
1671 * This function sets `reached_end` if handling this message made the
1672 * iterator reach the end of the trimming range. Note that the output
1673 * message queue could contain messages even if this function sets
1674 * `reached_end`.
1675 */
1676 static inline
1677 bt_message_iterator_class_next_method_status handle_message(
1678 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1679 bool *reached_end)
1680 {
1681 bt_message_iterator_class_next_method_status status;
1682 const bt_stream *stream = NULL;
1683 int64_t ns_from_origin = INT64_MIN;
1684 bool has_ns_from_origin = false;
1685 int ret;
1686
1687 /* Find message's associated stream */
1688 switch (bt_message_get_type(msg)) {
1689 case BT_MESSAGE_TYPE_EVENT:
1690 stream = bt_event_borrow_stream_const(
1691 bt_message_event_borrow_event_const(msg));
1692 break;
1693 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1694 stream = bt_packet_borrow_stream_const(
1695 bt_message_packet_beginning_borrow_packet_const(msg));
1696 break;
1697 case BT_MESSAGE_TYPE_PACKET_END:
1698 stream = bt_packet_borrow_stream_const(
1699 bt_message_packet_end_borrow_packet_const(msg));
1700 break;
1701 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1702 stream = bt_message_discarded_events_borrow_stream_const(msg);
1703 break;
1704 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1705 stream = bt_message_discarded_packets_borrow_stream_const(msg);
1706 break;
1707 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1708 stream = bt_message_stream_beginning_borrow_stream_const(msg);
1709 break;
1710 case BT_MESSAGE_TYPE_STREAM_END:
1711 stream = bt_message_stream_end_borrow_stream_const(msg);
1712 break;
1713 default:
1714 break;
1715 }
1716
1717 /* Retrieve the message's time */
1718 ret = get_msg_ns_from_origin(msg, &ns_from_origin, &has_ns_from_origin);
1719 if (G_UNLIKELY(ret)) {
1720 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_ERROR;
1721 goto end;
1722 }
1723
1724 if (G_LIKELY(stream)) {
1725 /* Message associated to a stream */
1726 status = handle_message_with_stream(trimmer_it, msg,
1727 stream, has_ns_from_origin ? &ns_from_origin : NULL, reached_end);
1728
1729 /*
1730 * handle_message_with_stream_state() unconditionally
1731 * consumes `msg`.
1732 */
1733 msg = NULL;
1734 } else {
1735 /*
1736 * Message not associated to a stream (message iterator
1737 * inactivity).
1738 */
1739 if (G_UNLIKELY(ns_from_origin > trimmer_it->end.ns_from_origin)) {
1740 BT_MESSAGE_PUT_REF_AND_RESET(msg);
1741 status = end_iterator_streams(trimmer_it);
1742 *reached_end = true;
1743 } else {
1744 push_message(trimmer_it, msg);
1745 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1746 msg = NULL;
1747 }
1748 }
1749
1750 end:
1751 /* We release the message's reference whatever the outcome */
1752 bt_message_put_ref(msg);
1753 return status;
1754 }
1755
1756 static inline
1757 void fill_message_array_from_output_messages(
1758 struct trimmer_iterator *trimmer_it,
1759 bt_message_array_const msgs, uint64_t capacity, uint64_t *count)
1760 {
1761 *count = 0;
1762
1763 /*
1764 * Move auto-seek messages to the output array (which is this
1765 * iterator's base message array).
1766 */
1767 while (capacity > 0 && !g_queue_is_empty(trimmer_it->output_messages)) {
1768 msgs[*count] = pop_message(trimmer_it);
1769 capacity--;
1770 (*count)++;
1771 }
1772
1773 BT_ASSERT_DBG(*count > 0);
1774 }
1775
1776 static inline
1777 bt_message_iterator_class_next_method_status state_ending(
1778 struct trimmer_iterator *trimmer_it,
1779 bt_message_array_const msgs, uint64_t capacity,
1780 uint64_t *count)
1781 {
1782 bt_message_iterator_class_next_method_status status =
1783 BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1784
1785 if (g_queue_is_empty(trimmer_it->output_messages)) {
1786 trimmer_it->state = TRIMMER_ITERATOR_STATE_ENDED;
1787 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1788 goto end;
1789 }
1790
1791 fill_message_array_from_output_messages(trimmer_it, msgs,
1792 capacity, count);
1793
1794 end:
1795 return status;
1796 }
1797
1798 static inline
1799 bt_message_iterator_class_next_method_status
1800 state_trim(struct trimmer_iterator *trimmer_it,
1801 bt_message_array_const msgs, uint64_t capacity,
1802 uint64_t *count)
1803 {
1804 bt_message_iterator_class_next_method_status status =
1805 BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1806 bt_message_array_const my_msgs;
1807 uint64_t my_count;
1808 uint64_t i;
1809 bool reached_end = false;
1810
1811 while (g_queue_is_empty(trimmer_it->output_messages)) {
1812 status = (int) bt_self_component_port_input_message_iterator_next(
1813 trimmer_it->upstream_iter, &my_msgs, &my_count);
1814 if (G_UNLIKELY(status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK)) {
1815 if (status == BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END) {
1816 status = end_iterator_streams(trimmer_it);
1817 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1818 goto end;
1819 }
1820
1821 trimmer_it->state =
1822 TRIMMER_ITERATOR_STATE_ENDING;
1823 status = state_ending(trimmer_it, msgs,
1824 capacity, count);
1825 }
1826
1827 goto end;
1828 }
1829
1830 BT_ASSERT_DBG(my_count > 0);
1831
1832 for (i = 0; i < my_count; i++) {
1833 status = handle_message(trimmer_it, my_msgs[i],
1834 &reached_end);
1835
1836 /*
1837 * handle_message() unconditionally consumes the
1838 * message reference.
1839 */
1840 my_msgs[i] = NULL;
1841
1842 if (G_UNLIKELY(status !=
1843 BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK)) {
1844 put_messages(my_msgs, my_count);
1845 goto end;
1846 }
1847
1848 if (G_UNLIKELY(reached_end)) {
1849 /*
1850 * This message's time was passed the
1851 * trimming time range's end time: we
1852 * are done. Their might still be
1853 * messages in the output message queue,
1854 * so move to the "ending" state and
1855 * apply it immediately since
1856 * state_trim() is called within the
1857 * "next" method.
1858 */
1859 put_messages(my_msgs, my_count);
1860 trimmer_it->state =
1861 TRIMMER_ITERATOR_STATE_ENDING;
1862 status = state_ending(trimmer_it, msgs,
1863 capacity, count);
1864 goto end;
1865 }
1866 }
1867 }
1868
1869 /*
1870 * There's at least one message in the output message queue:
1871 * move the messages to the output message array.
1872 */
1873 BT_ASSERT_DBG(!g_queue_is_empty(trimmer_it->output_messages));
1874 fill_message_array_from_output_messages(trimmer_it, msgs,
1875 capacity, count);
1876
1877 end:
1878 return status;
1879 }
1880
1881 BT_HIDDEN
1882 bt_message_iterator_class_next_method_status trimmer_msg_iter_next(
1883 bt_self_message_iterator *self_msg_iter,
1884 bt_message_array_const msgs, uint64_t capacity,
1885 uint64_t *count)
1886 {
1887 struct trimmer_iterator *trimmer_it =
1888 bt_self_message_iterator_get_data(self_msg_iter);
1889 bt_message_iterator_class_next_method_status status =
1890 BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
1891
1892 BT_ASSERT_DBG(trimmer_it);
1893
1894 if (G_LIKELY(trimmer_it->state == TRIMMER_ITERATOR_STATE_TRIM)) {
1895 status = state_trim(trimmer_it, msgs, capacity, count);
1896 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1897 goto end;
1898 }
1899 } else {
1900 switch (trimmer_it->state) {
1901 case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN:
1902 status = state_set_trimmer_iterator_bounds(trimmer_it);
1903 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1904 goto end;
1905 }
1906
1907 status = state_seek_initially(trimmer_it);
1908 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1909 goto end;
1910 }
1911
1912 status = state_trim(trimmer_it, msgs, capacity, count);
1913 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1914 goto end;
1915 }
1916
1917 break;
1918 case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY:
1919 status = state_seek_initially(trimmer_it);
1920 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1921 goto end;
1922 }
1923
1924 status = state_trim(trimmer_it, msgs, capacity, count);
1925 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1926 goto end;
1927 }
1928
1929 break;
1930 case TRIMMER_ITERATOR_STATE_ENDING:
1931 status = state_ending(trimmer_it, msgs, capacity,
1932 count);
1933 if (status != BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK) {
1934 goto end;
1935 }
1936
1937 break;
1938 case TRIMMER_ITERATOR_STATE_ENDED:
1939 status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_END;
1940 break;
1941 default:
1942 bt_common_abort();
1943 }
1944 }
1945
1946 end:
1947 return status;
1948 }
1949
1950 BT_HIDDEN
1951 void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
1952 {
1953 struct trimmer_iterator *trimmer_it =
1954 bt_self_message_iterator_get_data(self_msg_iter);
1955
1956 BT_ASSERT(trimmer_it);
1957 destroy_trimmer_iterator(trimmer_it);
1958 }
This page took 0.079428 seconds and 4 git commands to generate.