c156c0fe7c2f432cb7308f0187677407a01050b5
[babeltrace.git] / src / plugins / utils / trimmer / trimmer.c
1 /*
2 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #define BT_LOG_OUTPUT_LEVEL (trimmer_comp->log_level)
25 #define BT_LOG_TAG "PLUGIN/FLT.UTILS.TRIMMER"
26 #include "logging/comp-logging.h"
27
28 #include "compat/utc.h"
29 #include "compat/time.h"
30 #include <babeltrace2/babeltrace.h>
31 #include "common/common.h"
32 #include "common/assert.h"
33 #include <stdbool.h>
34 #include <stdint.h>
35 #include <inttypes.h>
36 #include <glib.h>
37 #include "compat/glib.h"
38 #include "plugins/common/param-validation/param-validation.h"
39
40 #include "trimmer.h"
41
/* Number of nanoseconds in one second */
#define NS_PER_S	INT64_C(1000000000)

/* Name of the input port which trimmer_init() adds to the component */
static const char * const in_port_name = "in";
45
/*
 * Time of day without any date, as found in a `hh:mm[:ss[.ns]]`
 * bound parameter.
 */
struct trimmer_time {
	unsigned int hour;
	unsigned int minute;
	unsigned int second;
	unsigned int ns;
};
49
50 struct trimmer_bound {
51 /*
52 * Nanoseconds from origin, valid if `is_set` is set and
53 * `is_infinite` is false.
54 */
55 int64_t ns_from_origin;
56
57 /* True if this bound's full time (`ns_from_origin`) is set */
58 bool is_set;
59
60 /*
61 * True if this bound represents the infinity (negative or
62 * positive depending on which bound it is). If this is true,
63 * then we don't care about `ns_from_origin` above.
64 */
65 bool is_infinite;
66
67 /*
68 * This bound's time without the date; this time is used to set
69 * `ns_from_origin` once we know the date.
70 */
71 struct trimmer_time time;
72 };
73
74 struct trimmer_comp {
75 struct trimmer_bound begin, end;
76 bool is_gmt;
77 bt_logging_level log_level;
78 bt_self_component *self_comp;
79 };
80
/*
 * Phases of a trimmer message iterator.
 *
 * The first enumerator must keep the value 0: trimmer_msg_iter_init()
 * allocates the iterator zero-filled and only assigns its state when
 * both bounds are already set.
 */
enum trimmer_iterator_state {
	/*
	 * Look for the date of the first message and complete the
	 * bounds' times with it.
	 */
	TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN = 0,

	/* Perform the initial seek to the beginning of the trimming range */
	TRIMMER_ITERATOR_STATE_SEEK_INITIALLY,

	/*
	 * Keep filling the output message queue while the received
	 * input messages fall within the trimming time range.
	 */
	TRIMMER_ITERATOR_STATE_TRIM,

	/* Flush what is left in the output message queue */
	TRIMMER_ITERATOR_STATE_ENDING,

	/* The trimming operation and the message iterator are over */
	TRIMMER_ITERATOR_STATE_ENDED,
};
105
106 struct trimmer_iterator {
107 /* Weak */
108 struct trimmer_comp *trimmer_comp;
109
110 /* Weak */
111 bt_self_message_iterator *self_msg_iter;
112
113 enum trimmer_iterator_state state;
114
115 /* Owned by this */
116 bt_self_component_port_input_message_iterator *upstream_iter;
117 struct trimmer_bound begin, end;
118
119 /*
120 * Queue of `const bt_message *` (owned by the queue).
121 *
122 * This is where the trimming operation pushes the messages to
123 * output by this message iterator.
124 */
125 GQueue *output_messages;
126
127 /*
128 * Hash table of `bt_stream *` (weak) to
129 * `struct trimmer_iterator_stream_state *` (owned by the HT).
130 */
131 GHashTable *stream_states;
132 };
133
134 struct trimmer_iterator_stream_state {
135 /* Weak */
136 const bt_stream *stream;
137
138 /* Have we seen a message with clock_snapshot going through this stream? */
139 bool seen_clock_snapshot;
140
141 /* Owned by this (`NULL` initially and between packets) */
142 const bt_packet *cur_packet;
143 };
144
/*
 * Frees the trimmer component data `trimmer_comp`, which must not be
 * `NULL`.
 */
