lib: make message iterator creation functions return a status
[babeltrace.git] / src / plugins / utils / trimmer / trimmer.c
1 /*
2 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
24 #define BT_LOG_OUTPUT_LEVEL (trimmer_comp->log_level)
25 #define BT_LOG_TAG "PLUGIN/FLT.UTILS.TRIMMER"
26 #include "logging/comp-logging.h"
27
28 #include "compat/utc.h"
29 #include "compat/time.h"
30 #include <babeltrace2/babeltrace.h>
31 #include "common/common.h"
32 #include "common/assert.h"
33 #include <stdint.h>
34 #include <inttypes.h>
35 #include <glib.h>
36 #include "compat/glib.h"
37
38 #include "trimmer.h"
39
/* Number of nanoseconds in one second */
#define NS_PER_S INT64_C(1000000000)

/* Name of the input port which trimmer_init() adds to the component */
static const char * const in_port_name = "in";
43
/*
 * Time of day without a date. `ns` is the sub-second part, in
 * nanoseconds.
 */
struct trimmer_time {
	unsigned int hour, minute, second, ns;
};
47
/*
 * One bound (beginning or end) of the trimming time range.
 */
struct trimmer_bound {
	/*
	 * Nanoseconds from origin, valid if `is_set` is set and
	 * `is_infinite` is false.
	 */
	int64_t ns_from_origin;

	/* True if this bound's full time (`ns_from_origin`) is set */
	bool is_set;

	/*
	 * True if this bound represents the infinity (negative or
	 * positive depending on which bound it is). If this is true,
	 * then we don't care about `ns_from_origin` above.
	 */
	bool is_infinite;

	/*
	 * This bound's time without the date; this time is used to set
	 * `ns_from_origin` once we know the date.
	 */
	struct trimmer_time time;
};
71
/* Private data of a trimmer filter component */
struct trimmer_comp {
	/* Trimming time range's bounds, as initialized from the parameters */
	struct trimmer_bound begin, end;

	/* True to interpret dates and times as UTC instead of local time */
	bool is_gmt;

	/* Logging level used by this file's logging macros */
	bt_logging_level log_level;

	/* Component which owns this structure (not reference-counted here) */
	bt_self_component *self_comp;
};
78
/*
 * States of a trimmer message iterator, listed in the order in which
 * they are described to occur.
 */
enum trimmer_iterator_state {
	/*
	 * Find the first message's date and set the bounds' times
	 * accordingly.
	 */
	TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN,

	/*
	 * Initially seek to the trimming range's beginning time.
	 */
	TRIMMER_ITERATOR_STATE_SEEK_INITIALLY,

	/*
	 * Fill the output message queue for as long as received input
	 * messages are within the trimming time range.
	 */
	TRIMMER_ITERATOR_STATE_TRIM,

	/* Flush the remaining messages in the output message queue */
	TRIMMER_ITERATOR_STATE_ENDING,

	/* Trimming operation and message iterator is ended */
	TRIMMER_ITERATOR_STATE_ENDED,
};
103
/* Private data of a trimmer message iterator */
struct trimmer_iterator {
	/* Weak */
	struct trimmer_comp *trimmer_comp;

	/* Weak */
	bt_self_message_iterator *self_msg_iter;

	/* Current state of the trimming operation */
	enum trimmer_iterator_state state;

	/* Owned by this */
	bt_self_component_port_input_message_iterator *upstream_iter;

	/*
	 * This iterator's own copy of the component's bounds, made when
	 * the iterator is initialized.
	 */
	struct trimmer_bound begin, end;

	/*
	 * Queue of `const bt_message *` (owned by the queue).
	 *
	 * This is where the trimming operation pushes the messages to
	 * output by this message iterator.
	 */
	GQueue *output_messages;

	/*
	 * Hash table of `bt_stream *` (weak) to
	 * `struct trimmer_iterator_stream_state *` (owned by the HT).
	 */
	GHashTable *stream_states;
};
131
/* Per-stream state kept by a trimmer message iterator */
struct trimmer_iterator_stream_state {
	/* Weak */
	const bt_stream *stream;

	/* Have we seen a message with clock_snapshot going through this stream? */
	bool seen_clock_snapshot;

	/* Owned by this (`NULL` initially and between packets) */
	const bt_packet *cur_packet;
};
142
/*
 * Releases the memory of `trimmer_comp`, which must not be `NULL`.
 */
