trimmer: free GMatchInfo object in set_bound_from_str
[babeltrace.git] / src / plugins / utils / trimmer / trimmer.c
CommitLineData
cab3f160 1/*
cab3f160 2 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
781751d8 3 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
cab3f160
JG
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
19f15b11 24#define BT_LOG_OUTPUT_LEVEL (trimmer_comp->log_level)
b03487ab 25#define BT_LOG_TAG "PLUGIN/FLT.UTILS.TRIMMER"
3fa1b6a3 26#include "logging/comp-logging.h"
f4f9e43b 27
57952005
MJ
28#include "compat/utc.h"
29#include "compat/time.h"
71c5da58 30#include <babeltrace2/babeltrace.h>
57952005 31#include "common/common.h"
57952005 32#include "common/assert.h"
969c1d8a 33#include <stdbool.h>
781751d8
PP
34#include <stdint.h>
35#include <inttypes.h>
36#include <glib.h>
b7cbc799 37#include "compat/glib.h"
e0111295 38#include "plugins/common/param-validation/param-validation.h"
781751d8
PP
39
40#include "trimmer.h"
41
42#define NS_PER_S INT64_C(1000000000)
43
44static const char * const in_port_name = "in";
45
/* Time of day, without any date: one field per component. */
struct trimmer_time {
	unsigned int hour;
	unsigned int minute;
	unsigned int second;
	unsigned int ns;
};
49
50struct trimmer_bound {
51 /*
52 * Nanoseconds from origin, valid if `is_set` is set and
53 * `is_infinite` is false.
54 */
55 int64_t ns_from_origin;
56
57 /* True if this bound's full time (`ns_from_origin`) is set */
58 bool is_set;
59
60 /*
61 * True if this bound represents the infinity (negative or
62 * positive depending on which bound it is). If this is true,
63 * then we don't care about `ns_from_origin` above.
64 */
65 bool is_infinite;
66
67 /*
68 * This bound's time without the date; this time is used to set
69 * `ns_from_origin` once we know the date.
70 */
71 struct trimmer_time time;
72};
73
74struct trimmer_comp {
75 struct trimmer_bound begin, end;
76 bool is_gmt;
19f15b11 77 bt_logging_level log_level;
b70dca78 78 bt_self_component *self_comp;
781751d8
PP
79};
80
/* States of a trimmer message iterator, in their natural order. */
enum trimmer_iterator_state {
	/*
	 * Look for the first message's date and use it to complete the
	 * bounds' times.
	 */
	TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN = 0,

	/* Perform the initial seek to the trimming range's beginning */
	TRIMMER_ITERATOR_STATE_SEEK_INITIALLY = 1,

	/*
	 * Keep filling the output message queue while the received
	 * input messages are within the trimming time range.
	 */
	TRIMMER_ITERATOR_STATE_TRIM = 2,

	/* Flush what remains in the output message queue */
	TRIMMER_ITERATOR_STATE_ENDING = 3,