static
void destroy_trimmer_comp(struct trimmer_comp *trimmer_comp)
{
	BT_ASSERT(trimmer_comp);

	g_free(trimmer_comp);
}
151
152 static
153 struct trimmer_comp *create_trimmer_comp(void)
154 {
155 return g_new0(struct trimmer_comp, 1);
156 }
157
158 BT_HIDDEN
159 void trimmer_finalize(bt_self_component_filter *self_comp)
160 {
161 struct trimmer_comp *trimmer_comp =
162 bt_self_component_get_data(
163 bt_self_component_filter_as_self_component(self_comp));
164
165 if (trimmer_comp) {
166 destroy_trimmer_comp(trimmer_comp);
167 }
168 }
169
170 /*
171 * Compile regex in `pattern`, and try to match `string`. If there's a match,
172 * return true and set `*match_info` to the list of matches. The list of
173 * matches must be freed by the caller. If there's no match, return false and
174 * set `*match_info` to NULL;
175 */
176 static
177 bool compile_and_match(const char *pattern, const char *string, GMatchInfo **match_info) {
178 bool matches = false;
179 GError *regex_error = NULL;
180 GRegex *regex;
181
182 regex = g_regex_new(pattern, 0, 0, &regex_error);
183 if (!regex) {
184 goto end;
185 }
186
187 matches = g_regex_match(regex, string, 0, match_info);
188 if (!matches) {
189 /*
190 * g_regex_match allocates `*match_info` even if it returns
191 * FALSE. If there's no match, we have no use for it, so free
192 * it immediatly and don't return it to the caller.
193 */
194 g_match_info_free(*match_info);
195 *match_info = NULL;
196 }
197
198 g_regex_unref(regex);
199
200 end:
201
202 if (regex_error) {
203 g_error_free(regex_error);
204 }
205
206 return matches;
207 }
208
209 /*
210 * Convert the captured text in match number `match_num` in `match_info`
211 * to an unsigned integer.
212 */
213 static
214 guint64 match_to_uint(const GMatchInfo *match_info, gint match_num) {
215 gchar *text, *endptr;
216 guint64 result;
217
218 text = g_match_info_fetch(match_info, match_num);
219 BT_ASSERT(text);
220
221 /*
222 * Because the input is carefully sanitized with regexes by the caller,
223 * we assume that g_ascii_strtoull cannot fail.
224 */
225 errno = 0;
226 result = g_ascii_strtoull(text, &endptr, 10);
227 BT_ASSERT(endptr > text);
228 BT_ASSERT(errno == 0);
229
230 g_free(text);
231
232 return result;
233 }
234
235 /*
236 * When parsing the nanoseconds part, .512 means .512000000, not .000000512.
237 * This function is like match_to_uint, but multiplies the parsed number to get
238 * the expected result.
239 */
240 static
241 guint64 match_to_uint_ns(const GMatchInfo *match_info, gint match_num) {
242 guint64 nanoseconds;
243 gboolean ret;
244 gint start_pos, end_pos, power;
245 static int pow10[] = {
246 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000,
247 };
248
249 nanoseconds = match_to_uint(match_info, match_num);
250
251 /* Multiply by 10 as many times as there are omitted digits. */
252 ret = g_match_info_fetch_pos(match_info, match_num, &start_pos, &end_pos);
253 BT_ASSERT(ret);
254
255 power = 9 - (end_pos - start_pos);
256 BT_ASSERT(power >= 0 && power <= 8);
257
258 nanoseconds *= pow10[power];
259
260 return nanoseconds;
261 }
262
263 /*
264 * Sets the time (in ns from origin) of a trimmer bound from date and
265 * time components.
266 *
267 * Returns a negative value if anything goes wrong.
268 */
269 static
270 int set_bound_ns_from_origin(struct trimmer_bound *bound,
271 unsigned int year, unsigned int month, unsigned int day,
272 unsigned int hour, unsigned int minute, unsigned int second,
273 unsigned int ns, bool is_gmt)
274 {
275 int ret = 0;
276 time_t result;
277 struct tm tm = {
278 .tm_sec = second,
279 .tm_min = minute,
280 .tm_hour = hour,
281 .tm_mday = day,
282 .tm_mon = month - 1,
283 .tm_year = year - 1900,
284 .tm_isdst = -1,
285 };
286
287 if (is_gmt) {
288 result = bt_timegm(&tm);
289 } else {
290 result = mktime(&tm);
291 }
292
293 if (result < 0) {
294 ret = -1;
295 goto end;
296 }
297
298 BT_ASSERT(bound);
299 bound->ns_from_origin = (int64_t) result;
300 bound->ns_from_origin *= NS_PER_S;
301 bound->ns_from_origin += ns;
302 bound->is_set = true;
303
304 end:
305 return ret;
306 }
307
308 /*
309 * Parses a timestamp, figuring out its format.
310 *
311 * Returns a negative value if anything goes wrong.
312 *
313 * Expected formats:
314 *
315 * YYYY-MM-DD hh:mm[:ss[.ns]]
316 * [hh:mm:]ss[.ns]
317 * [-]s[.ns]
318 *
319 * TODO: Check overflows.
320 */
321 static
322 int set_bound_from_str(struct trimmer_comp *trimmer_comp,
323 const char *str, struct trimmer_bound *bound, bool is_gmt)
324 {
325 /* Matches YYYY-MM-DD */
326 #define DATE_RE "([0-9]{4})-([0-9]{2})-([0-9]{2})"
327
328 /* Matches HH:MM[:SS[.NS]] */
329 #define TIME_RE "([0-9]{2}):([0-9]{2})(?::([0-9]{2})(?:\\.([0-9]{1,9}))?)?"
330
331 /* Matches [-]SS[.NS] */
332 #define S_NS_RE "^(-?)([0-9]+)(?:\\.([0-9]{1,9}))?$"
333
334 GMatchInfo *match_info;
335 int ret = 0;
336
337 /* Try `YYYY-MM-DD hh:mm[:ss[.ns]]` format */
338 if (compile_and_match("^" DATE_RE " " TIME_RE "$", str, &match_info)) {
339 unsigned int year = 0, month = 0, day = 0, hours = 0, minutes = 0, seconds = 0, nanoseconds = 0;
340 gint match_count = g_match_info_get_match_count(match_info);
341
342 BT_ASSERT(match_count >= 6 && match_count <= 8);
343
344 year = match_to_uint(match_info, 1);
345 month = match_to_uint(match_info, 2);
346 day = match_to_uint(match_info, 3);
347 hours = match_to_uint(match_info, 4);
348 minutes = match_to_uint(match_info, 5);
349
350 if (match_count >= 7) {
351 seconds = match_to_uint(match_info, 6);
352 }
353
354 if (match_count >= 8) {
355 nanoseconds = match_to_uint_ns(match_info, 7);
356 }
357
358 set_bound_ns_from_origin(bound, year, month, day, hours, minutes, seconds, nanoseconds, is_gmt);
359
360 goto end;
361 }
362
363 if (compile_and_match("^" DATE_RE "$", str, &match_info)) {
364 unsigned int year = 0, month = 0, day = 0;
365
366 BT_ASSERT(g_match_info_get_match_count(match_info) == 4);
367
368 year = match_to_uint(match_info, 1);
369 month = match_to_uint(match_info, 2);
370 day = match_to_uint(match_info, 3);
371
372 set_bound_ns_from_origin(bound, year, month, day, 0, 0, 0, 0, is_gmt);
373
374 goto end;
375 }
376
377 /* Try `hh:mm[:ss[.ns]]` format */
378 if (compile_and_match("^" TIME_RE "$", str, &match_info)) {
379 gint match_count = g_match_info_get_match_count(match_info);
380 BT_ASSERT(match_count >= 3 && match_count <= 5);
381 bound->time.hour = match_to_uint(match_info, 1);
382 bound->time.minute = match_to_uint(match_info, 2);
383
384 if (match_count >= 4) {
385 bound->time.second = match_to_uint(match_info, 3);
386 }
387
388 if (match_count >= 5) {
389 bound->time.ns = match_to_uint_ns(match_info, 4);
390 }
391
392 goto end;
393 }
394
395 /* Try `[-]s[.ns]` format */
396 if (compile_and_match("^" S_NS_RE "$", str, &match_info)) {
397 gboolean is_neg, fetch_pos_ret;
398 gint start_pos, end_pos, match_count;
399 guint64 seconds, nanoseconds = 0;
400
401 match_count = g_match_info_get_match_count(match_info);
402 BT_ASSERT(match_count >= 3 && match_count <= 4);
403
404 /* Check for presence of negation sign. */
405 fetch_pos_ret = g_match_info_fetch_pos(match_info, 1, &start_pos, &end_pos);
406 BT_ASSERT(fetch_pos_ret);
407 is_neg = (end_pos - start_pos) > 0;
408
409 seconds = match_to_uint(match_info, 2);
410
411 if (match_count >= 4) {
412 nanoseconds = match_to_uint_ns(match_info, 3);
413 }
414
415 bound->ns_from_origin = seconds * NS_PER_S + nanoseconds;
416
417 if (is_neg) {
418 bound->ns_from_origin = -bound->ns_from_origin;
419 }
420
421 bound->is_set = true;
422
423 goto end;
424 }
425
426 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
427 "Invalid date/time format: param=\"%s\"", str);
428 ret = -1;
429
430 end:
431 g_match_info_free(match_info);
432
433 return ret;
434 }
435
436 /*
437 * Sets a trimmer bound's properties from a parameter string/integer
438 * value.
439 *
440 * Returns a negative value if anything goes wrong.
441 */
442 static
443 int set_bound_from_param(struct trimmer_comp *trimmer_comp,
444 const char *param_name, const bt_value *param,
445 struct trimmer_bound *bound, bool is_gmt)
446 {
447 int ret;
448 const char *arg;
449 char tmp_arg[64];
450
451 if (bt_value_is_signed_integer(param)) {
452 int64_t value = bt_value_integer_signed_get(param);
453
454 /*
455 * Just convert it to a temporary string to handle
456 * everything the same way.
457 */
458 sprintf(tmp_arg, "%" PRId64, value);
459 arg = tmp_arg;
460 } else {
461 BT_ASSERT(bt_value_is_string(param));
462 arg = bt_value_string_get(param);
463 }
464
465 ret = set_bound_from_str(trimmer_comp, arg, bound, is_gmt);
466
467 return ret;
468 }
469
470 static
471 int validate_trimmer_bounds(struct trimmer_comp *trimmer_comp,
472 struct trimmer_bound *begin, struct trimmer_bound *end)
473 {
474 int ret = 0;
475
476 BT_ASSERT(begin->is_set);
477 BT_ASSERT(end->is_set);
478
479 if (!begin->is_infinite && !end->is_infinite &&
480 begin->ns_from_origin > end->ns_from_origin) {
481 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
482 "Trimming time range's beginning time is greater than end time: "
483 "begin-ns-from-origin=%" PRId64 ", "
484 "end-ns-from-origin=%" PRId64,
485 begin->ns_from_origin,
486 end->ns_from_origin);
487 ret = -1;
488 goto end;
489 }
490
491 if (!begin->is_infinite && begin->ns_from_origin == INT64_MIN) {
492 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
493 "Invalid trimming time range's beginning time: "
494 "ns-from-origin=%" PRId64,
495 begin->ns_from_origin);
496 ret = -1;
497 goto end;
498 }
499
500 if (!end->is_infinite && end->ns_from_origin == INT64_MIN) {
501 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
502 "Invalid trimming time range's end time: "
503 "ns-from-origin=%" PRId64,
504 end->ns_from_origin);
505 ret = -1;
506 goto end;
507 }
508
509 end:
510 return ret;
511 }
512
513 static
514 enum bt_param_validation_status validate_bound_type(
515 const bt_value *value,
516 struct bt_param_validation_context *context)
517 {
518 enum bt_param_validation_status status = BT_PARAM_VALIDATION_STATUS_OK;
519
520 if (!bt_value_is_signed_integer(value) &&
521 !bt_value_is_string(value)) {
522 status = bt_param_validation_error(context,
523 "unexpected type: expected-types=[%s, %s], actual-type=%s",
524 bt_common_value_type_string(BT_VALUE_TYPE_SIGNED_INTEGER),
525 bt_common_value_type_string(BT_VALUE_TYPE_STRING),
526 bt_common_value_type_string(bt_value_get_type(value)));
527 }
528
529 return status;
530 }
531
532 static
533 struct bt_param_validation_map_value_entry_descr trimmer_params[] = {
534 { "gmt", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
535 { "begin", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .validation_func = validate_bound_type } },
536 { "end", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .validation_func = validate_bound_type } },
537 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
538 };
539
540 static
541 bt_component_class_initialize_method_status init_trimmer_comp_from_params(
542 struct trimmer_comp *trimmer_comp,
543 const bt_value *params)
544 {
545 const bt_value *value;
546 bt_component_class_initialize_method_status status;
547 enum bt_param_validation_status validation_status;
548 gchar *validate_error = NULL;
549
550 validation_status = bt_param_validation_validate(params,
551 trimmer_params, &validate_error);
552 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
553 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
554 goto end;
555 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
556 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
557 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp, "%s",
558 validate_error);
559 goto end;
560 }
561
562 BT_ASSERT(params);
563 value = bt_value_map_borrow_entry_value_const(params, "gmt");
564 if (value) {
565 trimmer_comp->is_gmt = (bool) bt_value_bool_get(value);
566 }
567
568 value = bt_value_map_borrow_entry_value_const(params, "begin");
569 if (value) {
570 if (set_bound_from_param(trimmer_comp, "begin", value,
571 &trimmer_comp->begin, trimmer_comp->is_gmt)) {
572 /* set_bound_from_param() logs errors */
573 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
574 goto end;
575 }
576 } else {
577 trimmer_comp->begin.is_infinite = true;
578 trimmer_comp->begin.is_set = true;
579 }
580
581 value = bt_value_map_borrow_entry_value_const(params, "end");
582 if (value) {
583 if (set_bound_from_param(trimmer_comp, "end", value,
584 &trimmer_comp->end, trimmer_comp->is_gmt)) {
585 /* set_bound_from_param() logs errors */
586 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
587 goto end;
588 }
589 } else {
590 trimmer_comp->end.is_infinite = true;
591 trimmer_comp->end.is_set = true;
592 }
593
594 if (trimmer_comp->begin.is_set && trimmer_comp->end.is_set) {
595 /* validate_trimmer_bounds() logs errors */
596 if (validate_trimmer_bounds(trimmer_comp,
597 &trimmer_comp->begin, &trimmer_comp->end)) {
598 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
599 goto end;
600 }
601 }
602
603 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
604
605 end:
606 g_free(validate_error);
607
608 return status;
609 }
610
611 bt_component_class_initialize_method_status trimmer_init(
612 bt_self_component_filter *self_comp_flt,
613 bt_self_component_filter_configuration *config,
614 const bt_value *params, void *init_data)
615 {
616 bt_component_class_initialize_method_status status;
617 bt_self_component_add_port_status add_port_status;
618 struct trimmer_comp *trimmer_comp = create_trimmer_comp();
619 bt_self_component *self_comp =
620 bt_self_component_filter_as_self_component(self_comp_flt);
621
622 if (!trimmer_comp) {
623 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
624 goto error;
625 }
626
627 trimmer_comp->log_level = bt_component_get_logging_level(
628 bt_self_component_as_component(self_comp));
629 trimmer_comp->self_comp = self_comp;
630
631 add_port_status = bt_self_component_filter_add_input_port(
632 self_comp_flt, in_port_name, NULL, NULL);
633 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
634 status = (int) add_port_status;
635 goto error;
636 }
637
638 add_port_status = bt_self_component_filter_add_output_port(
639 self_comp_flt, "out", NULL, NULL);
640 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
641 status = (int) add_port_status;
642 goto error;
643 }
644
645 status = init_trimmer_comp_from_params(trimmer_comp, params);
646 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
647 goto error;
648 }
649
650 bt_self_component_set_data(self_comp, trimmer_comp);
651
652 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
653 goto end;
654
655 error:
656 if (trimmer_comp) {
657 destroy_trimmer_comp(trimmer_comp);
658 }
659
660 end:
661 return status;
662 }
663
664 static
665 void destroy_trimmer_iterator(struct trimmer_iterator *trimmer_it)
666 {
667 if (!trimmer_it) {
668 goto end;
669 }
670
671 bt_self_component_port_input_message_iterator_put_ref(
672 trimmer_it->upstream_iter);
673
674 if (trimmer_it->output_messages) {
675 g_queue_free(trimmer_it->output_messages);
676 }
677
678 if (trimmer_it->stream_states) {
679 g_hash_table_destroy(trimmer_it->stream_states);
680 }
681
682 g_free(trimmer_it);
683 end:
684 return;
685 }
686
687 static
688 void destroy_trimmer_iterator_stream_state(
689 struct trimmer_iterator_stream_state *sstate)
690 {
691 BT_ASSERT(sstate);
692 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
693 g_free(sstate);
694 }
695
696 BT_HIDDEN
697 bt_component_class_message_iterator_initialize_method_status trimmer_msg_iter_init(
698 bt_self_message_iterator *self_msg_iter,
699 bt_self_message_iterator_configuration *config,
700 bt_self_component_filter *self_comp,
701 bt_self_component_port_output *port)
702 {
703 bt_component_class_message_iterator_initialize_method_status status;
704 bt_self_component_port_input_message_iterator_create_from_message_iterator_status
705 msg_iter_status;
706 struct trimmer_iterator *trimmer_it;
707
708 trimmer_it = g_new0(struct trimmer_iterator, 1);
709 if (!trimmer_it) {
710 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
711 goto error;
712 }
713
714 trimmer_it->trimmer_comp = bt_self_component_get_data(
715 bt_self_component_filter_as_self_component(self_comp));
716 BT_ASSERT(trimmer_it->trimmer_comp);
717
718 if (trimmer_it->trimmer_comp->begin.is_set &&
719 trimmer_it->trimmer_comp->end.is_set) {
720 /*
721 * Both trimming time range's bounds are set, so skip
722 * the
723 * `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
724 * phase.
725 */
726 trimmer_it->state = TRIMMER_ITERATOR_STATE_SEEK_INITIALLY;
727 }
728
729 trimmer_it->begin = trimmer_it->trimmer_comp->begin;
730 trimmer_it->end = trimmer_it->trimmer_comp->end;
731 msg_iter_status =
732 bt_self_component_port_input_message_iterator_create_from_message_iterator(
733 self_msg_iter,
734 bt_self_component_filter_borrow_input_port_by_name(
735 self_comp, in_port_name), &trimmer_it->upstream_iter);
736 if (msg_iter_status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) {
737 status = (int) msg_iter_status;
738 goto error;
739 }
740
741 trimmer_it->output_messages = g_queue_new();
742 if (!trimmer_it->output_messages) {
743 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
744 goto error;
745 }
746
747 trimmer_it->stream_states = g_hash_table_new_full(g_direct_hash,
748 g_direct_equal, NULL,
749 (GDestroyNotify) destroy_trimmer_iterator_stream_state);
750 if (!trimmer_it->stream_states) {
751 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
752 goto error;
753 }
754
755 /*
756 * The trimmer requires upstream messages to have times, so it can
757 * always seek forward.
758 */
759 bt_self_message_iterator_configuration_set_can_seek_forward(
760 config, BT_TRUE);
761
762 trimmer_it->self_msg_iter = self_msg_iter;
763 bt_self_message_iterator_set_data(self_msg_iter, trimmer_it);
764
765 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK;
766 goto end;
767
768 error:
769 destroy_trimmer_iterator(trimmer_it);
770
771 end:
772 return status;
773 }
774
775 static inline
776 int get_msg_ns_from_origin(const bt_message *msg, int64_t *ns_from_origin,
777 bool *has_clock_snapshot)
778 {
779 const bt_clock_class *clock_class = NULL;
780 const bt_clock_snapshot *clock_snapshot = NULL;
781 int ret = 0;
782
783 BT_ASSERT_DBG(msg);
784 BT_ASSERT_DBG(ns_from_origin);
785 BT_ASSERT_DBG(has_clock_snapshot);
786
787 switch (bt_message_get_type(msg)) {
788 case BT_MESSAGE_TYPE_EVENT:
789 clock_class =
790 bt_message_event_borrow_stream_class_default_clock_class_const(
791 msg);
792 if (G_UNLIKELY(!clock_class)) {
793 goto error;
794 }
795
796 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
797 msg);
798 break;
799 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
800 clock_class =
801 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
802 msg);
803 if (G_UNLIKELY(!clock_class)) {
804 goto error;
805 }
806
807 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
808 msg);
809 break;
810 case BT_MESSAGE_TYPE_PACKET_END:
811 clock_class =
812 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
813 msg);
814 if (G_UNLIKELY(!clock_class)) {
815 goto error;
816 }
817
818 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
819 msg);
820 break;
821 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
822 {
823 enum bt_message_stream_clock_snapshot_state cs_state;
824
825 clock_class =
826 bt_message_stream_beginning_borrow_stream_class_default_clock_class_const(msg);
827 if (G_UNLIKELY(!clock_class)) {
828 goto error;
829 }
830
831 cs_state = bt_message_stream_beginning_borrow_default_clock_snapshot_const(msg, &clock_snapshot);
832 if (cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
833 goto no_clock_snapshot;
834 }
835
836 break;
837 }
838 case BT_MESSAGE_TYPE_STREAM_END:
839 {
840 enum bt_message_stream_clock_snapshot_state cs_state;
841
842 clock_class =
843 bt_message_stream_end_borrow_stream_class_default_clock_class_const(msg);
844 if (G_UNLIKELY(!clock_class)) {
845 goto error;
846 }
847
848 cs_state = bt_message_stream_end_borrow_default_clock_snapshot_const(msg, &clock_snapshot);
849 if (cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
850 goto no_clock_snapshot;
851 }
852
853 break;
854 }
855 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
856 clock_class =
857 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
858 msg);
859 if (G_UNLIKELY(!clock_class)) {
860 goto error;
861 }
862
863 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
864 msg);
865 break;
866 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
867 clock_class =
868 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
869 msg);
870 if (G_UNLIKELY(!clock_class)) {
871 goto error;
872 }
873
874 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
875 msg);
876 break;
877 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
878 clock_snapshot =
879 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
880 msg);
881 break;
882 default:
883 goto no_clock_snapshot;
884 }
885
886 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot,
887 ns_from_origin);
888 if (G_UNLIKELY(ret)) {
889 goto error;
890 }
891
892 *has_clock_snapshot = true;
893 goto end;
894
895 no_clock_snapshot:
896 *has_clock_snapshot = false;
897 goto end;
898
899 error:
900 ret = -1;
901
902 end:
903 return ret;
904 }
905
906 static inline
907 void put_messages(bt_message_array_const msgs, uint64_t count)
908 {
909 uint64_t i;
910
911 for (i = 0; i < count; i++) {
912 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
913 }
914 }
915
916 static inline
917 int set_trimmer_iterator_bound(struct trimmer_iterator *trimmer_it,
918 struct trimmer_bound *bound, int64_t ns_from_origin,
919 bool is_gmt)
920 {
921 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
922 struct tm tm;
923 struct tm *res;
924 time_t time_seconds = (time_t) (ns_from_origin / NS_PER_S);
925 int ret = 0;
926
927 BT_ASSERT(!bound->is_set);
928 errno = 0;
929
930 /* We only need to extract the date from this time */
931 if (is_gmt) {
932 res = bt_gmtime_r(&time_seconds, &tm);
933 } else {
934 res = bt_localtime_r(&time_seconds, &tm);
935 }
936
937 if (!res) {
938 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(trimmer_comp->self_comp,
939 "Cannot convert timestamp to date and time",
940 ": ts=%" PRId64, (int64_t) time_seconds);
941 ret = -1;
942 goto end;
943 }
944
945 ret = set_bound_ns_from_origin(bound, tm.tm_year + 1900, tm.tm_mon + 1,
946 tm.tm_mday, bound->time.hour, bound->time.minute,
947 bound->time.second, bound->time.ns, is_gmt);
948
949 end:
950 return ret;
951 }
952
953 static
954 bt_component_class_message_iterator_next_method_status
955 state_set_trimmer_iterator_bounds(
956 struct trimmer_iterator *trimmer_it)
957 {
958 bt_message_iterator_next_status upstream_iter_status =
959 BT_MESSAGE_ITERATOR_NEXT_STATUS_OK;
960 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
961 bt_message_array_const msgs;
962 uint64_t count = 0;
963 int64_t ns_from_origin = INT64_MIN;
964 uint64_t i;
965 int ret;
966
967 BT_ASSERT(!trimmer_it->begin.is_set ||
968 !trimmer_it->end.is_set);
969
970 while (true) {
971 upstream_iter_status =
972 bt_self_component_port_input_message_iterator_next(
973 trimmer_it->upstream_iter, &msgs, &count);
974 if (upstream_iter_status != BT_MESSAGE_ITERATOR_NEXT_STATUS_OK) {
975 goto end;
976 }
977
978 for (i = 0; i < count; i++) {
979 const bt_message *msg = msgs[i];
980 bool has_ns_from_origin;
981 ret = get_msg_ns_from_origin(msg, &ns_from_origin,
982 &has_ns_from_origin);
983 if (ret) {
984 goto error;
985 }
986
987 if (!has_ns_from_origin) {
988 continue;
989 }
990
991 BT_ASSERT_DBG(ns_from_origin != INT64_MIN &&
992 ns_from_origin != INT64_MAX);
993 put_messages(msgs, count);
994 goto found;
995 }
996
997 put_messages(msgs, count);
998 }
999
1000 found:
1001 if (!trimmer_it->begin.is_set) {
1002 BT_ASSERT(!trimmer_it->begin.is_infinite);
1003 ret = set_trimmer_iterator_bound(trimmer_it, &trimmer_it->begin,
1004 ns_from_origin, trimmer_comp->is_gmt);
1005 if (ret) {
1006 goto error;
1007 }
1008 }
1009
1010 if (!trimmer_it->end.is_set) {
1011 BT_ASSERT(!trimmer_it->end.is_infinite);
1012 ret = set_trimmer_iterator_bound(trimmer_it, &trimmer_it->end,
1013 ns_from_origin, trimmer_comp->is_gmt);
1014 if (ret) {
1015 goto error;
1016 }
1017 }
1018
1019 ret = validate_trimmer_bounds(trimmer_it->trimmer_comp,
1020 &trimmer_it->begin, &trimmer_it->end);
1021 if (ret) {
1022 goto error;
1023 }
1024
1025 goto end;
1026
1027 error:
1028 put_messages(msgs, count);
1029 upstream_iter_status = BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR;
1030
1031 end:
1032 return (int) upstream_iter_status;
1033 }
1034
1035 static
1036 bt_component_class_message_iterator_next_method_status state_seek_initially(
1037 struct trimmer_iterator *trimmer_it)
1038 {
1039 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
1040 bt_component_class_message_iterator_next_method_status status;
1041
1042 BT_ASSERT(trimmer_it->begin.is_set);
1043
1044 if (trimmer_it->begin.is_infinite) {
1045 bt_bool can_seek;
1046
1047 status = (int) bt_self_component_port_input_message_iterator_can_seek_beginning(
1048 trimmer_it->upstream_iter, &can_seek);
1049 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1050 if (status < 0) {
1051 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1052 "Cannot make upstream message iterator initially seek its beginning.");
1053 }
1054
1055 goto end;
1056 }
1057
1058 if (!can_seek) {
1059 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1060 "Cannot make upstream message iterator initially seek its beginning.");
1061 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1062 goto end;
1063 }
1064
1065 status = (int) bt_self_component_port_input_message_iterator_seek_beginning(
1066 trimmer_it->upstream_iter);
1067 } else {
1068 bt_bool can_seek;
1069
1070 status = (int) bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
1071 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin,
1072 &can_seek);
1073
1074 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1075 if (status < 0) {
1076 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1077 "Cannot make upstream message iterator initially seek: seek-ns-from-origin=%" PRId64,
1078 trimmer_it->begin.ns_from_origin);
1079 }
1080
1081 goto end;
1082 }
1083
1084 if (!can_seek) {
1085 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1086 "Cannot make upstream message iterator initially seek: seek-ns-from-origin=%" PRId64,
1087 trimmer_it->begin.ns_from_origin);
1088 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1089 goto end;
1090 }
1091
1092 status = (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
1093 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin);
1094 }
1095
1096 if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1097 trimmer_it->state = TRIMMER_ITERATOR_STATE_TRIM;
1098 }
1099
1100 end:
1101 return status;
1102 }
1103
1104 static inline
1105 void push_message(struct trimmer_iterator *trimmer_it, const bt_message *msg)
1106 {
1107 g_queue_push_head(trimmer_it->output_messages, (void *) msg);
1108 }
1109
1110 static inline
1111 const bt_message *pop_message(struct trimmer_iterator *trimmer_it)
1112 {
1113 return g_queue_pop_tail(trimmer_it->output_messages);
1114 }
1115
1116 static inline
1117 int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
1118 int64_t ns_from_origin, uint64_t *raw_value)
1119 {
1120
1121 int64_t cc_offset_s;
1122 uint64_t cc_offset_cycles;
1123 uint64_t cc_freq;
1124
1125 bt_clock_class_get_offset(clock_class, &cc_offset_s, &cc_offset_cycles);
1126 cc_freq = bt_clock_class_get_frequency(clock_class);
1127 return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
1128 cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
1129 }
1130
1131 static inline
1132 bt_component_class_message_iterator_next_method_status
1133 end_stream(struct trimmer_iterator *trimmer_it,
1134 struct trimmer_iterator_stream_state *sstate)
1135 {
1136 bt_component_class_message_iterator_next_method_status status =
1137 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1138 /* Initialize to silence maybe-uninitialized warning. */
1139 uint64_t raw_value = 0;
1140 bt_message *msg = NULL;
1141
1142 BT_ASSERT(!trimmer_it->end.is_infinite);
1143 BT_ASSERT(sstate->stream);
1144
1145 /*
1146 * If we haven't seen a message with a clock snapshot, we don't know if the trimmer's end bound is within
1147 * the clock's range, so it wouldn't be safe to try to convert ns_from_origin to a clock value.
1148 *
1149 * Also, it would be a bit of a lie to generate a stream end message with the end bound as its
1150 * clock snapshot, because we don't really know if the stream existed at that time. If we have
1151 * seen a message with a clock snapshot and the stream is cut short by another message with a
1152 * clock snapshot, then we are sure that the the end bound time is not below the clock range,
1153 * and we know the stream was active at that time (and that we cut it short).
1154 */
1155 if (sstate->seen_clock_snapshot) {
1156 const bt_clock_class *clock_class;
1157 int ret;
1158
1159 clock_class = bt_stream_class_borrow_default_clock_class_const(
1160 bt_stream_borrow_class_const(sstate->stream));
1161 BT_ASSERT(clock_class);
1162 ret = clock_raw_value_from_ns_from_origin(clock_class,
1163 trimmer_it->end.ns_from_origin, &raw_value);
1164 if (ret) {
1165 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1166 goto end;
1167 }
1168 }
1169
1170 if (sstate->cur_packet) {
1171 /*
1172 * Create and push a packet end message, making its time
1173 * the trimming range's end time.
1174 *
1175 * We know that we must have seen a clock snapshot, the one in
1176 * the packet beginning message, since trimmer currently
1177 * requires packet messages to have clock snapshots (see comment
1178 * in create_stream_state_entry).
1179 */
1180 BT_ASSERT(sstate->seen_clock_snapshot);
1181
1182 msg = bt_message_packet_end_create_with_default_clock_snapshot(
1183 trimmer_it->self_msg_iter, sstate->cur_packet,
1184 raw_value);
1185 if (!msg) {
1186 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
1187 goto end;
1188 }
1189
1190 push_message(trimmer_it, msg);
1191 msg = NULL;
1192 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1193 }
1194
1195 /* Create and push a stream end message. */
1196 msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
1197 sstate->stream);
1198 if (!msg) {
1199 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
1200 goto end;
1201 }
1202
1203 if (sstate->seen_clock_snapshot) {
1204 bt_message_stream_end_set_default_clock_snapshot(msg, raw_value);
1205 }
1206
1207 push_message(trimmer_it, msg);
1208 msg = NULL;
1209
1210 /*
1211 * Just to make sure that we don't use this stream state again
1212 * in the future without an obvious error.
1213 */
1214 sstate->stream = NULL;
1215
1216 end:
1217 bt_message_put_ref(msg);
1218 return status;
1219 }
1220
1221 static inline
1222 bt_component_class_message_iterator_next_method_status end_iterator_streams(
1223 struct trimmer_iterator *trimmer_it)
1224 {
1225 bt_component_class_message_iterator_next_method_status status =
1226 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1227 GHashTableIter iter;
1228 gpointer key, sstate;
1229
1230 if (trimmer_it->end.is_infinite) {
1231 /*
1232 * An infinite trimming range's end time guarantees that
1233 * we received (and pushed) all the appropriate end
1234 * messages.
1235 */
1236 goto remove_all;
1237 }
1238
1239 /*
1240 * End each stream and then remove them from the hash table of
1241 * stream states to release unneeded references.
1242 */
1243 g_hash_table_iter_init(&iter, trimmer_it->stream_states);
1244
1245 while (g_hash_table_iter_next(&iter, &key, &sstate)) {
1246 status = end_stream(trimmer_it, sstate);
1247 if (status) {
1248 goto end;
1249 }
1250 }
1251
1252 remove_all:
1253 g_hash_table_remove_all(trimmer_it->stream_states);
1254
1255 end:
1256 return status;
1257 }
1258
1259 static
1260 bt_component_class_message_iterator_next_method_status
1261 create_stream_state_entry(
1262 struct trimmer_iterator *trimmer_it,
1263 const struct bt_stream *stream,
1264 struct trimmer_iterator_stream_state **stream_state)
1265 {
1266 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
1267 bt_component_class_message_iterator_next_method_status status;
1268 struct trimmer_iterator_stream_state *sstate;
1269 const bt_stream_class *sc;
1270
1271 BT_ASSERT(!bt_g_hash_table_contains(trimmer_it->stream_states, stream));
1272
1273 /*
1274 * Validate right now that the stream's class
1275 * has a registered default clock class so that
1276 * an existing stream state guarantees existing
1277 * default clock snapshots for its associated
1278 * messages.
1279 *
1280 * Also check that clock snapshots are always
1281 * known.
1282 */
1283 sc = bt_stream_borrow_class_const(stream);
1284 if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
1285 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1286 "Unsupported stream: stream class does "
1287 "not have a default clock class: "
1288 "stream-addr=%p, "
1289 "stream-id=%" PRIu64 ", "
1290 "stream-name=\"%s\"",
1291 stream, bt_stream_get_id(stream),
1292 bt_stream_get_name(stream));
1293 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1294 goto end;
1295 }
1296
1297 /*
1298 * Temporary: make sure packet beginning, packet
1299 * end, discarded events, and discarded packets
1300 * messages have default clock snapshots until
1301 * the support for not having them is
1302 * implemented.
1303 */
1304 if (!bt_stream_class_packets_have_beginning_default_clock_snapshot(
1305 sc)) {
1306 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1307 "Unsupported stream: packets have no beginning clock snapshot: "
1308 "stream-addr=%p, "
1309 "stream-id=%" PRIu64 ", "
1310 "stream-name=\"%s\"",
1311 stream, bt_stream_get_id(stream),
1312 bt_stream_get_name(stream));
1313 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1314 goto end;
1315 }
1316
1317 if (!bt_stream_class_packets_have_end_default_clock_snapshot(
1318 sc)) {
1319 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1320 "Unsupported stream: packets have no end clock snapshot: "
1321 "stream-addr=%p, "
1322 "stream-id=%" PRIu64 ", "
1323 "stream-name=\"%s\"",
1324 stream, bt_stream_get_id(stream),
1325 bt_stream_get_name(stream));
1326 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1327 goto end;
1328 }
1329
1330 if (bt_stream_class_supports_discarded_events(sc) &&
1331 !bt_stream_class_discarded_events_have_default_clock_snapshots(sc)) {
1332 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1333 "Unsupported stream: discarded events have no clock snapshots: "
1334 "stream-addr=%p, "
1335 "stream-id=%" PRIu64 ", "
1336 "stream-name=\"%s\"",
1337 stream, bt_stream_get_id(stream),
1338 bt_stream_get_name(stream));
1339 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1340 goto end;
1341 }
1342
1343 if (bt_stream_class_supports_discarded_packets(sc) &&
1344 !bt_stream_class_discarded_packets_have_default_clock_snapshots(sc)) {
1345 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1346 "Unsupported stream: discarded packets "
1347 "have no clock snapshots: "
1348 "stream-addr=%p, "
1349 "stream-id=%" PRIu64 ", "
1350 "stream-name=\"%s\"",
1351 stream, bt_stream_get_id(stream),
1352 bt_stream_get_name(stream));
1353 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1354 goto end;
1355 }
1356
1357 sstate = g_new0(struct trimmer_iterator_stream_state, 1);
1358 if (!sstate) {
1359 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
1360 goto end;
1361 }
1362
1363 sstate->stream = stream;
1364
1365 g_hash_table_insert(trimmer_it->stream_states, (void *) stream, sstate);
1366
1367 *stream_state = sstate;
1368
1369 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1370
1371 end:
1372 return status;
1373 }
1374
1375 static
1376 struct trimmer_iterator_stream_state *get_stream_state_entry(
1377 struct trimmer_iterator *trimmer_it,
1378 const struct bt_stream *stream)
1379 {
1380 struct trimmer_iterator_stream_state *sstate;
1381
1382 BT_ASSERT_DBG(stream);
1383 sstate = g_hash_table_lookup(trimmer_it->stream_states, stream);
1384 BT_ASSERT_DBG(sstate);
1385
1386 return sstate;
1387 }
1388
1389 /*
1390 * Handles a message which is associated to a given stream state. This
1391 * _could_ make the iterator's output message queue grow; this could
1392 * also consume the message without pushing anything to this queue, only
1393 * modifying the stream state.
1394 *
1395 * This function consumes the `msg` reference, _whatever the outcome_.
1396 *
1397 * If non-NULL, `ns_from_origin` is the message's time, as given by
1398 * get_msg_ns_from_origin(). If NULL, the message doesn't have a time.
1399 *
1400 * This function sets `reached_end` if handling this message made the
1401 * iterator reach the end of the trimming range. Note that the output
1402 * message queue could contain messages even if this function sets
1403 * `reached_end`.
1404 */
1405 static
1406 bt_component_class_message_iterator_next_method_status
1407 handle_message_with_stream(
1408 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1409 const struct bt_stream *stream, const int64_t *ns_from_origin,
1410 bool *reached_end)
1411 {
1412 bt_component_class_message_iterator_next_method_status status =
1413 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1414 bt_message_type msg_type = bt_message_get_type(msg);
1415 int ret;
1416 struct trimmer_iterator_stream_state *sstate = NULL;
1417
1418 /*
1419 * Retrieve the stream's state - except if the message is stream
1420 * beginning, in which case we don't know about about this stream yet.
1421 */
1422 if (msg_type != BT_MESSAGE_TYPE_STREAM_BEGINNING) {
1423 sstate = get_stream_state_entry(trimmer_it, stream);
1424 }
1425
1426 switch (msg_type) {
1427 case BT_MESSAGE_TYPE_EVENT:
1428 /*
1429 * Event messages always have a clock snapshot if the stream
1430 * class has a clock class. And we know it has, otherwise we
1431 * couldn't be using the trimmer component.
1432 */
1433 BT_ASSERT_DBG(ns_from_origin);
1434
1435 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1436 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1437 status = end_iterator_streams(trimmer_it);
1438 *reached_end = true;
1439 break;
1440 }
1441
1442 sstate->seen_clock_snapshot = true;
1443
1444 push_message(trimmer_it, msg);
1445 msg = NULL;
1446 break;
1447
1448 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1449 /*
1450 * Packet beginning messages won't have a clock snapshot if
1451 * stream_class->packets_have_beginning_default_clock_snapshot
1452 * is false. But for now, assume they always do.
1453 */
1454 BT_ASSERT(ns_from_origin);
1455 BT_ASSERT(!sstate->cur_packet);
1456
1457 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1458 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1459 status = end_iterator_streams(trimmer_it);
1460 *reached_end = true;
1461 break;
1462 }
1463
1464 sstate->cur_packet =
1465 bt_message_packet_beginning_borrow_packet_const(msg);
1466 bt_packet_get_ref(sstate->cur_packet);
1467
1468 sstate->seen_clock_snapshot = true;
1469
1470 push_message(trimmer_it, msg);
1471 msg = NULL;
1472 break;
1473
1474 case BT_MESSAGE_TYPE_PACKET_END:
1475 /*
1476 * Packet end messages won't have a clock snapshot if
1477 * stream_class->packets_have_end_default_clock_snapshot
1478 * is false. But for now, assume they always do.
1479 */
1480 BT_ASSERT(ns_from_origin);
1481 BT_ASSERT(sstate->cur_packet);
1482
1483 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1484 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1485 status = end_iterator_streams(trimmer_it);
1486 *reached_end = true;
1487 break;
1488 }
1489
1490 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1491
1492 sstate->seen_clock_snapshot = true;
1493
1494 push_message(trimmer_it, msg);
1495 msg = NULL;
1496 break;
1497
1498 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1499 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1500 {
1501 /*
1502 * `ns_from_origin` is the message's time range's
1503 * beginning time here.
1504 */
1505 int64_t end_ns_from_origin;
1506 const bt_clock_snapshot *end_cs;
1507
1508 BT_ASSERT(ns_from_origin);
1509
1510 sstate->seen_clock_snapshot = true;
1511
1512 if (bt_message_get_type(msg) ==
1513 BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1514 /*
1515 * Safe to ignore the return value because we
1516 * know there's a default clock and it's always
1517 * known.
1518 */
1519 end_cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
1520 msg);
1521 } else {
1522 /*
1523 * Safe to ignore the return value because we
1524 * know there's a default clock and it's always
1525 * known.
1526 */
1527 end_cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
1528 msg);
1529 }
1530
1531 if (bt_clock_snapshot_get_ns_from_origin(end_cs,
1532 &end_ns_from_origin)) {
1533 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1534 goto end;
1535 }
1536
1537 if (!trimmer_it->end.is_infinite &&
1538 *ns_from_origin > trimmer_it->end.ns_from_origin) {
1539 status = end_iterator_streams(trimmer_it);
1540 *reached_end = true;
1541 break;
1542 }
1543
1544 if (!trimmer_it->end.is_infinite &&
1545 end_ns_from_origin > trimmer_it->end.ns_from_origin) {
1546 /*
1547 * This message's end time is outside the
1548 * trimming time range: replace it with a new
1549 * message having an end time equal to the
1550 * trimming time range's end and without a
1551 * count.
1552 */
1553 const bt_clock_class *clock_class =
1554 bt_clock_snapshot_borrow_clock_class_const(
1555 end_cs);
1556 const bt_clock_snapshot *begin_cs;
1557 bt_message *new_msg;
1558 uint64_t end_raw_value;
1559
1560 ret = clock_raw_value_from_ns_from_origin(clock_class,
1561 trimmer_it->end.ns_from_origin, &end_raw_value);
1562 if (ret) {
1563 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1564 goto end;
1565 }
1566
1567 if (msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1568 begin_cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
1569 msg);
1570 new_msg = bt_message_discarded_events_create_with_default_clock_snapshots(
1571 trimmer_it->self_msg_iter,
1572 sstate->stream,
1573 bt_clock_snapshot_get_value(begin_cs),
1574 end_raw_value);
1575 } else {
1576 begin_cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
1577 msg);
1578 new_msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
1579 trimmer_it->self_msg_iter,
1580 sstate->stream,
1581 bt_clock_snapshot_get_value(begin_cs),
1582 end_raw_value);
1583 }
1584
1585 if (!new_msg) {
1586 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
1587 goto end;
1588 }
1589
1590 /* Replace the original message */
1591 BT_MESSAGE_MOVE_REF(msg, new_msg);
1592 }
1593
1594 push_message(trimmer_it, msg);
1595 msg = NULL;
1596 break;
1597 }
1598
1599 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1600 /*
1601 * If this message has a time and this time is greater than the
1602 * trimmer's end bound, it triggers the end of the trim window.
1603 */
1604 if (G_UNLIKELY(ns_from_origin && !trimmer_it->end.is_infinite &&
1605 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1606 status = end_iterator_streams(trimmer_it);
1607 *reached_end = true;
1608 break;
1609 }
1610
1611 /* Learn about this stream. */
1612 status = create_stream_state_entry(trimmer_it, stream, &sstate);
1613 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1614 goto end;
1615 }
1616
1617 if (ns_from_origin) {
1618 sstate->seen_clock_snapshot = true;
1619 }
1620
1621 push_message(trimmer_it, msg);
1622 msg = NULL;
1623 break;
1624 case BT_MESSAGE_TYPE_STREAM_END:
1625 {
1626 gboolean removed;
1627
1628 /*
1629 * If this message has a time and this time is greater than the
1630 * trimmer's end bound, it triggers the end of the trim window.
1631 */
1632 if (G_UNLIKELY(ns_from_origin && !trimmer_it->end.is_infinite &&
1633 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1634 status = end_iterator_streams(trimmer_it);
1635 *reached_end = true;
1636 break;
1637 }
1638
1639 /*
1640 * Either the stream end message's time is within the trimmer's
1641 * bounds, or it doesn't have a time. In both cases, pass
1642 * the message unmodified.
1643 */
1644 push_message(trimmer_it, msg);
1645 msg = NULL;
1646
1647 /* Forget about this stream. */
1648 removed = g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
1649 BT_ASSERT(removed);
1650 break;
1651 }
1652 default:
1653 break;
1654 }
1655
1656 end:
1657 /* We release the message's reference whatever the outcome */
1658 bt_message_put_ref(msg);
1659 return status;
1660 }
1661
1662 /*
1663 * Handles an input message. This _could_ make the iterator's output
1664 * message queue grow; this could also consume the message without
1665 * pushing anything to this queue, only modifying the stream state.
1666 *
1667 * This function consumes the `msg` reference, _whatever the outcome_.
1668 *
1669 * This function sets `reached_end` if handling this message made the
1670 * iterator reach the end of the trimming range. Note that the output
1671 * message queue could contain messages even if this function sets
1672 * `reached_end`.
1673 */
1674 static inline
1675 bt_component_class_message_iterator_next_method_status handle_message(
1676 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1677 bool *reached_end)
1678 {
1679 bt_component_class_message_iterator_next_method_status status;
1680 const bt_stream *stream = NULL;
1681 int64_t ns_from_origin = INT64_MIN;
1682 bool has_ns_from_origin = false;
1683 int ret;
1684
1685 /* Find message's associated stream */
1686 switch (bt_message_get_type(msg)) {
1687 case BT_MESSAGE_TYPE_EVENT:
1688 stream = bt_event_borrow_stream_const(
1689 bt_message_event_borrow_event_const(msg));
1690 break;
1691 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1692 stream = bt_packet_borrow_stream_const(
1693 bt_message_packet_beginning_borrow_packet_const(msg));
1694 break;
1695 case BT_MESSAGE_TYPE_PACKET_END:
1696 stream = bt_packet_borrow_stream_const(
1697 bt_message_packet_end_borrow_packet_const(msg));
1698 break;
1699 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1700 stream = bt_message_discarded_events_borrow_stream_const(msg);
1701 break;
1702 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1703 stream = bt_message_discarded_packets_borrow_stream_const(msg);
1704 break;
1705 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1706 stream = bt_message_stream_beginning_borrow_stream_const(msg);
1707 break;
1708 case BT_MESSAGE_TYPE_STREAM_END:
1709 stream = bt_message_stream_end_borrow_stream_const(msg);
1710 break;
1711 default:
1712 break;
1713 }
1714
1715 /* Retrieve the message's time */
1716 ret = get_msg_ns_from_origin(msg, &ns_from_origin, &has_ns_from_origin);
1717 if (G_UNLIKELY(ret)) {
1718 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1719 goto end;
1720 }
1721
1722 if (G_LIKELY(stream)) {
1723 /* Message associated to a stream */
1724 status = handle_message_with_stream(trimmer_it, msg,
1725 stream, has_ns_from_origin ? &ns_from_origin : NULL, reached_end);
1726
1727 /*
1728 * handle_message_with_stream_state() unconditionally
1729 * consumes `msg`.
1730 */
1731 msg = NULL;
1732 } else {
1733 /*
1734 * Message not associated to a stream (message iterator
1735 * inactivity).
1736 */
1737 if (G_UNLIKELY(ns_from_origin > trimmer_it->end.ns_from_origin)) {
1738 BT_MESSAGE_PUT_REF_AND_RESET(msg);
1739 status = end_iterator_streams(trimmer_it);
1740 *reached_end = true;
1741 } else {
1742 push_message(trimmer_it, msg);
1743 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1744 msg = NULL;
1745 }
1746 }
1747
1748 end:
1749 /* We release the message's reference whatever the outcome */
1750 bt_message_put_ref(msg);
1751 return status;
1752 }
1753
1754 static inline
1755 void fill_message_array_from_output_messages(
1756 struct trimmer_iterator *trimmer_it,
1757 bt_message_array_const msgs, uint64_t capacity, uint64_t *count)
1758 {
1759 *count = 0;
1760
1761 /*
1762 * Move auto-seek messages to the output array (which is this
1763 * iterator's base message array).
1764 */
1765 while (capacity > 0 && !g_queue_is_empty(trimmer_it->output_messages)) {
1766 msgs[*count] = pop_message(trimmer_it);
1767 capacity--;
1768 (*count)++;
1769 }
1770
1771 BT_ASSERT_DBG(*count > 0);
1772 }
1773
1774 static inline
1775 bt_component_class_message_iterator_next_method_status state_ending(
1776 struct trimmer_iterator *trimmer_it,
1777 bt_message_array_const msgs, uint64_t capacity,
1778 uint64_t *count)
1779 {
1780 bt_component_class_message_iterator_next_method_status status =
1781 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1782
1783 if (g_queue_is_empty(trimmer_it->output_messages)) {
1784 trimmer_it->state = TRIMMER_ITERATOR_STATE_ENDED;
1785 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
1786 goto end;
1787 }
1788
1789 fill_message_array_from_output_messages(trimmer_it, msgs,
1790 capacity, count);
1791
1792 end:
1793 return status;
1794 }
1795
1796 static inline
1797 bt_component_class_message_iterator_next_method_status
1798 state_trim(struct trimmer_iterator *trimmer_it,
1799 bt_message_array_const msgs, uint64_t capacity,
1800 uint64_t *count)
1801 {
1802 bt_component_class_message_iterator_next_method_status status =
1803 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1804 bt_message_array_const my_msgs;
1805 uint64_t my_count;
1806 uint64_t i;
1807 bool reached_end = false;
1808
1809 while (g_queue_is_empty(trimmer_it->output_messages)) {
1810 status = (int) bt_self_component_port_input_message_iterator_next(
1811 trimmer_it->upstream_iter, &my_msgs, &my_count);
1812 if (G_UNLIKELY(status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK)) {
1813 if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END) {
1814 status = end_iterator_streams(trimmer_it);
1815 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1816 goto end;
1817 }
1818
1819 trimmer_it->state =
1820 TRIMMER_ITERATOR_STATE_ENDING;
1821 status = state_ending(trimmer_it, msgs,
1822 capacity, count);
1823 }
1824
1825 goto end;
1826 }
1827
1828 BT_ASSERT_DBG(my_count > 0);
1829
1830 for (i = 0; i < my_count; i++) {
1831 status = handle_message(trimmer_it, my_msgs[i],
1832 &reached_end);
1833
1834 /*
1835 * handle_message() unconditionally consumes the
1836 * message reference.
1837 */
1838 my_msgs[i] = NULL;
1839
1840 if (G_UNLIKELY(status !=
1841 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK)) {
1842 put_messages(my_msgs, my_count);
1843 goto end;
1844 }
1845
1846 if (G_UNLIKELY(reached_end)) {
1847 /*
1848 * This message's time was passed the
1849 * trimming time range's end time: we
1850 * are done. Their might still be
1851 * messages in the output message queue,
1852 * so move to the "ending" state and
1853 * apply it immediately since
1854 * state_trim() is called within the
1855 * "next" method.
1856 */
1857 put_messages(my_msgs, my_count);
1858 trimmer_it->state =
1859 TRIMMER_ITERATOR_STATE_ENDING;
1860 status = state_ending(trimmer_it, msgs,
1861 capacity, count);
1862 goto end;
1863 }
1864 }
1865 }
1866
1867 /*
1868 * There's at least one message in the output message queue:
1869 * move the messages to the output message array.
1870 */
1871 BT_ASSERT_DBG(!g_queue_is_empty(trimmer_it->output_messages));
1872 fill_message_array_from_output_messages(trimmer_it, msgs,
1873 capacity, count);
1874
1875 end:
1876 return status;
1877 }
1878
1879 BT_HIDDEN
1880 bt_component_class_message_iterator_next_method_status trimmer_msg_iter_next(
1881 bt_self_message_iterator *self_msg_iter,
1882 bt_message_array_const msgs, uint64_t capacity,
1883 uint64_t *count)
1884 {
1885 struct trimmer_iterator *trimmer_it =
1886 bt_self_message_iterator_get_data(self_msg_iter);
1887 bt_component_class_message_iterator_next_method_status status =
1888 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1889
1890 BT_ASSERT_DBG(trimmer_it);
1891
1892 if (G_LIKELY(trimmer_it->state == TRIMMER_ITERATOR_STATE_TRIM)) {
1893 status = state_trim(trimmer_it, msgs, capacity, count);
1894 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1895 goto end;
1896 }
1897 } else {
1898 switch (trimmer_it->state) {
1899 case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN:
1900 status = state_set_trimmer_iterator_bounds(trimmer_it);
1901 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1902 goto end;
1903 }
1904
1905 status = state_seek_initially(trimmer_it);
1906 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1907 goto end;
1908 }
1909
1910 status = state_trim(trimmer_it, msgs, capacity, count);
1911 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1912 goto end;
1913 }
1914
1915 break;
1916 case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY:
1917 status = state_seek_initially(trimmer_it);
1918 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1919 goto end;
1920 }
1921
1922 status = state_trim(trimmer_it, msgs, capacity, count);
1923 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1924 goto end;
1925 }
1926
1927 break;
1928 case TRIMMER_ITERATOR_STATE_ENDING:
1929 status = state_ending(trimmer_it, msgs, capacity,
1930 count);
1931 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1932 goto end;
1933 }
1934
1935 break;
1936 case TRIMMER_ITERATOR_STATE_ENDED:
1937 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
1938 break;
1939 default:
1940 bt_common_abort();
1941 }
1942 }
1943
1944 end:
1945 return status;
1946 }
1947
1948 BT_HIDDEN
1949 void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
1950 {
1951 struct trimmer_iterator *trimmer_it =
1952 bt_self_message_iterator_get_data(self_msg_iter);
1953
1954 BT_ASSERT(trimmer_it);
1955 destroy_trimmer_iterator(trimmer_it);
1956 }
This page took 0.124827 seconds and 3 git commands to generate.