static void
destroy_trimmer_comp(struct trimmer_comp *trimmer_comp)
{
	BT_ASSERT(trimmer_comp);

	g_free(trimmer_comp);
}
149
150 static
151 struct trimmer_comp *create_trimmer_comp(void)
152 {
153 return g_new0(struct trimmer_comp, 1);
154 }
155
156 BT_HIDDEN
157 void trimmer_finalize(bt_self_component_filter *self_comp)
158 {
159 struct trimmer_comp *trimmer_comp =
160 bt_self_component_get_data(
161 bt_self_component_filter_as_self_component(self_comp));
162
163 if (trimmer_comp) {
164 destroy_trimmer_comp(trimmer_comp);
165 }
166 }
167
168 /*
169 * Compile regex in `pattern`, and try to match `string`. If there's a match,
170 * return true and set `*match_info` to the list of matches. The list of
171 * matches must be freed by the caller. If there's no match, return false and
172 * set `*match_info` to NULL;
173 */
174 static
175 bool compile_and_match(const char *pattern, const char *string, GMatchInfo **match_info) {
176 bool matches = false;
177 GError *regex_error = NULL;
178 GRegex *regex;
179
180 regex = g_regex_new(pattern, 0, 0, &regex_error);
181 if (!regex) {
182 goto end;
183 }
184
185 matches = g_regex_match(regex, string, 0, match_info);
186 if (!matches) {
187 /*
188 * g_regex_match allocates `*match_info` even if it returns
189 * FALSE. If there's no match, we have no use for it, so free
190 * it immediatly and don't return it to the caller.
191 */
192 g_match_info_free(*match_info);
193 *match_info = NULL;
194 }
195
196 g_regex_unref(regex);
197
198 end:
199
200 if (regex_error) {
201 g_error_free(regex_error);
202 }
203
204 return matches;
205 }
206
207 /*
208 * Convert the captured text in match number `match_num` in `match_info`
209 * to an unsigned integer.
210 */
211 static
212 guint64 match_to_uint(const GMatchInfo *match_info, gint match_num) {
213 gchar *text, *endptr;
214 guint64 result;
215
216 text = g_match_info_fetch(match_info, match_num);
217 BT_ASSERT(text);
218
219 /*
220 * Because the input is carefully sanitized with regexes by the caller,
221 * we assume that g_ascii_strtoull cannot fail.
222 */
223 errno = 0;
224 result = g_ascii_strtoull(text, &endptr, 10);
225 BT_ASSERT(endptr > text);
226 BT_ASSERT(errno == 0);
227
228 g_free(text);
229
230 return result;
231 }
232
233 /*
234 * When parsing the nanoseconds part, .512 means .512000000, not .000000512.
235 * This function is like match_to_uint, but multiplies the parsed number to get
236 * the expected result.
237 */
238 static
239 guint64 match_to_uint_ns(const GMatchInfo *match_info, gint match_num) {
240 guint64 nanoseconds;
241 gboolean ret;
242 gint start_pos, end_pos, power;
243 static int pow10[] = {
244 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000,
245 };
246
247 nanoseconds = match_to_uint(match_info, match_num);
248
249 /* Multiply by 10 as many times as there are omitted digits. */
250 ret = g_match_info_fetch_pos(match_info, match_num, &start_pos, &end_pos);
251 BT_ASSERT(ret);
252
253 power = 9 - (end_pos - start_pos);
254 BT_ASSERT(power >= 0 && power <= 8);
255
256 nanoseconds *= pow10[power];
257
258 return nanoseconds;
259 }
260
261 /*
262 * Sets the time (in ns from origin) of a trimmer bound from date and
263 * time components.
264 *
265 * Returns a negative value if anything goes wrong.
266 */
267 static
268 int set_bound_ns_from_origin(struct trimmer_bound *bound,
269 unsigned int year, unsigned int month, unsigned int day,
270 unsigned int hour, unsigned int minute, unsigned int second,
271 unsigned int ns, bool is_gmt)
272 {
273 int ret = 0;
274 time_t result;
275 struct tm tm = {
276 .tm_sec = second,
277 .tm_min = minute,
278 .tm_hour = hour,
279 .tm_mday = day,
280 .tm_mon = month - 1,
281 .tm_year = year - 1900,
282 .tm_isdst = -1,
283 };
284
285 if (is_gmt) {
286 result = bt_timegm(&tm);
287 } else {
288 result = mktime(&tm);
289 }
290
291 if (result < 0) {
292 ret = -1;
293 goto end;
294 }
295
296 BT_ASSERT(bound);
297 bound->ns_from_origin = (int64_t) result;
298 bound->ns_from_origin *= NS_PER_S;
299 bound->ns_from_origin += ns;
300 bound->is_set = true;
301
302 end:
303 return ret;
304 }
305
306 /*
307 * Parses a timestamp, figuring out its format.
308 *
309 * Returns a negative value if anything goes wrong.
310 *
311 * Expected formats:
312 *
313 * YYYY-MM-DD hh:mm[:ss[.ns]]
314 * [hh:mm:]ss[.ns]
315 * [-]s[.ns]
316 *
317 * TODO: Check overflows.
318 */
319 static
320 int set_bound_from_str(struct trimmer_comp *trimmer_comp,
321 const char *str, struct trimmer_bound *bound, bool is_gmt)
322 {
323 /* Matches YYYY-MM-DD */
324 #define DATE_RE "([0-9]{4})-([0-9]{2})-([0-9]{2})"
325
326 /* Matches HH:MM[:SS[.NS]] */
327 #define TIME_RE "([0-9]{2}):([0-9]{2})(?::([0-9]{2})(?:\\.([0-9]{1,9}))?)?"
328
329 /* Matches [-]SS[.NS] */
330 #define S_NS_RE "^(-?)([0-9]+)(?:\\.([0-9]{1,9}))?$"
331
332 GMatchInfo *match_info;
333 int ret = 0;
334
335 /* Try `YYYY-MM-DD hh:mm[:ss[.ns]]` format */
336 if (compile_and_match("^" DATE_RE " " TIME_RE "$", str, &match_info)) {
337 unsigned int year = 0, month = 0, day = 0, hours = 0, minutes = 0, seconds = 0, nanoseconds = 0;
338 gint match_count = g_match_info_get_match_count(match_info);
339
340 BT_ASSERT(match_count >= 6 && match_count <= 8);
341
342 year = match_to_uint(match_info, 1);
343 month = match_to_uint(match_info, 2);
344 day = match_to_uint(match_info, 3);
345 hours = match_to_uint(match_info, 4);
346 minutes = match_to_uint(match_info, 5);
347
348 if (match_count >= 7) {
349 seconds = match_to_uint(match_info, 6);
350 }
351
352 if (match_count >= 8) {
353 nanoseconds = match_to_uint_ns(match_info, 7);
354 }
355
356 set_bound_ns_from_origin(bound, year, month, day, hours, minutes, seconds, nanoseconds, is_gmt);
357
358 goto end;
359 }
360
361 if (compile_and_match("^" DATE_RE "$", str, &match_info)) {
362 unsigned int year = 0, month = 0, day = 0;
363
364 BT_ASSERT(g_match_info_get_match_count(match_info) == 4);
365
366 year = match_to_uint(match_info, 1);
367 month = match_to_uint(match_info, 2);
368 day = match_to_uint(match_info, 3);
369
370 set_bound_ns_from_origin(bound, year, month, day, 0, 0, 0, 0, is_gmt);
371
372 goto end;
373 }
374
375 /* Try `hh:mm[:ss[.ns]]` format */
376 if (compile_and_match("^" TIME_RE "$", str, &match_info)) {
377 gint match_count = g_match_info_get_match_count(match_info);
378 BT_ASSERT(match_count >= 3 && match_count <= 5);
379 bound->time.hour = match_to_uint(match_info, 1);
380 bound->time.minute = match_to_uint(match_info, 2);
381
382 if (match_count >= 4) {
383 bound->time.second = match_to_uint(match_info, 3);
384 }
385
386 if (match_count >= 5) {
387 bound->time.ns = match_to_uint_ns(match_info, 4);
388 }
389
390 goto end;
391 }
392
393 /* Try `[-]s[.ns]` format */
394 if (compile_and_match("^" S_NS_RE "$", str, &match_info)) {
395 gboolean is_neg, fetch_pos_ret;
396 gint start_pos, end_pos, match_count;
397 guint64 seconds, nanoseconds = 0;
398
399 match_count = g_match_info_get_match_count(match_info);
400 BT_ASSERT(match_count >= 3 && match_count <= 4);
401
402 /* Check for presence of negation sign. */
403 fetch_pos_ret = g_match_info_fetch_pos(match_info, 1, &start_pos, &end_pos);
404 BT_ASSERT(fetch_pos_ret);
405 is_neg = (end_pos - start_pos) > 0;
406
407 seconds = match_to_uint(match_info, 2);
408
409 if (match_count >= 4) {
410 nanoseconds = match_to_uint_ns(match_info, 3);
411 }
412
413 bound->ns_from_origin = seconds * NS_PER_S + nanoseconds;
414
415 if (is_neg) {
416 bound->ns_from_origin = -bound->ns_from_origin;
417 }
418
419 bound->is_set = true;
420
421 goto end;
422 }
423
424 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
425 "Invalid date/time format: param=\"%s\"", str);
426 ret = -1;
427
428 end:
429 return ret;
430 }
431
432 /*
433 * Sets a trimmer bound's properties from a parameter string/integer
434 * value.
435 *
436 * Returns a negative value if anything goes wrong.
437 */
438 static
439 int set_bound_from_param(struct trimmer_comp *trimmer_comp,
440 const char *param_name, const bt_value *param,
441 struct trimmer_bound *bound, bool is_gmt)
442 {
443 int ret;
444 const char *arg;
445 char tmp_arg[64];
446
447 if (bt_value_is_signed_integer(param)) {
448 int64_t value = bt_value_integer_signed_get(param);
449
450 /*
451 * Just convert it to a temporary string to handle
452 * everything the same way.
453 */
454 sprintf(tmp_arg, "%" PRId64, value);
455 arg = tmp_arg;
456 } else if (bt_value_is_string(param)) {
457 arg = bt_value_string_get(param);
458 } else {
459 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
460 "`%s` parameter must be an integer or a string value.",
461 param_name);
462 ret = -1;
463 goto end;
464 }
465
466 ret = set_bound_from_str(trimmer_comp, arg, bound, is_gmt);
467
468 end:
469 return ret;
470 }
471
472 static
473 int validate_trimmer_bounds(struct trimmer_comp *trimmer_comp,
474 struct trimmer_bound *begin, struct trimmer_bound *end)
475 {
476 int ret = 0;
477
478 BT_ASSERT(begin->is_set);
479 BT_ASSERT(end->is_set);
480
481 if (!begin->is_infinite && !end->is_infinite &&
482 begin->ns_from_origin > end->ns_from_origin) {
483 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
484 "Trimming time range's beginning time is greater than end time: "
485 "begin-ns-from-origin=%" PRId64 ", "
486 "end-ns-from-origin=%" PRId64,
487 begin->ns_from_origin,
488 end->ns_from_origin);
489 ret = -1;
490 goto end;
491 }
492
493 if (!begin->is_infinite && begin->ns_from_origin == INT64_MIN) {
494 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
495 "Invalid trimming time range's beginning time: "
496 "ns-from-origin=%" PRId64,
497 begin->ns_from_origin);
498 ret = -1;
499 goto end;
500 }
501
502 if (!end->is_infinite && end->ns_from_origin == INT64_MIN) {
503 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
504 "Invalid trimming time range's end time: "
505 "ns-from-origin=%" PRId64,
506 end->ns_from_origin);
507 ret = -1;
508 goto end;
509 }
510
511 end:
512 return ret;
513 }
514
515 static
516 int init_trimmer_comp_from_params(struct trimmer_comp *trimmer_comp,
517 const bt_value *params)
518 {
519 const bt_value *value;
520 int ret = 0;
521
522 BT_ASSERT(params);
523 value = bt_value_map_borrow_entry_value_const(params, "gmt");
524 if (value) {
525 trimmer_comp->is_gmt = (bool) bt_value_bool_get(value);
526 }
527
528 value = bt_value_map_borrow_entry_value_const(params, "begin");
529 if (value) {
530 if (set_bound_from_param(trimmer_comp, "begin", value,
531 &trimmer_comp->begin, trimmer_comp->is_gmt)) {
532 /* set_bound_from_param() logs errors */
533 ret = -1;
534 goto end;
535 }
536 } else {
537 trimmer_comp->begin.is_infinite = true;
538 trimmer_comp->begin.is_set = true;
539 }
540
541 value = bt_value_map_borrow_entry_value_const(params, "end");
542 if (value) {
543 if (set_bound_from_param(trimmer_comp, "end", value,
544 &trimmer_comp->end, trimmer_comp->is_gmt)) {
545 /* set_bound_from_param() logs errors */
546 ret = -1;
547 goto end;
548 }
549 } else {
550 trimmer_comp->end.is_infinite = true;
551 trimmer_comp->end.is_set = true;
552 }
553
554 end:
555 if (trimmer_comp->begin.is_set && trimmer_comp->end.is_set) {
556 /* validate_trimmer_bounds() logs errors */
557 ret = validate_trimmer_bounds(trimmer_comp,
558 &trimmer_comp->begin, &trimmer_comp->end);
559 }
560
561 return ret;
562 }
563
564 bt_component_class_init_method_status trimmer_init(
565 bt_self_component_filter *self_comp_flt,
566 const bt_value *params, void *init_data)
567 {
568 int ret;
569 bt_component_class_init_method_status status =
570 BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK;
571 bt_self_component_add_port_status add_port_status;
572 struct trimmer_comp *trimmer_comp = create_trimmer_comp();
573 bt_self_component *self_comp =
574 bt_self_component_filter_as_self_component(self_comp_flt);
575 if (!trimmer_comp) {
576 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR;
577 goto error;
578 }
579
580 trimmer_comp->log_level = bt_component_get_logging_level(
581 bt_self_component_as_component(self_comp));
582 trimmer_comp->self_comp = self_comp;
583 add_port_status = bt_self_component_filter_add_input_port(
584 self_comp_flt, in_port_name, NULL, NULL);
585 switch (add_port_status) {
586 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR:
587 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
588 goto error;
589 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR:
590 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR;
591 goto error;
592 default:
593 break;
594 }
595
596 add_port_status = bt_self_component_filter_add_output_port(
597 self_comp_flt, "out", NULL, NULL);
598 switch (add_port_status) {
599 case BT_SELF_COMPONENT_ADD_PORT_STATUS_ERROR:
600 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
601 goto error;
602 case BT_SELF_COMPONENT_ADD_PORT_STATUS_MEMORY_ERROR:
603 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_MEMORY_ERROR;
604 goto error;
605 default:
606 break;
607 }
608
609 ret = init_trimmer_comp_from_params(trimmer_comp, params);
610 if (ret) {
611 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
612 goto error;
613 }
614
615 bt_self_component_set_data(self_comp, trimmer_comp);
616 goto end;
617
618 error:
619 if (status == BT_COMPONENT_CLASS_INIT_METHOD_STATUS_OK) {
620 status = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
621 }
622
623 if (trimmer_comp) {
624 destroy_trimmer_comp(trimmer_comp);
625 }
626
627 end:
628 return status;
629 }
630
631 static
632 void destroy_trimmer_iterator(struct trimmer_iterator *trimmer_it)
633 {
634 if (!trimmer_it) {
635 goto end;
636 }
637
638 bt_self_component_port_input_message_iterator_put_ref(
639 trimmer_it->upstream_iter);
640
641 if (trimmer_it->output_messages) {
642 g_queue_free(trimmer_it->output_messages);
643 }
644
645 if (trimmer_it->stream_states) {
646 g_hash_table_destroy(trimmer_it->stream_states);
647 }
648
649 g_free(trimmer_it);
650 end:
651 return;
652 }
653
654 static
655 void destroy_trimmer_iterator_stream_state(
656 struct trimmer_iterator_stream_state *sstate)
657 {
658 BT_ASSERT(sstate);
659 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
660 g_free(sstate);
661 }
662
663 BT_HIDDEN
664 bt_component_class_message_iterator_init_method_status trimmer_msg_iter_init(
665 bt_self_message_iterator *self_msg_iter,
666 bt_self_component_filter *self_comp,
667 bt_self_component_port_output *port)
668 {
669 bt_component_class_message_iterator_init_method_status status;
670 bt_self_component_port_input_message_iterator_create_from_message_iterator_status
671 msg_iter_status;
672 struct trimmer_iterator *trimmer_it;
673
674 trimmer_it = g_new0(struct trimmer_iterator, 1);
675 if (!trimmer_it) {
676 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR;
677 goto error;
678 }
679
680 trimmer_it->trimmer_comp = bt_self_component_get_data(
681 bt_self_component_filter_as_self_component(self_comp));
682 BT_ASSERT(trimmer_it->trimmer_comp);
683
684 if (trimmer_it->trimmer_comp->begin.is_set &&
685 trimmer_it->trimmer_comp->end.is_set) {
686 /*
687 * Both trimming time range's bounds are set, so skip
688 * the
689 * `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
690 * phase.
691 */
692 trimmer_it->state = TRIMMER_ITERATOR_STATE_SEEK_INITIALLY;
693 }
694
695 trimmer_it->begin = trimmer_it->trimmer_comp->begin;
696 trimmer_it->end = trimmer_it->trimmer_comp->end;
697 msg_iter_status =
698 bt_self_component_port_input_message_iterator_create_from_message_iterator(
699 self_msg_iter,
700 bt_self_component_filter_borrow_input_port_by_name(
701 self_comp, in_port_name), &trimmer_it->upstream_iter);
702 if (msg_iter_status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) {
703 status = (int) msg_iter_status;
704 goto error;
705 }
706
707 trimmer_it->output_messages = g_queue_new();
708 if (!trimmer_it->output_messages) {
709 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR;
710 goto error;
711 }
712
713 trimmer_it->stream_states = g_hash_table_new_full(g_direct_hash,
714 g_direct_equal, NULL,
715 (GDestroyNotify) destroy_trimmer_iterator_stream_state);
716 if (!trimmer_it->stream_states) {
717 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR;
718 goto error;
719 }
720
721 trimmer_it->self_msg_iter = self_msg_iter;
722 bt_self_message_iterator_set_data(self_msg_iter, trimmer_it);
723
724 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK;
725 goto end;
726
727 error:
728 destroy_trimmer_iterator(trimmer_it);
729
730 end:
731 return status;
732 }
733
/*
 * Gets the time, in nanoseconds from origin, of the message `msg`.
 *
 * On success, sets `*has_clock_snapshot` to indicate whether or not
 * `msg` has a clock snapshot to get a time from; `*ns_from_origin` is
 * only set when it does. For discarded events and discarded packets
 * messages, the time is the one of the beginning clock snapshot.
 *
 * Returns 0 on success, or a negative value if a required default
 * clock class is missing or if the clock snapshot cannot be converted
 * to nanoseconds from origin. The outputs are not set on error.
 */