	/* Trimming operation and message iterator are both ended */
	TRIMMER_ITERATOR_STATE_ENDED = 4,
};
105
106struct trimmer_iterator {
107 /* Weak */
108 struct trimmer_comp *trimmer_comp;
109
110 /* Weak */
111 bt_self_message_iterator *self_msg_iter;
112
113 enum trimmer_iterator_state state;
114
115 /* Owned by this */
116 bt_self_component_port_input_message_iterator *upstream_iter;
117 struct trimmer_bound begin, end;
118
119 /*
120 * Queue of `const bt_message *` (owned by the queue).
121 *
122 * This is where the trimming operation pushes the messages to
123 * output by this message iterator.
124 */
125 GQueue *output_messages;
126
127 /*
128 * Hash table of `bt_stream *` (weak) to
129 * `struct trimmer_iterator_stream_state *` (owned by the HT).
130 */
131 GHashTable *stream_states;
132};
133
134struct trimmer_iterator_stream_state {
781751d8
PP
135 /* Weak */
136 const bt_stream *stream;
137
b7cbc799
SM
138 /* Have we seen a message with clock_snapshot going through this stream? */
139 bool seen_clock_snapshot;
140
781751d8
PP
141 /* Owned by this (`NULL` initially and between packets) */
142 const bt_packet *cur_packet;
781751d8 143};
cab3f160
JG
144
/* Frees the trimmer component data `trimmer_comp` (must not be `NULL`). */
static
void destroy_trimmer_comp(struct trimmer_comp *trimmer_comp)
{
	BT_ASSERT(trimmer_comp);

	g_free(trimmer_comp);
}
151
152static
781751d8
PP
153struct trimmer_comp *create_trimmer_comp(void)
154{
155 return g_new0(struct trimmer_comp, 1);
156}
157
158BT_HIDDEN
159void trimmer_finalize(bt_self_component_filter *self_comp)
cab3f160 160{
781751d8
PP
161 struct trimmer_comp *trimmer_comp =
162 bt_self_component_get_data(
163 bt_self_component_filter_as_self_component(self_comp));
164
165 if (trimmer_comp) {
166 destroy_trimmer_comp(trimmer_comp);
167 }
cab3f160
JG
168}
169
8f9023e8
SM
170/*
171 * Compile regex in `pattern`, and try to match `string`. If there's a match,
172 * return true and set `*match_info` to the list of matches. The list of
173 * matches must be freed by the caller. If there's no match, return false and
174 * set `*match_info` to NULL;
175 */
176static
177bool compile_and_match(const char *pattern, const char *string, GMatchInfo **match_info) {
178 bool matches = false;
179 GError *regex_error = NULL;
180 GRegex *regex;
181
182 regex = g_regex_new(pattern, 0, 0, &regex_error);
183 if (!regex) {
184 goto end;
185 }
186
187 matches = g_regex_match(regex, string, 0, match_info);
188 if (!matches) {
189 /*
190 * g_regex_match allocates `*match_info` even if it returns
191 * FALSE. If there's no match, we have no use for it, so free
192 * it immediatly and don't return it to the caller.
193 */
fc487522 194 g_match_info_free(*match_info);
8f9023e8
SM
195 *match_info = NULL;
196 }
197
198 g_regex_unref(regex);
199
200end:
201
202 if (regex_error) {
203 g_error_free(regex_error);
204 }
205
206 return matches;
207}
208
209/*
210 * Convert the captured text in match number `match_num` in `match_info`
211 * to an unsigned integer.
212 */
213static
214guint64 match_to_uint(const GMatchInfo *match_info, gint match_num) {
215 gchar *text, *endptr;
216 guint64 result;
217
218 text = g_match_info_fetch(match_info, match_num);
219 BT_ASSERT(text);
220
221 /*
222 * Because the input is carefully sanitized with regexes by the caller,
223 * we assume that g_ascii_strtoull cannot fail.
224 */
225 errno = 0;
226 result = g_ascii_strtoull(text, &endptr, 10);
227 BT_ASSERT(endptr > text);
228 BT_ASSERT(errno == 0);
229
230 g_free(text);
231
232 return result;
233}
234
235/*
236 * When parsing the nanoseconds part, .512 means .512000000, not .000000512.
237 * This function is like match_to_uint, but multiplies the parsed number to get
238 * the expected result.
239 */
240static
241guint64 match_to_uint_ns(const GMatchInfo *match_info, gint match_num) {
242 guint64 nanoseconds;
243 gboolean ret;
244 gint start_pos, end_pos, power;
245 static int pow10[] = {
246 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000,
247 };
248
249 nanoseconds = match_to_uint(match_info, match_num);
250
251 /* Multiply by 10 as many times as there are omitted digits. */
252 ret = g_match_info_fetch_pos(match_info, match_num, &start_pos, &end_pos);
253 BT_ASSERT(ret);
254
255 power = 9 - (end_pos - start_pos);
256 BT_ASSERT(power >= 0 && power <= 8);
257
258 nanoseconds *= pow10[power];
259
260 return nanoseconds;
261}
262
781751d8
PP
263/*
264 * Sets the time (in ns from origin) of a trimmer bound from date and
265 * time components.
266 *
267 * Returns a negative value if anything goes wrong.
268 */
269static
270int set_bound_ns_from_origin(struct trimmer_bound *bound,
271 unsigned int year, unsigned int month, unsigned int day,
272 unsigned int hour, unsigned int minute, unsigned int second,
273 unsigned int ns, bool is_gmt)
cab3f160 274{
781751d8
PP
275 int ret = 0;
276 time_t result;
277 struct tm tm = {
278 .tm_sec = second,
279 .tm_min = minute,
280 .tm_hour = hour,
281 .tm_mday = day,
282 .tm_mon = month - 1,
283 .tm_year = year - 1900,
284 .tm_isdst = -1,
285 };
286
287 if (is_gmt) {
288 result = bt_timegm(&tm);
289 } else {
290 result = mktime(&tm);
291 }
292
293 if (result < 0) {
294 ret = -1;
295 goto end;
296 }
cab3f160 297
781751d8
PP
298 BT_ASSERT(bound);
299 bound->ns_from_origin = (int64_t) result;
300 bound->ns_from_origin *= NS_PER_S;
301 bound->ns_from_origin += ns;
302 bound->is_set = true;
303
304end:
305 return ret;
cab3f160
JG
306}
307
528debdf
MD
308/*
309 * Parses a timestamp, figuring out its format.
310 *
311 * Returns a negative value if anything goes wrong.
312 *
313 * Expected formats:
314 *
781751d8
PP
315 * YYYY-MM-DD hh:mm[:ss[.ns]]
316 * [hh:mm:]ss[.ns]
317 * [-]s[.ns]
318 *
319 * TODO: Check overflows.
320 */
321static
19f15b11
PP
322int set_bound_from_str(struct trimmer_comp *trimmer_comp,
323 const char *str, struct trimmer_bound *bound, bool is_gmt)
781751d8 324{
8f9023e8
SM
325/* Matches YYYY-MM-DD */
326#define DATE_RE "([0-9]{4})-([0-9]{2})-([0-9]{2})"
327
328/* Matches HH:MM[:SS[.NS]] */
329#define TIME_RE "([0-9]{2}):([0-9]{2})(?::([0-9]{2})(?:\\.([0-9]{1,9}))?)?"
330
331/* Matches [-]SS[.NS] */
332#define S_NS_RE "^(-?)([0-9]+)(?:\\.([0-9]{1,9}))?$"
333
334 GMatchInfo *match_info;
781751d8 335 int ret = 0;
781751d8 336
8f9023e8
SM
337 /* Try `YYYY-MM-DD hh:mm[:ss[.ns]]` format */
338 if (compile_and_match("^" DATE_RE " " TIME_RE "$", str, &match_info)) {
339 unsigned int year = 0, month = 0, day = 0, hours = 0, minutes = 0, seconds = 0, nanoseconds = 0;
340 gint match_count = g_match_info_get_match_count(match_info);
781751d8 341
8f9023e8 342 BT_ASSERT(match_count >= 6 && match_count <= 8);
781751d8 343
8f9023e8
SM
344 year = match_to_uint(match_info, 1);
345 month = match_to_uint(match_info, 2);
346 day = match_to_uint(match_info, 3);
347 hours = match_to_uint(match_info, 4);
348 minutes = match_to_uint(match_info, 5);
781751d8 349
8f9023e8
SM
350 if (match_count >= 7) {
351 seconds = match_to_uint(match_info, 6);
352 }
781751d8 353
8f9023e8
SM
354 if (match_count >= 8) {
355 nanoseconds = match_to_uint_ns(match_info, 7);
356 }
357
358 set_bound_ns_from_origin(bound, year, month, day, hours, minutes, seconds, nanoseconds, is_gmt);
781751d8 359
781751d8
PP
360 goto end;
361 }
362
8f9023e8
SM
363 if (compile_and_match("^" DATE_RE "$", str, &match_info)) {
364 unsigned int year = 0, month = 0, day = 0;
365
366 BT_ASSERT(g_match_info_get_match_count(match_info) == 4);
367
368 year = match_to_uint(match_info, 1);
369 month = match_to_uint(match_info, 2);
370 day = match_to_uint(match_info, 3);
371
372 set_bound_ns_from_origin(bound, year, month, day, 0, 0, 0, 0, is_gmt);
373
781751d8
PP
374 goto end;
375 }
376
8f9023e8
SM
377 /* Try `hh:mm[:ss[.ns]]` format */
378 if (compile_and_match("^" TIME_RE "$", str, &match_info)) {
379 gint match_count = g_match_info_get_match_count(match_info);
380 BT_ASSERT(match_count >= 3 && match_count <= 5);
381 bound->time.hour = match_to_uint(match_info, 1);
382 bound->time.minute = match_to_uint(match_info, 2);
383
384 if (match_count >= 4) {
385 bound->time.second = match_to_uint(match_info, 3);
386 }
387
388 if (match_count >= 5) {
389 bound->time.ns = match_to_uint_ns(match_info, 4);
390 }
391
781751d8
PP
392 goto end;
393 }
394
8f9023e8
SM
395 /* Try `[-]s[.ns]` format */
396 if (compile_and_match("^" S_NS_RE "$", str, &match_info)) {
397 gboolean is_neg, fetch_pos_ret;
398 gint start_pos, end_pos, match_count;
399 guint64 seconds, nanoseconds = 0;
400
401 match_count = g_match_info_get_match_count(match_info);
402 BT_ASSERT(match_count >= 3 && match_count <= 4);
403
404 /* Check for presence of negation sign. */
405 fetch_pos_ret = g_match_info_fetch_pos(match_info, 1, &start_pos, &end_pos);
406 BT_ASSERT(fetch_pos_ret);
407 is_neg = (end_pos - start_pos) > 0;
408
409 seconds = match_to_uint(match_info, 2);
410
411 if (match_count >= 4) {
412 nanoseconds = match_to_uint_ns(match_info, 3);
413 }
414
415 bound->ns_from_origin = seconds * NS_PER_S + nanoseconds;
416
417 if (is_neg) {
418 bound->ns_from_origin = -bound->ns_from_origin;
419 }
420
781751d8 421 bound->is_set = true;
8f9023e8 422
781751d8
PP
423 goto end;
424 }
425
ccaf1ae5
SM
426 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
427 "Invalid date/time format: param=\"%s\"", str);
781751d8
PP
428 ret = -1;
429
430end:
6ed8408f
SM
431 g_match_info_free(match_info);
432
781751d8
PP
433 return ret;
434}
435
436/*
437 * Sets a trimmer bound's properties from a parameter string/integer
438 * value.
439 *
440 * Returns a negative value if anything goes wrong.
528debdf 441 */
44d3cbf0 442static
19f15b11
PP
443int set_bound_from_param(struct trimmer_comp *trimmer_comp,
444 const char *param_name, const bt_value *param,
781751d8 445 struct trimmer_bound *bound, bool is_gmt)
528debdf
MD
446{
447 int ret;
922fc2dd 448 const char *arg;
781751d8 449 char tmp_arg[64];
922fc2dd 450
68d9d039 451 if (bt_value_is_signed_integer(param)) {
60bbfc7c 452 int64_t value = bt_value_integer_signed_get(param);
781751d8
PP
453
454 /*
455 * Just convert it to a temporary string to handle
456 * everything the same way.
457 */
458 sprintf(tmp_arg, "%" PRId64, value);
459 arg = tmp_arg;
781751d8 460 } else {
e0111295
SM
461 BT_ASSERT(bt_value_is_string(param));
462 arg = bt_value_string_get(param);
922fc2dd
PP
463 }
464
19f15b11 465 ret = set_bound_from_str(trimmer_comp, arg, bound, is_gmt);
781751d8 466
781751d8
PP
467 return ret;
468}
469
470static
19f15b11
PP
471int validate_trimmer_bounds(struct trimmer_comp *trimmer_comp,
472 struct trimmer_bound *begin, struct trimmer_bound *end)
781751d8
PP
473{
474 int ret = 0;
475
476 BT_ASSERT(begin->is_set);
477 BT_ASSERT(end->is_set);
478
479 if (!begin->is_infinite && !end->is_infinite &&
480 begin->ns_from_origin > end->ns_from_origin) {
ccaf1ae5
SM
481 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
482 "Trimming time range's beginning time is greater than end time: "
781751d8
PP
483 "begin-ns-from-origin=%" PRId64 ", "
484 "end-ns-from-origin=%" PRId64,
485 begin->ns_from_origin,
486 end->ns_from_origin);
487 ret = -1;
488 goto end;
528debdf
MD
489 }
490
781751d8 491 if (!begin->is_infinite && begin->ns_from_origin == INT64_MIN) {
ccaf1ae5
SM
492 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
493 "Invalid trimming time range's beginning time: "
781751d8
PP
494 "ns-from-origin=%" PRId64,
495 begin->ns_from_origin);
496 ret = -1;
497 goto end;
498 }
528debdf 499
781751d8 500 if (!end->is_infinite && end->ns_from_origin == INT64_MIN) {
ccaf1ae5
SM
501 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
502 "Invalid trimming time range's end time: "
781751d8
PP
503 "ns-from-origin=%" PRId64,
504 end->ns_from_origin);
505 ret = -1;
506 goto end;
507 }
528debdf 508
781751d8
PP
509end:
510 return ret;
528debdf
MD
511}
512
513static
e0111295
SM
514enum bt_param_validation_status validate_bound_type(
515 const bt_value *value,
516 struct bt_param_validation_context *context)
517{
518 enum bt_param_validation_status status = BT_PARAM_VALIDATION_STATUS_OK;
519
520 if (!bt_value_is_signed_integer(value) &&
521 !bt_value_is_string(value)) {
522 status = bt_param_validation_error(context,
523 "unexpected type: expected-types=[%s, %s], actual-type=%s",
524 bt_common_value_type_string(BT_VALUE_TYPE_SIGNED_INTEGER),
525 bt_common_value_type_string(BT_VALUE_TYPE_STRING),
526 bt_common_value_type_string(bt_value_get_type(value)));
527 }
528
529 return status;
530}
531
5ca9cc5a 532static
e0111295
SM
533struct bt_param_validation_map_value_entry_descr trimmer_params[] = {
534 { "gmt", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .type = BT_VALUE_TYPE_BOOL } },
535 { "begin", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .validation_func = validate_bound_type } },
536 { "end", BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_OPTIONAL, { .validation_func = validate_bound_type } },
537 BT_PARAM_VALIDATION_MAP_VALUE_ENTRY_END
538};
539
540static
541bt_component_class_initialize_method_status init_trimmer_comp_from_params(
542 struct trimmer_comp *trimmer_comp,
781751d8 543 const bt_value *params)
44d3cbf0 544{
781751d8 545 const bt_value *value;
e0111295
SM
546 bt_component_class_initialize_method_status status;
547 enum bt_param_validation_status validation_status;
548 gchar *validate_error = NULL;
549
550 validation_status = bt_param_validation_validate(params,
551 trimmer_params, &validate_error);
552 if (validation_status == BT_PARAM_VALIDATION_STATUS_MEMORY_ERROR) {
553 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
554 goto end;
555 } else if (validation_status == BT_PARAM_VALIDATION_STATUS_VALIDATION_ERROR) {
556 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
557 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp, "%s",
558 validate_error);
559 goto end;
560 }
44d3cbf0 561
8b45963b 562 BT_ASSERT(params);
781751d8 563 value = bt_value_map_borrow_entry_value_const(params, "gmt");
528debdf 564 if (value) {
781751d8 565 trimmer_comp->is_gmt = (bool) bt_value_bool_get(value);
528debdf
MD
566 }
567
781751d8 568 value = bt_value_map_borrow_entry_value_const(params, "begin");
528debdf 569 if (value) {
19f15b11 570 if (set_bound_from_param(trimmer_comp, "begin", value,
781751d8
PP
571 &trimmer_comp->begin, trimmer_comp->is_gmt)) {
572 /* set_bound_from_param() logs errors */
e0111295 573 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
922fc2dd 574 goto end;
44d3cbf0 575 }
781751d8
PP
576 } else {
577 trimmer_comp->begin.is_infinite = true;
578 trimmer_comp->begin.is_set = true;
44d3cbf0 579 }
528debdf 580
781751d8 581 value = bt_value_map_borrow_entry_value_const(params, "end");
528debdf 582 if (value) {
19f15b11 583 if (set_bound_from_param(trimmer_comp, "end", value,
781751d8
PP
584 &trimmer_comp->end, trimmer_comp->is_gmt)) {
585 /* set_bound_from_param() logs errors */
e0111295 586 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
922fc2dd 587 goto end;
528debdf 588 }
781751d8
PP
589 } else {
590 trimmer_comp->end.is_infinite = true;
591 trimmer_comp->end.is_set = true;
528debdf 592 }
922fc2dd 593
781751d8
PP
594 if (trimmer_comp->begin.is_set && trimmer_comp->end.is_set) {
595 /* validate_trimmer_bounds() logs errors */
e0111295
SM
596 if (validate_trimmer_bounds(trimmer_comp,
597 &trimmer_comp->begin, &trimmer_comp->end)) {
598 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_ERROR;
599 goto end;
600 }
55595636 601 }
781751d8 602
e0111295
SM
603 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
604
605end:
606 g_free(validate_error);
607
608 return status;
44d3cbf0
JG
609}
610
4175c1d5 611bt_component_class_initialize_method_status trimmer_init(
fb25b9e3 612 bt_self_component_filter *self_comp_flt,
e3250e61 613 bt_self_component_filter_configuration *config,
781751d8 614 const bt_value *params, void *init_data)
cab3f160 615{
e0111295 616 bt_component_class_initialize_method_status status;
fb25b9e3 617 bt_self_component_add_port_status add_port_status;
781751d8 618 struct trimmer_comp *trimmer_comp = create_trimmer_comp();
b70dca78
PP
619 bt_self_component *self_comp =
620 bt_self_component_filter_as_self_component(self_comp_flt);
e0111295 621
781751d8 622 if (!trimmer_comp) {
4175c1d5 623 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
781751d8 624 goto error;
cab3f160
JG
625 }
626
19f15b11 627 trimmer_comp->log_level = bt_component_get_logging_level(
b70dca78
PP
628 bt_self_component_as_component(self_comp));
629 trimmer_comp->self_comp = self_comp;
e0111295 630
fb25b9e3 631 add_port_status = bt_self_component_filter_add_input_port(
b70dca78 632 self_comp_flt, in_port_name, NULL, NULL);
e0111295
SM
633 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
634 status = (int) add_port_status;
fb25b9e3 635 goto error;
b9d103be
PP
636 }
637
fb25b9e3 638 add_port_status = bt_self_component_filter_add_output_port(
b70dca78 639 self_comp_flt, "out", NULL, NULL);
e0111295
SM
640 if (add_port_status != BT_SELF_COMPONENT_ADD_PORT_STATUS_OK) {
641 status = (int) add_port_status;
fb25b9e3 642 goto error;
b9d103be
PP
643 }
644
e0111295
SM
645 status = init_trimmer_comp_from_params(trimmer_comp, params);
646 if (status != BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK) {
cab3f160
JG
647 goto error;
648 }
649
b70dca78 650 bt_self_component_set_data(self_comp, trimmer_comp);
e0111295
SM
651
652 status = BT_COMPONENT_CLASS_INITIALIZE_METHOD_STATUS_OK;
781751d8
PP
653 goto end;
654
655error:
781751d8
PP
656 if (trimmer_comp) {
657 destroy_trimmer_comp(trimmer_comp);
658 }
659
cab3f160 660end:
64e6ceea 661 return status;
781751d8
PP
662}
663
664static
665void destroy_trimmer_iterator(struct trimmer_iterator *trimmer_it)
666{
ab8b2b1b
SM
667 if (!trimmer_it) {
668 goto end;
669 }
670
781751d8
PP
671 bt_self_component_port_input_message_iterator_put_ref(
672 trimmer_it->upstream_iter);
673
674 if (trimmer_it->output_messages) {
675 g_queue_free(trimmer_it->output_messages);
676 }
677
678 if (trimmer_it->stream_states) {
679 g_hash_table_destroy(trimmer_it->stream_states);
680 }
681
682 g_free(trimmer_it);
ab8b2b1b
SM
683end:
684 return;
781751d8
PP
685}
686
687static
688void destroy_trimmer_iterator_stream_state(
689 struct trimmer_iterator_stream_state *sstate)
690{
691 BT_ASSERT(sstate);
692 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
781751d8
PP
693 g_free(sstate);
694}
695
696BT_HIDDEN
4175c1d5 697bt_component_class_message_iterator_initialize_method_status trimmer_msg_iter_init(
781751d8 698 bt_self_message_iterator *self_msg_iter,
9415de1c 699 bt_self_message_iterator_configuration *config,
781751d8
PP
700 bt_self_component_filter *self_comp,
701 bt_self_component_port_output *port)
702{
4175c1d5 703 bt_component_class_message_iterator_initialize_method_status status;
ab8b2b1b
SM
704 bt_self_component_port_input_message_iterator_create_from_message_iterator_status
705 msg_iter_status;
781751d8
PP
706 struct trimmer_iterator *trimmer_it;
707
708 trimmer_it = g_new0(struct trimmer_iterator, 1);
709 if (!trimmer_it) {
4175c1d5 710 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
ab8b2b1b 711 goto error;
781751d8
PP
712 }
713
714 trimmer_it->trimmer_comp = bt_self_component_get_data(
715 bt_self_component_filter_as_self_component(self_comp));
716 BT_ASSERT(trimmer_it->trimmer_comp);
717
718 if (trimmer_it->trimmer_comp->begin.is_set &&
719 trimmer_it->trimmer_comp->end.is_set) {
720 /*
721 * Both trimming time range's bounds are set, so skip
722 * the
723 * `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
724 * phase.
725 */
726 trimmer_it->state = TRIMMER_ITERATOR_STATE_SEEK_INITIALLY;
727 }
728
729 trimmer_it->begin = trimmer_it->trimmer_comp->begin;
730 trimmer_it->end = trimmer_it->trimmer_comp->end;
ab8b2b1b 731 msg_iter_status =
692f1a01
PP
732 bt_self_component_port_input_message_iterator_create_from_message_iterator(
733 self_msg_iter,
781751d8 734 bt_self_component_filter_borrow_input_port_by_name(
ab8b2b1b
SM
735 self_comp, in_port_name), &trimmer_it->upstream_iter);
736 if (msg_iter_status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) {
737 status = (int) msg_iter_status;
738 goto error;
781751d8
PP
739 }
740
741 trimmer_it->output_messages = g_queue_new();
742 if (!trimmer_it->output_messages) {
4175c1d5 743 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
ab8b2b1b 744 goto error;
781751d8
PP
745 }
746
747 trimmer_it->stream_states = g_hash_table_new_full(g_direct_hash,
748 g_direct_equal, NULL,
749 (GDestroyNotify) destroy_trimmer_iterator_stream_state);
750 if (!trimmer_it->stream_states) {
4175c1d5 751 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
ab8b2b1b 752 goto error;
781751d8
PP
753 }
754
c49bf79b
SM
755 /*
756 * The trimmer requires upstream messages to have times, so it can
757 * always seek forward.
758 */
759 bt_self_message_iterator_configuration_set_can_seek_forward(
760 config, BT_TRUE);
761
781751d8
PP
762 trimmer_it->self_msg_iter = self_msg_iter;
763 bt_self_message_iterator_set_data(self_msg_iter, trimmer_it);
764
4175c1d5 765 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK;
ab8b2b1b
SM
766 goto end;
767
768error:
769 destroy_trimmer_iterator(trimmer_it);
781751d8 770
ab8b2b1b 771end:
781751d8
PP
772 return status;
773}
774
775static inline
776int get_msg_ns_from_origin(const bt_message *msg, int64_t *ns_from_origin,
b7cbc799 777 bool *has_clock_snapshot)
781751d8
PP
778{
779 const bt_clock_class *clock_class = NULL;
780 const bt_clock_snapshot *clock_snapshot = NULL;
781751d8
PP
781 int ret = 0;
782
ec4a3354
PP
783 BT_ASSERT_DBG(msg);
784 BT_ASSERT_DBG(ns_from_origin);
785 BT_ASSERT_DBG(has_clock_snapshot);
781751d8
PP
786
787 switch (bt_message_get_type(msg)) {
788 case BT_MESSAGE_TYPE_EVENT:
789 clock_class =
790 bt_message_event_borrow_stream_class_default_clock_class_const(
791 msg);
85e7137b 792 if (G_UNLIKELY(!clock_class)) {
781751d8
PP
793 goto error;
794 }
795
11ddb3ef
PP
796 clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
797 msg);
781751d8
PP
798 break;
799 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
800 clock_class =
801 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
802 msg);
85e7137b 803 if (G_UNLIKELY(!clock_class)) {
781751d8
PP
804 goto error;
805 }
806
11ddb3ef
PP
807 clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
808 msg);
781751d8
PP
809 break;
810 case BT_MESSAGE_TYPE_PACKET_END:
811 clock_class =
812 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
813 msg);
85e7137b 814 if (G_UNLIKELY(!clock_class)) {
781751d8
PP
815 goto error;
816 }
817
11ddb3ef
PP
818 clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
819 msg);
781751d8 820 break;
b7cbc799
SM
821 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
822 {
823 enum bt_message_stream_clock_snapshot_state cs_state;
824
781751d8 825 clock_class =
b7cbc799 826 bt_message_stream_beginning_borrow_stream_class_default_clock_class_const(msg);
85e7137b 827 if (G_UNLIKELY(!clock_class)) {
781751d8
PP
828 goto error;
829 }
830
b7cbc799
SM
831 cs_state = bt_message_stream_beginning_borrow_default_clock_snapshot_const(msg, &clock_snapshot);
832 if (cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
833 goto no_clock_snapshot;
781751d8
PP
834 }
835
781751d8 836 break;
b7cbc799
SM
837 }
838 case BT_MESSAGE_TYPE_STREAM_END:
839 {
840 enum bt_message_stream_clock_snapshot_state cs_state;
841
781751d8 842 clock_class =
b7cbc799 843 bt_message_stream_end_borrow_stream_class_default_clock_class_const(msg);
85e7137b 844 if (G_UNLIKELY(!clock_class)) {
781751d8
PP
845 goto error;
846 }
847
b7cbc799
SM
848 cs_state = bt_message_stream_end_borrow_default_clock_snapshot_const(msg, &clock_snapshot);
849 if (cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
781751d8
PP
850 goto no_clock_snapshot;
851 }
852
853 break;
b7cbc799
SM
854 }
855 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
781751d8 856 clock_class =
b7cbc799 857 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
781751d8 858 msg);
85e7137b 859 if (G_UNLIKELY(!clock_class)) {
781751d8
PP
860 goto error;
861 }
862
b7cbc799
SM
863 clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
864 msg);
865 break;
866 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
867 clock_class =
868 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
869 msg);
870 if (G_UNLIKELY(!clock_class)) {
871 goto error;
781751d8
PP
872 }
873
b7cbc799
SM
874 clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
875 msg);
781751d8
PP
876 break;
877 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
11ddb3ef 878 clock_snapshot =
781751d8 879 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
11ddb3ef 880 msg);
781751d8
PP
881 break;
882 default:
883 goto no_clock_snapshot;
884 }
885
781751d8
PP
886 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot,
887 ns_from_origin);
85e7137b 888 if (G_UNLIKELY(ret)) {
781751d8
PP
889 goto error;
890 }
891
b7cbc799 892 *has_clock_snapshot = true;
781751d8
PP
893 goto end;
894
895no_clock_snapshot:
b7cbc799 896 *has_clock_snapshot = false;
781751d8
PP
897 goto end;
898
cab3f160 899error:
781751d8
PP
900 ret = -1;
901
902end:
903 return ret;
904}
905
906static inline
907void put_messages(bt_message_array_const msgs, uint64_t count)
908{
909 uint64_t i;
910
911 for (i = 0; i < count; i++) {
912 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
913 }
914}
915
916static inline
19f15b11
PP
917int set_trimmer_iterator_bound(struct trimmer_iterator *trimmer_it,
918 struct trimmer_bound *bound, int64_t ns_from_origin,
919 bool is_gmt)
781751d8 920{
19f15b11 921 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
781751d8 922 struct tm tm;
ce8955eb 923 struct tm *res;
781751d8
PP
924 time_t time_seconds = (time_t) (ns_from_origin / NS_PER_S);
925 int ret = 0;
926
927 BT_ASSERT(!bound->is_set);
928 errno = 0;
929
930 /* We only need to extract the date from this time */
931 if (is_gmt) {
ce8955eb 932 res = bt_gmtime_r(&time_seconds, &tm);
781751d8 933 } else {
ce8955eb 934 res = bt_localtime_r(&time_seconds, &tm);
781751d8
PP
935 }
936
ce8955eb 937 if (!res) {
ccaf1ae5
SM
938 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(trimmer_comp->self_comp,
939 "Cannot convert timestamp to date and time",
8e67b873 940 ": ts=%" PRId64, (int64_t) time_seconds);
781751d8
PP
941 ret = -1;
942 goto end;
943 }
944
945 ret = set_bound_ns_from_origin(bound, tm.tm_year + 1900, tm.tm_mon + 1,
946 tm.tm_mday, bound->time.hour, bound->time.minute,
947 bound->time.second, bound->time.ns, is_gmt);
948
949end:
cab3f160
JG
950 return ret;
951}
781751d8
PP
952
953static
fb25b9e3
PP
954bt_component_class_message_iterator_next_method_status
955state_set_trimmer_iterator_bounds(
781751d8
PP
956 struct trimmer_iterator *trimmer_it)
957{
fb25b9e3 958 bt_message_iterator_next_status upstream_iter_status =
621b4e7e 959 BT_MESSAGE_ITERATOR_NEXT_STATUS_OK;
781751d8
PP
960 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
961 bt_message_array_const msgs;
962 uint64_t count = 0;
963 int64_t ns_from_origin = INT64_MIN;
964 uint64_t i;
965 int ret;
966
967 BT_ASSERT(!trimmer_it->begin.is_set ||
968 !trimmer_it->end.is_set);
969
970 while (true) {
971 upstream_iter_status =
972 bt_self_component_port_input_message_iterator_next(
973 trimmer_it->upstream_iter, &msgs, &count);
fb25b9e3 974 if (upstream_iter_status != BT_MESSAGE_ITERATOR_NEXT_STATUS_OK) {
781751d8
PP
975 goto end;
976 }
977
978 for (i = 0; i < count; i++) {
979 const bt_message *msg = msgs[i];
b7cbc799 980 bool has_ns_from_origin;
781751d8 981 ret = get_msg_ns_from_origin(msg, &ns_from_origin,
b7cbc799 982 &has_ns_from_origin);
781751d8
PP
983 if (ret) {
984 goto error;
985 }
986
b7cbc799 987 if (!has_ns_from_origin) {
781751d8
PP
988 continue;
989 }
990
ec4a3354 991 BT_ASSERT_DBG(ns_from_origin != INT64_MIN &&
781751d8
PP
992 ns_from_origin != INT64_MAX);
993 put_messages(msgs, count);
994 goto found;
995 }
996
997 put_messages(msgs, count);
998 }
999
1000found:
1001 if (!trimmer_it->begin.is_set) {
1002 BT_ASSERT(!trimmer_it->begin.is_infinite);
19f15b11 1003 ret = set_trimmer_iterator_bound(trimmer_it, &trimmer_it->begin,
781751d8
PP
1004 ns_from_origin, trimmer_comp->is_gmt);
1005 if (ret) {
1006 goto error;
1007 }
1008 }
1009
1010 if (!trimmer_it->end.is_set) {
1011 BT_ASSERT(!trimmer_it->end.is_infinite);
19f15b11 1012 ret = set_trimmer_iterator_bound(trimmer_it, &trimmer_it->end,
781751d8
PP
1013 ns_from_origin, trimmer_comp->is_gmt);
1014 if (ret) {
1015 goto error;
1016 }
1017 }
1018
19f15b11
PP
1019 ret = validate_trimmer_bounds(trimmer_it->trimmer_comp,
1020 &trimmer_it->begin, &trimmer_it->end);
781751d8
PP
1021 if (ret) {
1022 goto error;
1023 }
1024
1025 goto end;
1026
1027error:
1028 put_messages(msgs, count);
621b4e7e 1029 upstream_iter_status = BT_MESSAGE_ITERATOR_NEXT_STATUS_ERROR;
781751d8
PP
1030
1031end:
1032 return (int) upstream_iter_status;
1033}
1034
1035static
fb25b9e3 1036bt_component_class_message_iterator_next_method_status state_seek_initially(
781751d8
PP
1037 struct trimmer_iterator *trimmer_it)
1038{
19f15b11 1039 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
9e8e8b43 1040 bt_component_class_message_iterator_next_method_status status;
781751d8
PP
1041
1042 BT_ASSERT(trimmer_it->begin.is_set);
1043
1044 if (trimmer_it->begin.is_infinite) {
9e8e8b43
SM
1045 bt_bool can_seek;
1046
1047 status = (int) bt_self_component_port_input_message_iterator_can_seek_beginning(
1048 trimmer_it->upstream_iter, &can_seek);
1049 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1050 if (status < 0) {
1051 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1052 "Cannot make upstream message iterator initially seek its beginning.");
1053 }
1054
1055 goto end;
1056 }
1057
1058 if (!can_seek) {
ccaf1ae5
SM
1059 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1060 "Cannot make upstream message iterator initially seek its beginning.");
fb25b9e3 1061 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
781751d8
PP
1062 goto end;
1063 }
1064
1065 status = (int) bt_self_component_port_input_message_iterator_seek_beginning(
1066 trimmer_it->upstream_iter);
1067 } else {
9e8e8b43
SM
1068 bt_bool can_seek;
1069
1070 status = (int) bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
1071 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin,
1072 &can_seek);
1073
1074 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1075 if (status < 0) {
1076 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1077 "Cannot make upstream message iterator initially seek: seek-ns-from-origin=%" PRId64,
1078 trimmer_it->begin.ns_from_origin);
1079 }
1080
1081 goto end;
1082 }
1083
1084 if (!can_seek) {
ccaf1ae5
SM
1085 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1086 "Cannot make upstream message iterator initially seek: seek-ns-from-origin=%" PRId64,
781751d8 1087 trimmer_it->begin.ns_from_origin);
fb25b9e3 1088 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
781751d8
PP
1089 goto end;
1090 }
1091
1092 status = (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
1093 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin);
1094 }
1095
fb25b9e3 1096 if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
781751d8
PP
1097 trimmer_it->state = TRIMMER_ITERATOR_STATE_TRIM;
1098 }
1099
1100end:
1101 return status;
1102}
1103
1104static inline
1105void push_message(struct trimmer_iterator *trimmer_it, const bt_message *msg)
1106{
1107 g_queue_push_head(trimmer_it->output_messages, (void *) msg);
1108}
1109
1110static inline
1111const bt_message *pop_message(struct trimmer_iterator *trimmer_it)
1112{
1113 return g_queue_pop_tail(trimmer_it->output_messages);
1114}
1115
1116static inline
1117int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
1118 int64_t ns_from_origin, uint64_t *raw_value)
1119{
1120
1121 int64_t cc_offset_s;
1122 uint64_t cc_offset_cycles;
1123 uint64_t cc_freq;
1124
1125 bt_clock_class_get_offset(clock_class, &cc_offset_s, &cc_offset_cycles);
1126 cc_freq = bt_clock_class_get_frequency(clock_class);
1127 return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
1128 cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
1129}
1130
1131static inline
fb25b9e3
PP
1132bt_component_class_message_iterator_next_method_status
1133end_stream(struct trimmer_iterator *trimmer_it,
781751d8
PP
1134 struct trimmer_iterator_stream_state *sstate)
1135{
fb25b9e3
PP
1136 bt_component_class_message_iterator_next_method_status status =
1137 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
b7cbc799
SM
1138 /* Initialize to silence maybe-uninitialized warning. */
1139 uint64_t raw_value = 0;
781751d8
PP
1140 bt_message *msg = NULL;
1141
1142 BT_ASSERT(!trimmer_it->end.is_infinite);
b7cbc799 1143 BT_ASSERT(sstate->stream);
781751d8 1144
b7cbc799
SM
1145 /*
1146 * If we haven't seen a message with a clock snapshot, we don't know if the trimmer's end bound is within
1147 * the clock's range, so it wouldn't be safe to try to convert ns_from_origin to a clock value.
1148 *
1149 * Also, it would be a bit of a lie to generate a stream end message with the end bound as its
1150 * clock snapshot, because we don't really know if the stream existed at that time. If we have
1151 * seen a message with a clock snapshot and the stream is cut short by another message with a
1152 * clock snapshot, then we are sure that the the end bound time is not below the clock range,
1153 * and we know the stream was active at that time (and that we cut it short).
1154 */
1155 if (sstate->seen_clock_snapshot) {
1156 const bt_clock_class *clock_class;
1157 int ret;
781751d8 1158
781751d8
PP
1159 clock_class = bt_stream_class_borrow_default_clock_class_const(
1160 bt_stream_borrow_class_const(sstate->stream));
1161 BT_ASSERT(clock_class);
1162 ret = clock_raw_value_from_ns_from_origin(clock_class,
1163 trimmer_it->end.ns_from_origin, &raw_value);
1164 if (ret) {
fb25b9e3 1165 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
781751d8
PP
1166 goto end;
1167 }
b7cbc799
SM
1168 }
1169
1170 if (sstate->cur_packet) {
1171 /*
1172 * Create and push a packet end message, making its time
1173 * the trimming range's end time.
1174 *
1175 * We know that we must have seen a clock snapshot, the one in
1176 * the packet beginning message, since trimmer currently
1177 * requires packet messages to have clock snapshots (see comment
1178 * in create_stream_state_entry).
1179 */
1180 BT_ASSERT(sstate->seen_clock_snapshot);
781751d8
PP
1181
1182 msg = bt_message_packet_end_create_with_default_clock_snapshot(
1183 trimmer_it->self_msg_iter, sstate->cur_packet,
1184 raw_value);
1185 if (!msg) {
fb25b9e3 1186 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
781751d8
PP
1187 goto end;
1188 }
1189
1190 push_message(trimmer_it, msg);
1191 msg = NULL;
1192 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
781751d8
PP
1193 }
1194
b7cbc799 1195 /* Create and push a stream end message. */
781751d8
PP
1196 msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
1197 sstate->stream);
1198 if (!msg) {
fb25b9e3 1199 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
781751d8
PP
1200 goto end;
1201 }
1202
b7cbc799
SM
1203 if (sstate->seen_clock_snapshot) {
1204 bt_message_stream_end_set_default_clock_snapshot(msg, raw_value);
1205 }
1206
781751d8
PP
1207 push_message(trimmer_it, msg);
1208 msg = NULL;
1209
1210 /*
1211 * Just to make sure that we don't use this stream state again
1212 * in the future without an obvious error.
1213 */
1214 sstate->stream = NULL;
1215
1216end:
1217 bt_message_put_ref(msg);
1218 return status;
1219}
1220
1221static inline
fb25b9e3 1222bt_component_class_message_iterator_next_method_status end_iterator_streams(
781751d8
PP
1223 struct trimmer_iterator *trimmer_it)
1224{
fb25b9e3
PP
1225 bt_component_class_message_iterator_next_method_status status =
1226 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
781751d8
PP
1227 GHashTableIter iter;
1228 gpointer key, sstate;
1229
1230 if (trimmer_it->end.is_infinite) {
1231 /*
1232 * An infinite trimming range's end time guarantees that
1233 * we received (and pushed) all the appropriate end
1234 * messages.
1235 */
1236 goto remove_all;
1237 }
1238
1239 /*
1240 * End each stream and then remove them from the hash table of
1241 * stream states to release unneeded references.
1242 */
1243 g_hash_table_iter_init(&iter, trimmer_it->stream_states);
1244
1245 while (g_hash_table_iter_next(&iter, &key, &sstate)) {
1246 status = end_stream(trimmer_it, sstate);
1247 if (status) {
1248 goto end;
1249 }
1250 }
1251
1252remove_all:
1253 g_hash_table_remove_all(trimmer_it->stream_states);
1254
1255end:
1256 return status;
1257}
1258
b7cbc799
SM
1259static
1260bt_component_class_message_iterator_next_method_status
1261create_stream_state_entry(
1262 struct trimmer_iterator *trimmer_it,
1263 const struct bt_stream *stream,
1264 struct trimmer_iterator_stream_state **stream_state)
1265{
1266 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
1267 bt_component_class_message_iterator_next_method_status status;
1268 struct trimmer_iterator_stream_state *sstate;
1269 const bt_stream_class *sc;
1270
1271 BT_ASSERT(!bt_g_hash_table_contains(trimmer_it->stream_states, stream));
1272
1273 /*
1274 * Validate right now that the stream's class
1275 * has a registered default clock class so that
1276 * an existing stream state guarantees existing
1277 * default clock snapshots for its associated
1278 * messages.
1279 *
1280 * Also check that clock snapshots are always
1281 * known.
1282 */
1283 sc = bt_stream_borrow_class_const(stream);
1284 if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
ccaf1ae5
SM
1285 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1286 "Unsupported stream: stream class does "
b7cbc799
SM
1287 "not have a default clock class: "
1288 "stream-addr=%p, "
1289 "stream-id=%" PRIu64 ", "
1290 "stream-name=\"%s\"",
1291 stream, bt_stream_get_id(stream),
1292 bt_stream_get_name(stream));
1293 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1294 goto end;
1295 }
1296
1297 /*
1298 * Temporary: make sure packet beginning, packet
1299 * end, discarded events, and discarded packets
1300 * messages have default clock snapshots until
1301 * the support for not having them is
1302 * implemented.
1303 */
1304 if (!bt_stream_class_packets_have_beginning_default_clock_snapshot(
1305 sc)) {
ccaf1ae5
SM
1306 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1307 "Unsupported stream: packets have no beginning clock snapshot: "
b7cbc799
SM
1308 "stream-addr=%p, "
1309 "stream-id=%" PRIu64 ", "
1310 "stream-name=\"%s\"",
1311 stream, bt_stream_get_id(stream),
1312 bt_stream_get_name(stream));
1313 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1314 goto end;
1315 }
1316
1317 if (!bt_stream_class_packets_have_end_default_clock_snapshot(
1318 sc)) {
ccaf1ae5
SM
1319 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1320 "Unsupported stream: packets have no end clock snapshot: "
b7cbc799
SM
1321 "stream-addr=%p, "
1322 "stream-id=%" PRIu64 ", "
1323 "stream-name=\"%s\"",
1324 stream, bt_stream_get_id(stream),
1325 bt_stream_get_name(stream));
1326 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1327 goto end;
1328 }
1329
1330 if (bt_stream_class_supports_discarded_events(sc) &&
1331 !bt_stream_class_discarded_events_have_default_clock_snapshots(sc)) {
ccaf1ae5
SM
1332 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1333 "Unsupported stream: discarded events have no clock snapshots: "
b7cbc799
SM
1334 "stream-addr=%p, "
1335 "stream-id=%" PRIu64 ", "
1336 "stream-name=\"%s\"",
1337 stream, bt_stream_get_id(stream),
1338 bt_stream_get_name(stream));
1339 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1340 goto end;
1341 }
1342
1343 if (bt_stream_class_supports_discarded_packets(sc) &&
1344 !bt_stream_class_discarded_packets_have_default_clock_snapshots(sc)) {
ccaf1ae5
SM
1345 BT_COMP_LOGE_APPEND_CAUSE(trimmer_comp->self_comp,
1346 "Unsupported stream: discarded packets "
b7cbc799
SM
1347 "have no clock snapshots: "
1348 "stream-addr=%p, "
1349 "stream-id=%" PRIu64 ", "
1350 "stream-name=\"%s\"",
1351 stream, bt_stream_get_id(stream),
1352 bt_stream_get_name(stream));
1353 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
1354 goto end;
1355 }
1356
1357 sstate = g_new0(struct trimmer_iterator_stream_state, 1);
1358 if (!sstate) {
1359 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
1360 goto end;
1361 }
1362
1363 sstate->stream = stream;
1364
1365 g_hash_table_insert(trimmer_it->stream_states, (void *) stream, sstate);
1366
1367 *stream_state = sstate;
1368
1369 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
1370
1371end:
1372 return status;
1373}
1374
1375static
1376struct trimmer_iterator_stream_state *get_stream_state_entry(
1377 struct trimmer_iterator *trimmer_it,
1378 const struct bt_stream *stream)
1379{
1380 struct trimmer_iterator_stream_state *sstate;
1381
ec4a3354 1382 BT_ASSERT_DBG(stream);
b7cbc799 1383 sstate = g_hash_table_lookup(trimmer_it->stream_states, stream);
ec4a3354 1384 BT_ASSERT_DBG(sstate);
b7cbc799
SM
1385
1386 return sstate;
1387}
1388
781751d8
PP
1389/*
1390 * Handles a message which is associated to a given stream state. This
1391 * _could_ make the iterator's output message queue grow; this could
1392 * also consume the message without pushing anything to this queue, only
1393 * modifying the stream state.
1394 *
1395 * This function consumes the `msg` reference, _whatever the outcome_.
1396 *
b7cbc799
SM
1397 * If non-NULL, `ns_from_origin` is the message's time, as given by
1398 * get_msg_ns_from_origin(). If NULL, the message doesn't have a time.
781751d8
PP
1399 *
1400 * This function sets `reached_end` if handling this message made the
1401 * iterator reach the end of the trimming range. Note that the output
1402 * message queue could contain messages even if this function sets
1403 * `reached_end`.
1404 */
b7cbc799 1405static
fb25b9e3 1406bt_component_class_message_iterator_next_method_status
b7cbc799 1407handle_message_with_stream(
781751d8 1408 struct trimmer_iterator *trimmer_it, const bt_message *msg,
b7cbc799
SM
1409 const struct bt_stream *stream, const int64_t *ns_from_origin,
1410 bool *reached_end)
781751d8 1411{
fb25b9e3
PP
1412 bt_component_class_message_iterator_next_method_status status =
1413 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
781751d8
PP
1414 bt_message_type msg_type = bt_message_get_type(msg);
1415 int ret;
b7cbc799
SM
1416 struct trimmer_iterator_stream_state *sstate = NULL;
1417
1418 /*
1419 * Retrieve the stream's state - except if the message is stream
1420 * beginning, in which case we don't know about about this stream yet.
1421 */
1422 if (msg_type != BT_MESSAGE_TYPE_STREAM_BEGINNING) {
1423 sstate = get_stream_state_entry(trimmer_it, stream);
1424 }
781751d8
PP
1425
1426 switch (msg_type) {
1427 case BT_MESSAGE_TYPE_EVENT:
b7cbc799
SM
1428 /*
1429 * Event messages always have a clock snapshot if the stream
1430 * class has a clock class. And we know it has, otherwise we
1431 * couldn't be using the trimmer component.
1432 */
ec4a3354 1433 BT_ASSERT_DBG(ns_from_origin);
b7cbc799 1434
85e7137b 1435 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
b7cbc799 1436 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
781751d8
PP
1437 status = end_iterator_streams(trimmer_it);
1438 *reached_end = true;
1439 break;
1440 }
1441
b7cbc799
SM
1442 sstate->seen_clock_snapshot = true;
1443
781751d8
PP
1444 push_message(trimmer_it, msg);
1445 msg = NULL;
1446 break;
b7cbc799 1447
781751d8 1448 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
b7cbc799
SM
1449 /*
1450 * Packet beginning messages won't have a clock snapshot if
1451 * stream_class->packets_have_beginning_default_clock_snapshot
1452 * is false. But for now, assume they always do.
1453 */
1454 BT_ASSERT(ns_from_origin);
1455 BT_ASSERT(!sstate->cur_packet);
1456
85e7137b 1457 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
b7cbc799 1458 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
781751d8
PP
1459 status = end_iterator_streams(trimmer_it);
1460 *reached_end = true;
1461 break;
1462 }
1463
781751d8
PP
1464 sstate->cur_packet =
1465 bt_message_packet_beginning_borrow_packet_const(msg);
1466 bt_packet_get_ref(sstate->cur_packet);
b7cbc799
SM
1467
1468 sstate->seen_clock_snapshot = true;
1469
781751d8
PP
1470 push_message(trimmer_it, msg);
1471 msg = NULL;
1472 break;
b7cbc799 1473
781751d8 1474 case BT_MESSAGE_TYPE_PACKET_END:
b7cbc799
SM
1475 /*
1476 * Packet end messages won't have a clock snapshot if
1477 * stream_class->packets_have_end_default_clock_snapshot
1478 * is false. But for now, assume they always do.
1479 */
1480 BT_ASSERT(ns_from_origin);
1481 BT_ASSERT(sstate->cur_packet);
781751d8 1482
85e7137b 1483 if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
b7cbc799 1484 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
781751d8
PP
1485 status = end_iterator_streams(trimmer_it);
1486 *reached_end = true;
1487 break;
1488 }
1489
781751d8 1490 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
b7cbc799
SM
1491
1492 sstate->seen_clock_snapshot = true;
1493
781751d8
PP
1494 push_message(trimmer_it, msg);
1495 msg = NULL;
1496 break;
b7cbc799 1497
781751d8
PP
1498 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1499 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1500 {
1501 /*
1502 * `ns_from_origin` is the message's time range's
1503 * beginning time here.
1504 */
1505 int64_t end_ns_from_origin;
1506 const bt_clock_snapshot *end_cs;
1507
b7cbc799
SM
1508 BT_ASSERT(ns_from_origin);
1509
1510 sstate->seen_clock_snapshot = true;
1511
781751d8
PP
1512 if (bt_message_get_type(msg) ==
1513 BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1514 /*
1515 * Safe to ignore the return value because we
1516 * know there's a default clock and it's always
1517 * known.
1518 */
5ef34326 1519 end_cs = bt_message_discarded_events_borrow_end_default_clock_snapshot_const(
11ddb3ef 1520 msg);
781751d8
PP
1521 } else {
1522 /*
1523 * Safe to ignore the return value because we
1524 * know there's a default clock and it's always
1525 * known.
1526 */
5ef34326 1527 end_cs = bt_message_discarded_packets_borrow_end_default_clock_snapshot_const(
11ddb3ef 1528 msg);
781751d8
PP
1529 }
1530
1531 if (bt_clock_snapshot_get_ns_from_origin(end_cs,
1532 &end_ns_from_origin)) {
fb25b9e3 1533 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
781751d8
PP
1534 goto end;
1535 }
1536
781751d8 1537 if (!trimmer_it->end.is_infinite &&
b7cbc799 1538 *ns_from_origin > trimmer_it->end.ns_from_origin) {
781751d8
PP
1539 status = end_iterator_streams(trimmer_it);
1540 *reached_end = true;
1541 break;
1542 }
1543
1544 if (!trimmer_it->end.is_infinite &&
1545 end_ns_from_origin > trimmer_it->end.ns_from_origin) {
1546 /*
1547 * This message's end time is outside the
1548 * trimming time range: replace it with a new
1549 * message having an end time equal to the
1550 * trimming time range's end and without a
1551 * count.
1552 */
1553 const bt_clock_class *clock_class =
1554 bt_clock_snapshot_borrow_clock_class_const(
1555 end_cs);
1556 const bt_clock_snapshot *begin_cs;
1557 bt_message *new_msg;
1558 uint64_t end_raw_value;
1559
1560 ret = clock_raw_value_from_ns_from_origin(clock_class,
1561 trimmer_it->end.ns_from_origin, &end_raw_value);
1562 if (ret) {
fb25b9e3 1563 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
781751d8
PP
1564 goto end;
1565 }
1566
1567 if (msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
5ef34326 1568 begin_cs = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
11ddb3ef 1569 msg);
781751d8
PP
1570 new_msg = bt_message_discarded_events_create_with_default_clock_snapshots(
1571 trimmer_it->self_msg_iter,
1572 sstate->stream,
1573 bt_clock_snapshot_get_value(begin_cs),
1574 end_raw_value);
1575 } else {
5ef34326 1576 begin_cs = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
11ddb3ef 1577 msg);
781751d8
PP
1578 new_msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
1579 trimmer_it->self_msg_iter,
1580 sstate->stream,
1581 bt_clock_snapshot_get_value(begin_cs),
1582 end_raw_value);
1583 }
1584
1585 if (!new_msg) {
fb25b9e3 1586 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
781751d8
PP
1587 goto end;
1588 }
1589
1590 /* Replace the original message */
1591 BT_MESSAGE_MOVE_REF(msg, new_msg);
1592 }
1593
781751d8
PP
1594 push_message(trimmer_it, msg);
1595 msg = NULL;
1596 break;
1597 }
b7cbc799
SM
1598
1599 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1600 /*
1601 * If this message has a time and this time is greater than the
1602 * trimmer's end bound, it triggers the end of the trim window.
1603 */
1604 if (G_UNLIKELY(ns_from_origin && !trimmer_it->end.is_infinite &&
1605 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
781751d8
PP
1606 status = end_iterator_streams(trimmer_it);
1607 *reached_end = true;
1608 break;
1609 }
1610
b7cbc799
SM
1611 /* Learn about this stream. */
1612 status = create_stream_state_entry(trimmer_it, stream, &sstate);
1613 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
1614 goto end;
781751d8
PP
1615 }
1616
b7cbc799
SM
1617 if (ns_from_origin) {
1618 sstate->seen_clock_snapshot = true;
781751d8
PP
1619 }
1620
143feff5
SM
1621 push_message(trimmer_it, msg);
1622 msg = NULL;
781751d8
PP
1623 break;
1624 case BT_MESSAGE_TYPE_STREAM_END:
b7cbc799
SM
1625 {
1626 gboolean removed;
1627
143feff5 1628 /*
b7cbc799
SM
1629 * If this message has a time and this time is greater than the
1630 * trimmer's end bound, it triggers the end of the trim window.
143feff5 1631 */
b7cbc799
SM
1632 if (G_UNLIKELY(ns_from_origin && !trimmer_it->end.is_infinite &&
1633 *ns_from_origin > trimmer_it->end.ns_from_origin)) {
1634 status = end_iterator_streams(trimmer_it);
1635 *reached_end = true;
1636 break;
781751d8 1637 }
b7cbc799
SM
1638
1639 /*
1640 * Either the stream end message's time is within the trimmer's
1641 * bounds, or it doesn't have a time. In both cases, pass
1642 * the message unmodified.
1643 */
1644 push_message(trimmer_it, msg);
1645 msg = NULL;
1646
1647 /* Forget about this stream. */
1648 removed = g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
1649 BT_ASSERT(removed);
781751d8 1650 break;
b7cbc799 1651 }
781751d8
PP
1652 default:
1653 break;
1654 }
1655
1656end:
1657 /* We release the message's reference whatever the outcome */
1658 bt_message_put_ref(msg);
b7cbc799 1659 return status;
781751d8
PP
1660}
1661
1662/*
1663 * Handles an input message. This _could_ make the iterator's output
1664 * message queue grow; this could also consume the message without
1665 * pushing anything to this queue, only modifying the stream state.
1666 *
1667 * This function consumes the `msg` reference, _whatever the outcome_.
1668 *
1669 * This function sets `reached_end` if handling this message made the
1670 * iterator reach the end of the trimming range. Note that the output
1671 * message queue could contain messages even if this function sets
1672 * `reached_end`.
1673 */
1674static inline
fb25b9e3 1675bt_component_class_message_iterator_next_method_status handle_message(
781751d8
PP
1676 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1677 bool *reached_end)
1678{
fb25b9e3 1679 bt_component_class_message_iterator_next_method_status status;
781751d8
PP
1680 const bt_stream *stream = NULL;
1681 int64_t ns_from_origin = INT64_MIN;
1564e3dc 1682 bool has_ns_from_origin = false;
781751d8 1683 int ret;
781751d8
PP
1684
1685 /* Find message's associated stream */
1686 switch (bt_message_get_type(msg)) {
1687 case BT_MESSAGE_TYPE_EVENT:
1688 stream = bt_event_borrow_stream_const(
1689 bt_message_event_borrow_event_const(msg));
1690 break;
1691 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1692 stream = bt_packet_borrow_stream_const(
1693 bt_message_packet_beginning_borrow_packet_const(msg));
1694 break;
1695 case BT_MESSAGE_TYPE_PACKET_END:
1696 stream = bt_packet_borrow_stream_const(
1697 bt_message_packet_end_borrow_packet_const(msg));
1698 break;
1699 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1700 stream = bt_message_discarded_events_borrow_stream_const(msg);
1701 break;
1702 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1703 stream = bt_message_discarded_packets_borrow_stream_const(msg);
1704 break;
781751d8
PP
1705 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1706 stream = bt_message_stream_beginning_borrow_stream_const(msg);
1707 break;
1708 case BT_MESSAGE_TYPE_STREAM_END:
1709 stream = bt_message_stream_end_borrow_stream_const(msg);
1710 break;
1711 default:
1712 break;
1713 }
1714
781751d8 1715 /* Retrieve the message's time */
b7cbc799 1716 ret = get_msg_ns_from_origin(msg, &ns_from_origin, &has_ns_from_origin);
85e7137b 1717 if (G_UNLIKELY(ret)) {
fb25b9e3 1718 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
781751d8
PP
1719 goto end;
1720 }
1721
b7cbc799 1722 if (G_LIKELY(stream)) {
781751d8 1723 /* Message associated to a stream */
b7cbc799
SM
1724 status = handle_message_with_stream(trimmer_it, msg,
1725 stream, has_ns_from_origin ? &ns_from_origin : NULL, reached_end);
781751d8
PP
1726
1727 /*
1728 * handle_message_with_stream_state() unconditionally
1729 * consumes `msg`.
1730 */
1731 msg = NULL;
1732 } else {
1733 /*
1734 * Message not associated to a stream (message iterator
1735 * inactivity).
1736 */
85e7137b 1737 if (G_UNLIKELY(ns_from_origin > trimmer_it->end.ns_from_origin)) {
781751d8
PP
1738 BT_MESSAGE_PUT_REF_AND_RESET(msg);
1739 status = end_iterator_streams(trimmer_it);
1740 *reached_end = true;
1741 } else {
1742 push_message(trimmer_it, msg);
fb25b9e3 1743 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
781751d8
PP
1744 msg = NULL;
1745 }
1746 }
1747
1748end:
1749 /* We release the message's reference whatever the outcome */
1750 bt_message_put_ref(msg);
1751 return status;
1752}
1753
1754static inline
1755void fill_message_array_from_output_messages(
1756 struct trimmer_iterator *trimmer_it,
1757 bt_message_array_const msgs, uint64_t capacity, uint64_t *count)
1758{
1759 *count = 0;
1760
1761 /*
1762 * Move auto-seek messages to the output array (which is this
1763 * iterator's base message array).
1764 */
1765 while (capacity > 0 && !g_queue_is_empty(trimmer_it->output_messages)) {
1766 msgs[*count] = pop_message(trimmer_it);
1767 capacity--;
1768 (*count)++;
1769 }
1770
ec4a3354 1771 BT_ASSERT_DBG(*count > 0);
781751d8
PP
1772}
1773
1774static inline
fb25b9e3 1775bt_component_class_message_iterator_next_method_status state_ending(
781751d8
PP
1776 struct trimmer_iterator *trimmer_it,
1777 bt_message_array_const msgs, uint64_t capacity,
1778 uint64_t *count)
1779{
fb25b9e3
PP
1780 bt_component_class_message_iterator_next_method_status status =
1781 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
781751d8
PP
1782
1783 if (g_queue_is_empty(trimmer_it->output_messages)) {
1784 trimmer_it->state = TRIMMER_ITERATOR_STATE_ENDED;
fb25b9e3 1785 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
781751d8
PP
1786 goto end;
1787 }
1788
1789 fill_message_array_from_output_messages(trimmer_it, msgs,
1790 capacity, count);
1791
1792end:
1793 return status;
1794}
1795
1796static inline
fb25b9e3
PP
1797bt_component_class_message_iterator_next_method_status
1798state_trim(struct trimmer_iterator *trimmer_it,
781751d8
PP
1799 bt_message_array_const msgs, uint64_t capacity,
1800 uint64_t *count)
1801{
fb25b9e3
PP
1802 bt_component_class_message_iterator_next_method_status status =
1803 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
781751d8
PP
1804 bt_message_array_const my_msgs;
1805 uint64_t my_count;
1806 uint64_t i;
1807 bool reached_end = false;
1808
1809 while (g_queue_is_empty(trimmer_it->output_messages)) {
1810 status = (int) bt_self_component_port_input_message_iterator_next(
1811 trimmer_it->upstream_iter, &my_msgs, &my_count);
fb25b9e3
PP
1812 if (G_UNLIKELY(status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK)) {
1813 if (status == BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END) {
781751d8 1814 status = end_iterator_streams(trimmer_it);
fb25b9e3 1815 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
781751d8
PP
1816 goto end;
1817 }
1818
1819 trimmer_it->state =
1820 TRIMMER_ITERATOR_STATE_ENDING;
1821 status = state_ending(trimmer_it, msgs,
1822 capacity, count);
1823 }
1824
1825 goto end;
1826 }
1827
ec4a3354 1828 BT_ASSERT_DBG(my_count > 0);
781751d8
PP
1829
1830 for (i = 0; i < my_count; i++) {
1831 status = handle_message(trimmer_it, my_msgs[i],
1832 &reached_end);
1833
1834 /*
1835 * handle_message() unconditionally consumes the
1836 * message reference.
1837 */
1838 my_msgs[i] = NULL;
1839
85e7137b 1840 if (G_UNLIKELY(status !=
fb25b9e3 1841 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK)) {
781751d8
PP
1842 put_messages(my_msgs, my_count);
1843 goto end;
1844 }
1845
85e7137b 1846 if (G_UNLIKELY(reached_end)) {
781751d8
PP
1847 /*
1848 * This message's time was passed the
1849 * trimming time range's end time: we
1850 * are done. Their might still be
1851 * messages in the output message queue,
1852 * so move to the "ending" state and
1853 * apply it immediately since
1854 * state_trim() is called within the
1855 * "next" method.
1856 */
1857 put_messages(my_msgs, my_count);
1858 trimmer_it->state =
1859 TRIMMER_ITERATOR_STATE_ENDING;
1860 status = state_ending(trimmer_it, msgs,
1861 capacity, count);
1862 goto end;
1863 }
1864 }
1865 }
1866
1867 /*
1868 * There's at least one message in the output message queue:
1869 * move the messages to the output message array.
1870 */
ec4a3354 1871 BT_ASSERT_DBG(!g_queue_is_empty(trimmer_it->output_messages));
781751d8
PP
1872 fill_message_array_from_output_messages(trimmer_it, msgs,
1873 capacity, count);
1874
1875end:
1876 return status;
1877}
1878
1879BT_HIDDEN
fb25b9e3 1880bt_component_class_message_iterator_next_method_status trimmer_msg_iter_next(
781751d8
PP
1881 bt_self_message_iterator *self_msg_iter,
1882 bt_message_array_const msgs, uint64_t capacity,
1883 uint64_t *count)
1884{
1885 struct trimmer_iterator *trimmer_it =
1886 bt_self_message_iterator_get_data(self_msg_iter);
fb25b9e3
PP
1887 bt_component_class_message_iterator_next_method_status status =
1888 BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
781751d8 1889
ec4a3354 1890 BT_ASSERT_DBG(trimmer_it);
781751d8 1891
85e7137b 1892 if (G_LIKELY(trimmer_it->state == TRIMMER_ITERATOR_STATE_TRIM)) {
781751d8 1893 status = state_trim(trimmer_it, msgs, capacity, count);
fb25b9e3 1894 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
781751d8
PP
1895 goto end;
1896 }
1897 } else {
1898 switch (trimmer_it->state) {
1899 case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN:
1900 status = state_set_trimmer_iterator_bounds(trimmer_it);
fb25b9e3 1901 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
781751d8
PP
1902 goto end;
1903 }
1904
1905 status = state_seek_initially(trimmer_it);
fb25b9e3 1906 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
781751d8
PP
1907 goto end;
1908 }
1909
1910 status = state_trim(trimmer_it, msgs, capacity, count);
fb25b9e3 1911 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
781751d8
PP
1912 goto end;
1913 }
1914
1915 break;
1916 case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY:
1917 status = state_seek_initially(trimmer_it);
fb25b9e3 1918 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
781751d8
PP
1919 goto end;
1920 }
1921
1922 status = state_trim(trimmer_it, msgs, capacity, count);
fb25b9e3 1923 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
781751d8
PP
1924 goto end;
1925 }
1926
1927 break;
1928 case TRIMMER_ITERATOR_STATE_ENDING:
1929 status = state_ending(trimmer_it, msgs, capacity,
1930 count);
fb25b9e3 1931 if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
781751d8
PP
1932 goto end;
1933 }
1934
1935 break;
1936 case TRIMMER_ITERATOR_STATE_ENDED:
fb25b9e3 1937 status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
781751d8
PP
1938 break;
1939 default:
24847fc7 1940 bt_common_abort();
781751d8
PP
1941 }
1942 }
1943
1944end:
1945 return status;
1946}
1947
1948BT_HIDDEN
1949void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
1950{
1951 struct trimmer_iterator *trimmer_it =
1952 bt_self_message_iterator_get_data(self_msg_iter);
1953
1954 BT_ASSERT(trimmer_it);
1955 destroy_trimmer_iterator(trimmer_it);
1956}
This page took 0.1589 seconds and 4 git commands to generate.