static inline
int get_msg_ns_from_origin(const bt_message *msg, int64_t *ns_from_origin,
		bool *has_clock_snapshot)
{
	const bt_clock_class *clock_class = NULL;
	const bt_clock_snapshot *clock_snapshot = NULL;
	int ret = 0;

	BT_ASSERT(msg);
	BT_ASSERT(ns_from_origin);
	BT_ASSERT(has_clock_snapshot);

	/*
	 * Borrow the clock snapshot with the function which is specific
	 * to the message's type. Except for message iterator inactivity
	 * messages, a stream class without a default clock class is an
	 * error.
	 */
	switch (bt_message_get_type(msg)) {
	case BT_MESSAGE_TYPE_EVENT:
		clock_class =
			bt_message_event_borrow_stream_class_default_clock_class_const(
				msg);
		if (G_UNLIKELY(!clock_class)) {
			goto error;
		}

		clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
			msg);
		break;
	case BT_MESSAGE_TYPE_PACKET_BEGINNING:
		clock_class =
			bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
				msg);
		if (G_UNLIKELY(!clock_class)) {
			goto error;
		}

		clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
			msg);
		break;
	case BT_MESSAGE_TYPE_PACKET_END:
		clock_class =
			bt_message_packet_end_borrow_stream_class_default_clock_class_const(
				msg);
		if (G_UNLIKELY(!clock_class)) {
			goto error;
		}

		clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
			msg);
		break;
	case BT_MESSAGE_TYPE_STREAM_BEGINNING:
	{
		enum bt_message_stream_clock_snapshot_state cs_state;

		clock_class =
			bt_message_stream_beginning_borrow_stream_class_default_clock_class_const(msg);
		if (G_UNLIKELY(!clock_class)) {
			goto error;
		}

		/* The clock snapshot of a stream message can be unknown */
		cs_state = bt_message_stream_beginning_borrow_default_clock_snapshot_const(msg, &clock_snapshot);
		if (cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
			goto no_clock_snapshot;
		}

		break;
	}
	case BT_MESSAGE_TYPE_STREAM_END:
	{
		enum bt_message_stream_clock_snapshot_state cs_state;

		clock_class =
			bt_message_stream_end_borrow_stream_class_default_clock_class_const(msg);
		if (G_UNLIKELY(!clock_class)) {
			goto error;
		}

		/* The clock snapshot of a stream message can be unknown */
		cs_state = bt_message_stream_end_borrow_default_clock_snapshot_const(msg, &clock_snapshot);
		if (cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
			goto no_clock_snapshot;
		}

		break;
	}
	case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
		clock_class =
			bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
				msg);
		if (G_UNLIKELY(!clock_class)) {
			goto error;
		}

		clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
			msg);
		break;
	case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
		clock_class =
			bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
				msg);
		if (G_UNLIKELY(!clock_class)) {
			goto error;
		}

		clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
			msg);
		break;
	case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
		clock_snapshot =
			bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
				msg);
		break;
	default:
		/* Any other type of message: no time to get */
		goto no_clock_snapshot;
	}

	ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot,
		ns_from_origin);
	if (G_UNLIKELY(ret)) {
		goto error;
	}

	*has_clock_snapshot = true;
	goto end;

no_clock_snapshot:
	/* Not an error: `ret` is still 0 here */
	*has_clock_snapshot = false;
	goto end;

error:
	ret = -1;

end:
	return ret;
}
864
865 static inline
866 void put_messages(bt_message_array_const msgs, uint64_t count)
867 {
868 uint64_t i;
869
870 for (i = 0; i < count; i++) {
871 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
872 }
873 }
874
875 static inline
876 int set_trimmer_iterator_bound(struct trimmer_iterator *trimmer_it,
877 struct trimmer_bound *bound, int64_t ns_from_origin,
878 bool is_gmt)
879 {
880 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
881 struct tm tm;
882 struct tm *res;
883 time_t time_seconds = (time_t) (ns_from_origin / NS_PER_S);
884 int ret = 0;
885
886 BT_ASSERT(!bound->is_set);
887 errno = 0;
888
889 /* We only need to extract the date from this time */
890 if (is_gmt) {
891 res = bt_gmtime_r(&time_seconds, &tm);
892 } else {
893 res = bt_localtime_r(&time_seconds, &tm);
894 }
895
896 if (!res) {
897 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(trimmer_comp->self_comp,
898 "Cannot convert timestamp to date and time",
899 ": ts=%" PRId64, (int64_t) time_seconds);
900 ret = -1;
901 goto end;
902 }
903
904 ret = set_bound_ns_from_origin(bound, tm.tm_year + 1900, tm.tm_mon + 1,
905 tm.tm_mday, bound->time.hour, bound->time.minute,
906 bound->time.second, bound->time.ns, is_gmt);
907
908 end:
909 return ret;
910 }
911
/*
 * Handles the `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
 * state.
 *
 * Consumes upstream messages until one has a time, uses this time to
 * set the bounds of `trimmer_it` which are not set yet, and validates
 * the resulting trimming time range. The consumed messages are
 * discarded.
 *
 * Returns the upstream message iterator's status if it's not "OK"
 * while looking for such a message, an error status if anything else
 * fails, or the "OK" status on success. This function does not change
 * the iterator's state itself.
 */
static
bt_component_class_message_iterator_next_method_status
state_set_trimmer_iterator_bounds(
		struct trimmer_iterator *trimmer_it)
{
	bt_message_iterator_next_status upstream_iter_status =
		BT_MESSAGE_ITERATOR_NEXT_STATUS_OK;
	/* The logging macros use `trimmer_comp` */
	struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
	bt_message_array_const msgs;
	uint64_t count = 0;
	int64_t ns_from_origin = INT64_MIN;
	uint64_t i;
	int ret;

	/* There's nothing to do here when both bounds are already set */
	BT_ASSERT(!trimmer_it->begin.is_set ||
		!trimmer_it->end.is_set);

	while (true) {
		upstream_iter_status =
			bt_self_component_port_input_message_iterator_next(
				trimmer_it->upstream_iter, &msgs, &count);
		if (upstream_iter_status != BT_MESSAGE_ITERATOR_NEXT_STATUS_OK) {
			/* Forward any other status (no messages to put) */
			goto end;
		}

		for (i = 0; i < count; i++) {
			const bt_message *msg = msgs[i];
			bool has_ns_from_origin;
			/* Shadows the function-level `ret` within this loop */
			int ret;

			ret = get_msg_ns_from_origin(msg, &ns_from_origin,
				&has_ns_from_origin);
			if (ret) {
				goto error;
			}

			if (!has_ns_from_origin) {
				continue;
			}

			BT_ASSERT(ns_from_origin != INT64_MIN &&
				ns_from_origin != INT64_MAX);
			put_messages(msgs, count);
			goto found;
		}

		/* No message of this batch has a time: try the next one */
		put_messages(msgs, count);
	}

found:
	if (!trimmer_it->begin.is_set) {
		BT_ASSERT(!trimmer_it->begin.is_infinite);
		ret = set_trimmer_iterator_bound(trimmer_it, &trimmer_it->begin,
			ns_from_origin, trimmer_comp->is_gmt);
		if (ret) {
			goto error;
		}
	}

	if (!trimmer_it->end.is_set) {
		BT_ASSERT(!trimmer_it->end.is_infinite);
		ret = set_trimmer_iterator_bound(trimmer_it, &trimmer_it->end,
			ns_from_origin, trimmer_comp->is_gmt);
		if (ret) {
			goto error;
		}
	}

	ret = validate_trimmer_bounds(trimmer_it->trimmer_comp,
		&trimmer_it->begin, &trimmer_it->end);
	if (ret) {
		goto error;
	}

	goto end;

error:
	/*
	 * NOTE(review): when coming from after `found`, the messages
	 * were already put above; this presumably relies on
	 * put_messages() having reset the array's elements, making this
	 * second call harmless -- confirm.
	 */
	put_messages(msgs, count);
	upstream_iter_status = BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR;

end:
	return (int) upstream_iter_status;
}
995
996 static
997 bt_component_class_message_iterator_next_method_status state_seek_initially(
998 struct trimmer_iterator *trimmer_it)
999 {
1000 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
1001 bt_component_class_message_iterator_next_method_status status =
1002 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1003
1004 BT_ASSERT(trimmer_it->begin.is_set);
1005
1006 if (trimmer_it->begin.is_infinite) {
1007 if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
1008 trimmer_it->upstream_iter)) {
1009 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1010 "Cannot make upstream message iterator initially seek its beginning.");
1011 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1012 goto end;
1013 }
1014
1015 status = (int) bt_self_component_port_input_message_iterator_seek_beginning(
1016 trimmer_it->upstream_iter);
1017 } else {
1018 if (!bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
1019 trimmer_it->upstream_iter,
1020 trimmer_it->begin.ns_from_origin)) {
1021 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1022 "Cannot make upstream message iterator initially seek: seek-ns-from-origin=%" PRId64,
1023 trimmer_it->begin.ns_from_origin);
1024 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1025 goto end;
1026 }
1027
1028 status = (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
1029 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin);
1030 }
1031
1032 if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1033 trimmer_it->state = TRIMMER_ITERATOR_STATE_TRIM;
1034 }
1035
1036 end:
1037 return status;
1038 }
1039
1040 static inline
1041 void push_message(struct trimmer_iterator *trimmer_it, const bt_message *msg)
1042 {
1043 g_queue_push_head(trimmer_it->output_messages, (void *) msg);
1044 }
1045
1046 static inline
1047 const bt_message *pop_message(struct trimmer_iterator *trimmer_it)
1048 {
1049 return g_queue_pop_tail(trimmer_it->output_messages);
1050 }
1051
1052 static inline
1053 int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
1054 int64_t ns_from_origin, uint64_t *raw_value)
1055 {
1056
1057 int64_t cc_offset_s;
1058 uint64_t cc_offset_cycles;
1059 uint64_t cc_freq;
1060
1061 bt_clock_class_get_offset(clock_class, &cc_offset_s, &cc_offset_cycles);
1062 cc_freq = bt_clock_class_get_frequency(clock_class);
1063 return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
1064 cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
1065 }
1066
/*
 * Ends the stream of `sstate` at the trimming time range's end time,
 * which must not be infinite.
 *
 * Pushes a packet end message to the output message queue if the
 * stream has a current packet, then a stream end message. When a
 * message with a clock snapshot was seen for this stream, the end
 * bound's time is used as the clock snapshot of those messages.
 *
 * On success, resets `sstate->stream` and returns the "OK" status.
 * Returns an error status if the end bound's time cannot be converted
 * to a clock value, or a memory error status if a message cannot be
 * created.
 */
static inline
bt_component_class_message_iterator_next_method_status
end_stream(struct trimmer_iterator *trimmer_it,
		struct trimmer_iterator_stream_state *sstate)
{
	bt_component_class_message_iterator_next_method_status status =
		BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
	/* Initialize to silence maybe-uninitialized warning. */
	uint64_t raw_value = 0;
	bt_message *msg = NULL;

	BT_ASSERT(!trimmer_it->end.is_infinite);
	BT_ASSERT(sstate->stream);

	/*
	 * If we haven't seen a message with a clock snapshot, we don't
	 * know if the trimmer's end bound is within the clock's range,
	 * so it wouldn't be safe to try to convert ns_from_origin to a
	 * clock value.
	 *
	 * Also, it would be a bit of a lie to generate a stream end
	 * message with the end bound as its clock snapshot, because we
	 * don't really know if the stream existed at that time. If we
	 * have seen a message with a clock snapshot and the stream is
	 * cut short by another message with a clock snapshot, then we
	 * are sure that the end bound time is not below the clock
	 * range, and we know the stream was active at that time (and
	 * that we cut it short).
	 */
	if (sstate->seen_clock_snapshot) {
		const bt_clock_class *clock_class;
		int ret;

		clock_class = bt_stream_class_borrow_default_clock_class_const(
			bt_stream_borrow_class_const(sstate->stream));
		BT_ASSERT(clock_class);
		ret = clock_raw_value_from_ns_from_origin(clock_class,
			trimmer_it->end.ns_from_origin, &raw_value);
		if (ret) {
			status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
			goto end;
		}
	}

	if (sstate->cur_packet) {
		/*
		 * Create and push a packet end message, making its time
		 * the trimming range's end time.
		 *
		 * We know that we must have seen a clock snapshot, the one in
		 * the packet beginning message, since trimmer currently
		 * requires packet messages to have clock snapshots (see comment
		 * in create_stream_state_entry).
		 */
		BT_ASSERT(sstate->seen_clock_snapshot);

		msg = bt_message_packet_end_create_with_default_clock_snapshot(
			trimmer_it->self_msg_iter, sstate->cur_packet,
			raw_value);
		if (!msg) {
			status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
			goto end;
		}

		/* The queue now owns the message: forget our pointer */
		push_message(trimmer_it, msg);
		msg = NULL;
		BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
	}

	/* Create and push a stream end message. */
	msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
		sstate->stream);
	if (!msg) {
		status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
		goto end;
	}

	if (sstate->seen_clock_snapshot) {
		bt_message_stream_end_set_default_clock_snapshot(msg, raw_value);
	}

	/* The queue now owns the message: forget our pointer */
	push_message(trimmer_it, msg);
	msg = NULL;

	/*
	 * Just to make sure that we don't use this stream state again
	 * in the future without an obvious error.
	 */
	sstate->stream = NULL;

end:
	/* `msg` is `NULL` on every path which pushed it to the queue */
	bt_message_put_ref(msg);
	return status;
}
1156
1157 static inline
1158 bt_component_class_message_iterator_next_method_status end_iterator_streams(
1159 struct trimmer_iterator *trimmer_it)
1160 {
1161 bt_component_class_message_iterator_next_method_status status =
1162 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1163 GHashTableIter iter;
1164 gpointer key, sstate;
1165
1166 if (trimmer_it->end.is_infinite) {
1167 /*
1168 * An infinite trimming range's end time guarantees that
1169 * we received (and pushed) all the appropriate end
1170 * messages.
1171 */
1172 goto remove_all;
1173 }
1174
1175 /*
1176 * End each stream and then remove them from the hash table of
1177 * stream states to release unneeded references.
1178 */
1179 g_hash_table_iter_init(&iter, trimmer_it->stream_states);
1180
1181 while (g_hash_table_iter_next(&iter, &key, &sstate)) {
1182 status = end_stream(trimmer_it, sstate);
1183 if (status) {
1184 goto end;
1185 }
1186 }
1187
1188 remove_all:
1189 g_hash_table_remove_all(trimmer_it->stream_states);
1190
1191 end:
1192 return status;
1193 }
1194
1195 static
1196 bt_component_class_message_iterator_next_method_status
1197 create_stream_state_entry(
1198 struct trimmer_iterator *trimmer_it,
1199 const struct bt_stream *stream,
1200 struct trimmer_iterator_stream_state **stream_state)
1201 {
1202 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
1203 bt_component_class_message_iterator_next_method_status status;
1204 struct trimmer_iterator_stream_state *sstate;
1205 const bt_stream_class *sc;
1206
1207 BT_ASSERT(!bt_g_hash_table_contains(trimmer_it->stream_states, stream));
1208
1209 /*
1210 * Validate right now that the stream's class
1211 * has a registered default clock class so that
1212 * an existing stream state guarantees existing
1213 * default clock snapshots for its associated
1214 * messages.
1215 *
1216 * Also check that clock snapshots are always
1217 * known.
1218 */
1219 sc = bt_stream_borrow_class_const(stream);
1220 if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
1221 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1222 "Unsupported stream: stream class does "
1223 "not have a default clock class: "
1224 "stream-addr=%p, "
1225 "stream-id=%" PRIu64 ", "
1226 "stream-name=\"%s\"",
1227 stream, bt_stream_get_id(stream),
1228 bt_stream_get_name(stream));
1229 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1230 goto end;
1231 }
1232
1233 /*
1234 * Temporary: make sure packet beginning, packet
1235 * end, discarded events, and discarded packets
1236 * messages have default clock snapshots until
1237 * the support for not having them is
1238 * implemented.
1239 */
1240 if (!bt_stream_class_packets_have_beginning_default_clock_snapshot(
1241 sc)) {
1242 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1243 "Unsupported stream: packets have no beginning clock snapshot: "
1244 "stream-addr=%p, "
1245 "stream-id=%" PRIu64 ", "
1246 "stream-name=\"%s\"",
1247 stream, bt_stream_get_id(stream),
1248 bt_stream_get_name(stream));
1249 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1250 goto end;
1251 }
1252
1253 if (!bt_stream_class_packets_have_end_default_clock_snapshot(
1254 sc)) {
1255 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1256 "Unsupported stream: packets have no end clock snapshot: "
1257 "stream-addr=%p, "
1258 "stream-id=%" PRIu64 ", "
1259 "stream-name=\"%s\"",
1260 stream, bt_stream_get_id(stream),
1261 bt_stream_get_name(stream));
1262 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1263 goto end;
1264 }
1265
1266 if (bt_stream_class_supports_discarded_events(sc) &&
1267 !bt_stream_class_discarded_events_have_default_clock_snapshots(sc)) {
1268 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1269 "Unsupported stream: discarded events have no clock snapshots: "
1270 "stream-addr=%p, "
1271 "stream-id=%" PRIu64 ", "
1272 "stream-name=\"%s\"",
1273 stream, bt_stream_get_id(stream),
1274 bt_stream_get_name(stream));
1275 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1276 goto end;
1277 }
1278
1279 if (bt_stream_class_supports_discarded_packets(sc) &&
1280 !bt_stream_class_discarded_packets_have_default_clock_snapshots(sc)) {
1281 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1282 "Unsupported stream: discarded packets "
1283 "have no clock snapshots: "
1284 "stream-addr=%p, "
1285 "stream-id=%" PRIu64 ", "
1286 "stream-name=\"%s\"",
1287 stream, bt_stream_get_id(stream),
1288 bt_stream_get_name(stream));
1289 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1290 goto end;
1291 }
1292
1293 sstate = g_new0(struct trimmer_iterator_stream_state, 1);
1294 if (!sstate) {
1295 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
1296 goto end;
1297 }
1298
1299 sstate->stream = stream;
1300
1301 g_hash_table_insert(trimmer_it->stream_states, (void *) stream, sstate);
1302
1303 *stream_state = sstate;
1304
1305 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1306
1307 end:
1308 return status;
1309 }
1310
1311 static
1312 struct trimmer_iterator_stream_state *get_stream_state_entry(
1313 struct trimmer_iterator *trimmer_it,
1314 const struct bt_stream *stream)
1315 {
1316 struct trimmer_iterator_stream_state *sstate;
1317
1318 BT_ASSERT(stream);
1319 sstate = g_hash_table_lookup(trimmer_it->stream_states, stream);
1320 BT_ASSERT(sstate);
1321
1322 return sstate;
1323 }
1324
1325 /*
1326 * Handles a message which is associated to a given stream state. This
1327 * _could_ make the iterator's output message queue grow; this could
1328 * also consume the message without pushing anything to this queue, only
1329 * modifying the stream state.
1330 *
1331 * This function consumes the `msg` reference, _whatever the outcome_.
1332 *
1333 * If non-NULL, `ns_from_origin` is the message's time, as given by
1334 * get_msg_ns_from_origin(). If NULL, the message doesn't have a time.
1335 *
1336 * This function sets `reached_end` if handling this message made the
1337 * iterator reach the end of the trimming range. Note that the output
1338 * message queue could contain messages even if this function sets
1339 * `reached_end`.
1340 */
1341 static
1342 bt_component_class_message_iterator_next_method_status
1343 handle_message_with_stream(
1344 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1345 const struct bt_stream *stream, const int64_t *ns_from_origin,
1346 bool *reached_end)
1347 {
1348 bt_component_class_message_iterator_next_method_status status =
1349 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1350 bt_message_type msg_type = bt_message_get_type(msg);
1351 int ret;
1352 struct trimmer_iterator_stream_state *sstate = NULL;
1353
1354 /*
1355 * Retrieve the stream's state - except if the message is stream
1356 * beginning, in which case we don't know about about this stream yet.
1357 */
1358 if (msg_type != BT_MESSAGE_TYPE_STREAM_BEGINNING) {
1359 sstate = get_stream_state_entry(trimmer_it, stream);
1360 }
1361
1362 switch (msg_type) {
1363 case BT_MESSAGE_TYPE_EVENT:
1364 /*
1365 * Event messages always have a clock snapshot if the stream
1366 * class has a clock class. And we know it has, otherwise we
1367 * couldn't be using the trimmer component.
1368 */
1369 BT_ASSERT(ns_from_origin);
1370
1371 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1372 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1373 status = end_iterator_streams(trimmer_it);
1374 *reached_end = true;
1375 break;
1376 }
1377
1378 sstate->seen_clock_snapshot = true;
1379
1380 push_message(trimmer_it, msg);
1381 msg = NULL;
1382 break;
1383
1384 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1385 /*
1386 * Packet beginning messages won't have a clock snapshot if
1387 * stream_class->packets_have_beginning_default_clock_snapshot
1388 * is false. But for now, assume they always do.
1389 */
1390 BT_ASSERT(ns_from_origin);
1391 BT_ASSERT(!sstate->cur_packet);
1392
1393 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1394 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1395 status = end_iterator_streams(trimmer_it);
1396 *reached_end = true;
1397 break;
1398 }
1399
1400 sstate->cur_packet =
1401 bt_message_packet_beginning_borrow_packet_const(msg);
1402 bt_packet_get_ref(sstate->cur_packet);
1403
1404 sstate->seen_clock_snapshot = true;
1405
1406 push_message(trimmer_it, msg);
1407 msg = NULL;
1408 break;
1409
1410 case BT_MESSAGE_TYPE_PACKET_END:
1411 /*
1412 * Packet end messages won't have a clock snapshot if
1413 * stream_class->packets_have_end_default_clock_snapshot
1414 * is false. But for now, assume they always do.
1415 */
1416 BT_ASSERT(ns_from_origin);
1417 BT_ASSERT(sstate->cur_packet);
1418
1419 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
1420 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1421 status = end_iterator_streams(trimmer_it);
1422 *reached_end = true;
1423 break;
1424 }
1425
1426 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1427
1428 sstate->seen_clock_snapshot = true;
1429
1430 push_message(trimmer_it, msg);
1431 msg = NULL;
1432 break;
1433
1434 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1435 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1436 {
1437 /*
1438 * `ns_from_origin` is the message's time range's
1439 * beginning time here.
1440 */
1441 int64_t end_ns_from_origin;
1442 const bt_clock_snapshot *end_cs;
1443
1444 BT_ASSERT(ns_from_origin);
1445
1446 sstate->seen_clock_snapshot = true;
1447
1448 if (bt_message_get_type(msg) ==
1449 BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1450 /*
1451 * Safe to ignore the return value because we
1452 * know there's a default clock and it's always
1453 * known.
1454 */
1455 end_cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
1456 msg);
1457 } else {
1458 /*
1459 * Safe to ignore the return value because we
1460 * know there's a default clock and it's always
1461 * known.
1462 */
1463 end_cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
1464 msg);
1465 }
1466
1467 if (bt_clock_snapshot_get_ns_from_origin(end_cs,
1468 &end_ns_from_origin)) {
1469 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1470 goto end;
1471 }
1472
1473 if (!trimmer_it->end.is_infinite &&
1474 *ns_from_origin > trimmer_it->end.ns_from_origin) {
1475 status = end_iterator_streams(trimmer_it);
1476 *reached_end = true;
1477 break;
1478 }
1479
1480 if (!trimmer_it->end.is_infinite &&
1481 end_ns_from_origin > trimmer_it->end.ns_from_origin) {
1482 /*
1483 * This message's end time is outside the
1484 * trimming time range: replace it with a new
1485 * message having an end time equal to the
1486 * trimming time range's end and without a
1487 * count.
1488 */
1489 const bt_clock_class *clock_class =
1490 bt_clock_snapshot_borrow_clock_class_const(
1491 end_cs);
1492 const bt_clock_snapshot *begin_cs;
1493 bt_message *new_msg;
1494 uint64_t end_raw_value;
1495
1496 ret = clock_raw_value_from_ns_from_origin(clock_class,
1497 trimmer_it->end.ns_from_origin, &end_raw_value);
1498 if (ret) {
1499 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1500 goto end;
1501 }
1502
1503 if (msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1504 begin_cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
1505 msg);
1506 new_msg = bt_message_discarded_events_create_with_default_clock_snapshots(
1507 trimmer_it->self_msg_iter,
1508 sstate->stream,
1509 bt_clock_snapshot_get_value(begin_cs),
1510 end_raw_value);
1511 } else {
1512 begin_cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
1513 msg);
1514 new_msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
1515 trimmer_it->self_msg_iter,
1516 sstate->stream,
1517 bt_clock_snapshot_get_value(begin_cs),
1518 end_raw_value);
1519 }
1520
1521 if (!new_msg) {
1522 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
1523 goto end;
1524 }
1525
1526 /* Replace the original message */
1527 BT_MESSAGE_MOVE_REF(msg, new_msg);
1528 }
1529
1530 push_message(trimmer_it, msg);
1531 msg = NULL;
1532 break;
1533 }
1534
1535 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1536 /*
1537 * If this message has a time and this time is greater than the
1538 * trimmer's end bound, it triggers the end of the trim window.
1539 */
1540 if (G_UNLIKELY(ns_from_origin && !trimmer_it->end.is_infinite &&
1541 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1542 status = end_iterator_streams(trimmer_it);
1543 *reached_end = true;
1544 break;
1545 }
1546
1547 /* Learn about this stream. */
1548 status = create_stream_state_entry(trimmer_it, stream, &sstate);
1549 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1550 goto end;
1551 }
1552
1553 if (ns_from_origin) {
1554 sstate->seen_clock_snapshot = true;
1555 }
1556
1557 push_message(trimmer_it, msg);
1558 msg = NULL;
1559 break;
1560 case BT_MESSAGE_TYPE_STREAM_END:
1561 {
1562 gboolean removed;
1563
1564 /*
1565 * If this message has a time and this time is greater than the
1566 * trimmer's end bound, it triggers the end of the trim window.
1567 */
1568 if (G_UNLIKELY(ns_from_origin && !trimmer_it->end.is_infinite &&
1569 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1570 status = end_iterator_streams(trimmer_it);
1571 *reached_end = true;
1572 break;
1573 }
1574
1575 /*
1576 * Either the stream end message's time is within the trimmer's
1577 * bounds, or it doesn't have a time. In both cases, pass
1578 * the message unmodified.
1579 */
1580 push_message(trimmer_it, msg);
1581 msg = NULL;
1582
1583 /* Forget about this stream. */
1584 removed = g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
1585 BT_ASSERT(removed);
1586 break;
1587 }
1588 default:
1589 break;
1590 }
1591
1592 end:
1593 /* We release the message's reference whatever the outcome */
1594 bt_message_put_ref(msg);
1595 return status;
1596 }
1597
1598 /*
1599 * Handles an input message. This _could_ make the iterator's output
1600 * message queue grow; this could also consume the message without
1601 * pushing anything to this queue, only modifying the stream state.
1602 *
1603 * This function consumes the `msg` reference, _whatever the outcome_.
1604 *
1605 * This function sets `reached_end` if handling this message made the
1606 * iterator reach the end of the trimming range. Note that the output
1607 * message queue could contain messages even if this function sets
1608 * `reached_end`.
1609 */
1610 static inline
1611 bt_component_class_message_iterator_next_method_status handle_message(
1612 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1613 bool *reached_end)
1614 {
1615 bt_component_class_message_iterator_next_method_status status;
1616 const bt_stream *stream = NULL;
1617 int64_t ns_from_origin = INT64_MIN;
1618 bool has_ns_from_origin = false;
1619 int ret;
1620
1621 /* Find message's associated stream */
1622 switch (bt_message_get_type(msg)) {
1623 case BT_MESSAGE_TYPE_EVENT:
1624 stream = bt_event_borrow_stream_const(
1625 bt_message_event_borrow_event_const(msg));
1626 break;
1627 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1628 stream = bt_packet_borrow_stream_const(
1629 bt_message_packet_beginning_borrow_packet_const(msg));
1630 break;
1631 case BT_MESSAGE_TYPE_PACKET_END:
1632 stream = bt_packet_borrow_stream_const(
1633 bt_message_packet_end_borrow_packet_const(msg));
1634 break;
1635 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1636 stream = bt_message_discarded_events_borrow_stream_const(msg);
1637 break;
1638 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1639 stream = bt_message_discarded_packets_borrow_stream_const(msg);
1640 break;
1641 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1642 stream = bt_message_stream_beginning_borrow_stream_const(msg);
1643 break;
1644 case BT_MESSAGE_TYPE_STREAM_END:
1645 stream = bt_message_stream_end_borrow_stream_const(msg);
1646 break;
1647 default:
1648 break;
1649 }
1650
1651 /* Retrieve the message's time */
1652 ret = get_msg_ns_from_origin(msg, &ns_from_origin, &has_ns_from_origin);
1653 if (G_UNLIKELY(ret)) {
1654 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1655 goto end;
1656 }
1657
1658 if (G_LIKELY(stream)) {
1659 /* Message associated to a stream */
1660 status = handle_message_with_stream(trimmer_it, msg,
1661 stream, has_ns_from_origin ? &ns_from_origin : NULL, reached_end);
1662
1663 /*
1664 * handle_message_with_stream_state() unconditionally
1665 * consumes `msg`.
1666 */
1667 msg = NULL;
1668 } else {
1669 /*
1670 * Message not associated to a stream (message iterator
1671 * inactivity).
1672 */
1673 if (G_UNLIKELY(ns_from_origin > trimmer_it->end.ns_from_origin)) {
1674 BT_MESSAGE_PUT_REF_AND_RESET(msg);
1675 status = end_iterator_streams(trimmer_it);
1676 *reached_end = true;
1677 } else {
1678 push_message(trimmer_it, msg);
1679 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1680 msg = NULL;
1681 }
1682 }
1683
1684 end:
1685 /* We release the message's reference whatever the outcome */
1686 bt_message_put_ref(msg);
1687 return status;
1688 }
1689
1690 static inline
1691 void fill_message_array_from_output_messages(
1692 struct trimmer_iterator *trimmer_it,
1693 bt_message_array_const msgs, uint64_t capacity, uint64_t *count)
1694 {
1695 *count = 0;
1696
1697 /*
1698 * Move auto-seek messages to the output array (which is this
1699 * iterator's base message array).
1700 */
1701 while (capacity > 0 && !g_queue_is_empty(trimmer_it->output_messages)) {
1702 msgs[*count] = pop_message(trimmer_it);
1703 capacity--;
1704 (*count)++;
1705 }
1706
1707 BT_ASSERT(*count > 0);
1708 }
1709
1710 static inline
1711 bt_component_class_message_iterator_next_method_status state_ending(
1712 struct trimmer_iterator *trimmer_it,
1713 bt_message_array_const msgs, uint64_t capacity,
1714 uint64_t *count)
1715 {
1716 bt_component_class_message_iterator_next_method_status status =
1717 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1718
1719 if (g_queue_is_empty(trimmer_it->output_messages)) {
1720 trimmer_it->state = TRIMMER_ITERATOR_STATE_ENDED;
1721 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
1722 goto end;
1723 }
1724
1725 fill_message_array_from_output_messages(trimmer_it, msgs,
1726 capacity, count);
1727
1728 end:
1729 return status;
1730 }
1731
1732 static inline
1733 bt_component_class_message_iterator_next_method_status
1734 state_trim(struct trimmer_iterator *trimmer_it,
1735 bt_message_array_const msgs, uint64_t capacity,
1736 uint64_t *count)
1737 {
1738 bt_component_class_message_iterator_next_method_status status =
1739 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1740 bt_message_array_const my_msgs;
1741 uint64_t my_count;
1742 uint64_t i;
1743 bool reached_end = false;
1744
1745 while (g_queue_is_empty(trimmer_it->output_messages)) {
1746 status = (int) bt_self_component_port_input_message_iterator_next(
1747 trimmer_it->upstream_iter, &my_msgs, &my_count);
1748 if (G_UNLIKELY(status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK)) {
1749 if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END) {
1750 status = end_iterator_streams(trimmer_it);
1751 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1752 goto end;
1753 }
1754
1755 trimmer_it->state =
1756 TRIMMER_ITERATOR_STATE_ENDING;
1757 status = state_ending(trimmer_it, msgs,
1758 capacity, count);
1759 }
1760
1761 goto end;
1762 }
1763
1764 BT_ASSERT(my_count > 0);
1765
1766 for (i = 0; i < my_count; i++) {
1767 status = handle_message(trimmer_it, my_msgs[i],
1768 &reached_end);
1769
1770 /*
1771 * handle_message() unconditionally consumes the
1772 * message reference.
1773 */
1774 my_msgs[i] = NULL;
1775
1776 if (G_UNLIKELY(status !=
1777 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK)) {
1778 put_messages(my_msgs, my_count);
1779 goto end;
1780 }
1781
1782 if (G_UNLIKELY(reached_end)) {
1783 /*
1784 * This message's time was passed the
1785 * trimming time range's end time: we
1786 * are done. Their might still be
1787 * messages in the output message queue,
1788 * so move to the "ending" state and
1789 * apply it immediately since
1790 * state_trim() is called within the
1791 * "next" method.
1792 */
1793 put_messages(my_msgs, my_count);
1794 trimmer_it->state =
1795 TRIMMER_ITERATOR_STATE_ENDING;
1796 status = state_ending(trimmer_it, msgs,
1797 capacity, count);
1798 goto end;
1799 }
1800 }
1801 }
1802
1803 /*
1804 * There's at least one message in the output message queue:
1805 * move the messages to the output message array.
1806 */
1807 BT_ASSERT(!g_queue_is_empty(trimmer_it->output_messages));
1808 fill_message_array_from_output_messages(trimmer_it, msgs,
1809 capacity, count);
1810
1811 end:
1812 return status;
1813 }
1814
1815 BT_HIDDEN
1816 bt_component_class_message_iterator_next_method_status trimmer_msg_iter_next(
1817 bt_self_message_iterator *self_msg_iter,
1818 bt_message_array_const msgs, uint64_t capacity,
1819 uint64_t *count)
1820 {
1821 struct trimmer_iterator *trimmer_it =
1822 bt_self_message_iterator_get_data(self_msg_iter);
1823 bt_component_class_message_iterator_next_method_status status =
1824 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1825
1826 BT_ASSERT(trimmer_it);
1827
1828 if (G_LIKELY(trimmer_it->state == TRIMMER_ITERATOR_STATE_TRIM)) {
1829 status = state_trim(trimmer_it, msgs, capacity, count);
1830 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1831 goto end;
1832 }
1833 } else {
1834 switch (trimmer_it->state) {
1835 case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN:
1836 status = state_set_trimmer_iterator_bounds(trimmer_it);
1837 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1838 goto end;
1839 }
1840
1841 status = state_seek_initially(trimmer_it);
1842 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1843 goto end;
1844 }
1845
1846 status = state_trim(trimmer_it, msgs, capacity, count);
1847 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1848 goto end;
1849 }
1850
1851 break;
1852 case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY:
1853 status = state_seek_initially(trimmer_it);
1854 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1855 goto end;
1856 }
1857
1858 status = state_trim(trimmer_it, msgs, capacity, count);
1859 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1860 goto end;
1861 }
1862
1863 break;
1864 case TRIMMER_ITERATOR_STATE_ENDING:
1865 status = state_ending(trimmer_it, msgs, capacity,
1866 count);
1867 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1868 goto end;
1869 }
1870
1871 break;
1872 case TRIMMER_ITERATOR_STATE_ENDED:
1873 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
1874 break;
1875 default:
1876 abort();
1877 }
1878 }
1879
1880 end:
1881 return status;
1882 }
1883
1884 BT_HIDDEN
1885 void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
1886 {
1887 struct trimmer_iterator *trimmer_it =
1888 bt_self_message_iterator_get_data(self_msg_iter);
1889
1890 BT_ASSERT(trimmer_it);
1891 destroy_trimmer_iterator(trimmer_it);
1892 }
This page took 0.105269 seconds and 5 git commands to generate.