Fix: flt.utils.trimmer: accept inited streams ending without other messages
[babeltrace.git] / plugins / utils / trimmer / trimmer.c
CommitLineData
cab3f160 1/*
cab3f160 2 * Copyright 2016 Jérémie Galarneau <jeremie.galarneau@efficios.com>
781751d8 3 * Copyright 2019 Philippe Proulx <pproulx@efficios.com>
cab3f160
JG
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
f4f9e43b
PP
24#define BT_LOG_TAG "PLUGIN-UTILS-TRIMMER-FLT"
25#include "logging.h"
26
3d2f08e7 27#include <babeltrace/compat/utc-internal.h>
781751d8 28#include <babeltrace/compat/time-internal.h>
e0831b38 29#include <babeltrace/babeltrace.h>
781751d8 30#include <babeltrace/common-internal.h>
7d61fa8e 31#include <plugins-common.h>
8b45963b 32#include <babeltrace/assert-internal.h>
781751d8
PP
33#include <stdint.h>
34#include <inttypes.h>
35#include <glib.h>
36
37#include "trimmer.h"
38
39#define NS_PER_S INT64_C(1000000000)
40
41static const char * const in_port_name = "in";
42
/* Time of day (without any date), with a nanosecond part */
struct trimmer_time {
	unsigned int hour;
	unsigned int minute;
	unsigned int second;
	unsigned int ns;
};
46
47struct trimmer_bound {
48 /*
49 * Nanoseconds from origin, valid if `is_set` is set and
50 * `is_infinite` is false.
51 */
52 int64_t ns_from_origin;
53
54 /* True if this bound's full time (`ns_from_origin`) is set */
55 bool is_set;
56
57 /*
58 * True if this bound represents the infinity (negative or
59 * positive depending on which bound it is). If this is true,
60 * then we don't care about `ns_from_origin` above.
61 */
62 bool is_infinite;
63
64 /*
65 * This bound's time without the date; this time is used to set
66 * `ns_from_origin` once we know the date.
67 */
68 struct trimmer_time time;
69};
70
71struct trimmer_comp {
72 struct trimmer_bound begin, end;
73 bool is_gmt;
74};
75
/* States of a trimmer message iterator, in their usual order */
enum trimmer_iterator_state {
	/*
	 * Find the date of the first message and use it to set the
	 * times of the bounds.
	 */
	TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN,

	/* Make the initial seek to the trimming range's beginning time */
	TRIMMER_ITERATOR_STATE_SEEK_INITIALLY,

	/*
	 * Keep filling the output message queue while the received
	 * input messages are within the trimming time range.
	 */
	TRIMMER_ITERATOR_STATE_TRIM,

	/* Flush what remains in the output message queue */
	TRIMMER_ITERATOR_STATE_ENDING,

	/* Both the trimming operation and the message iterator are ended */
	TRIMMER_ITERATOR_STATE_ENDED,
};
100
101struct trimmer_iterator {
102 /* Weak */
103 struct trimmer_comp *trimmer_comp;
104
105 /* Weak */
106 bt_self_message_iterator *self_msg_iter;
107
108 enum trimmer_iterator_state state;
109
110 /* Owned by this */
111 bt_self_component_port_input_message_iterator *upstream_iter;
112 struct trimmer_bound begin, end;
113
114 /*
115 * Queue of `const bt_message *` (owned by the queue).
116 *
117 * This is where the trimming operation pushes the messages to
118 * output by this message iterator.
119 */
120 GQueue *output_messages;
121
122 /*
123 * Hash table of `bt_stream *` (weak) to
124 * `struct trimmer_iterator_stream_state *` (owned by the HT).
125 */
126 GHashTable *stream_states;
127};
128
129struct trimmer_iterator_stream_state {
130 /*
131 * True if both stream beginning and initial stream activity
132 * beginning messages were pushed for this stream.
133 */
134 bool inited;
135
136 /*
137 * True if the last pushed message for this stream was a stream
138 * activity end message.
139 */
140 bool last_msg_is_stream_activity_end;
141
142 /*
143 * Time to use for a generated stream end activity message when
144 * ending the stream.
145 */
146 int64_t stream_act_end_ns_from_origin;
147
148 /* Weak */
149 const bt_stream *stream;
150
151 /* Owned by this (`NULL` initially and between packets) */
152 const bt_packet *cur_packet;
153
154 /* Owned by this */
155 const bt_message *stream_beginning_msg;
156};
cab3f160
JG
157
/* Frees the trimmer component data `trimmer_comp` (must not be `NULL`) */
static
void destroy_trimmer_comp(struct trimmer_comp *trimmer_comp)
{
	BT_ASSERT(trimmer_comp);

	g_free(trimmer_comp);
}
164
165static
781751d8
PP
166struct trimmer_comp *create_trimmer_comp(void)
167{
168 return g_new0(struct trimmer_comp, 1);
169}
170
171BT_HIDDEN
172void trimmer_finalize(bt_self_component_filter *self_comp)
cab3f160 173{
781751d8
PP
174 struct trimmer_comp *trimmer_comp =
175 bt_self_component_get_data(
176 bt_self_component_filter_as_self_component(self_comp));
177
178 if (trimmer_comp) {
179 destroy_trimmer_comp(trimmer_comp);
180 }
cab3f160
JG
181}
182
781751d8
PP
183/*
184 * Sets the time (in ns from origin) of a trimmer bound from date and
185 * time components.
186 *
187 * Returns a negative value if anything goes wrong.
188 */
189static
190int set_bound_ns_from_origin(struct trimmer_bound *bound,
191 unsigned int year, unsigned int month, unsigned int day,
192 unsigned int hour, unsigned int minute, unsigned int second,
193 unsigned int ns, bool is_gmt)
cab3f160 194{
781751d8
PP
195 int ret = 0;
196 time_t result;
197 struct tm tm = {
198 .tm_sec = second,
199 .tm_min = minute,
200 .tm_hour = hour,
201 .tm_mday = day,
202 .tm_mon = month - 1,
203 .tm_year = year - 1900,
204 .tm_isdst = -1,
205 };
206
207 if (is_gmt) {
208 result = bt_timegm(&tm);
209 } else {
210 result = mktime(&tm);
211 }
212
213 if (result < 0) {
214 ret = -1;
215 goto end;
216 }
cab3f160 217
781751d8
PP
218 BT_ASSERT(bound);
219 bound->ns_from_origin = (int64_t) result;
220 bound->ns_from_origin *= NS_PER_S;
221 bound->ns_from_origin += ns;
222 bound->is_set = true;
223
224end:
225 return ret;
cab3f160
JG
226}
227
528debdf
MD
228/*
229 * Parses a timestamp, figuring out its format.
230 *
231 * Returns a negative value if anything goes wrong.
232 *
233 * Expected formats:
234 *
781751d8
PP
235 * YYYY-MM-DD hh:mm[:ss[.ns]]
236 * [hh:mm:]ss[.ns]
237 * [-]s[.ns]
238 *
239 * TODO: Check overflows.
240 */
241static
242int set_bound_from_str(const char *str, struct trimmer_bound *bound,
243 bool is_gmt)
244{
245 int ret = 0;
246 int s_ret;
247 unsigned int year, month, day, hour, minute, second, ns;
248 char dummy;
249
250 /* Try `YYYY-MM-DD hh:mm:ss.ns` format */
251 s_ret = sscanf(str, "%u-%u-%u %u:%u:%u.%u%c", &year, &month, &day,
252 &hour, &minute, &second, &ns, &dummy);
253 if (s_ret == 7) {
254 ret = set_bound_ns_from_origin(bound, year, month, day,
255 hour, minute, second, ns, is_gmt);
256 goto end;
257 }
258
259 /* Try `YYYY-MM-DD hh:mm:ss` format */
260 s_ret = sscanf(str, "%u-%u-%u %u:%u:%u%c", &year, &month, &day,
261 &hour, &minute, &second, &dummy);
262 if (s_ret == 6) {
263 ret = set_bound_ns_from_origin(bound, year, month, day,
264 hour, minute, second, 0, is_gmt);
265 goto end;
266 }
267
268 /* Try `YYYY-MM-DD hh:mm` format */
269 s_ret = sscanf(str, "%u-%u-%u %u:%u%c", &year, &month, &day,
270 &hour, &minute, &dummy);
271 if (s_ret == 5) {
272 ret = set_bound_ns_from_origin(bound, year, month, day,
273 hour, minute, 0, 0, is_gmt);
274 goto end;
275 }
276
277 /* Try `YYYY-MM-DD` format */
278 s_ret = sscanf(str, "%u-%u-%u%c", &year, &month, &day, &dummy);
279 if (s_ret == 3) {
280 ret = set_bound_ns_from_origin(bound, year, month, day,
281 0, 0, 0, 0, is_gmt);
282 goto end;
283 }
284
285 /* Try `hh:mm:ss.ns` format */
286 s_ret = sscanf(str, "%u:%u:%u.%u%c", &hour, &minute, &second, &ns,
287 &dummy);
288 if (s_ret == 4) {
289 bound->time.hour = hour;
290 bound->time.minute = minute;
291 bound->time.second = second;
292 bound->time.ns = ns;
293 goto end;
294 }
295
296 /* Try `hh:mm:ss` format */
297 s_ret = sscanf(str, "%u:%u:%u%c", &hour, &minute, &second, &dummy);
298 if (s_ret == 3) {
299 bound->time.hour = hour;
300 bound->time.minute = minute;
301 bound->time.second = second;
302 bound->time.ns = 0;
303 goto end;
304 }
305
306 /* Try `-s.ns` format */
307 s_ret = sscanf(str, "-%u.%u%c", &second, &ns, &dummy);
308 if (s_ret == 2) {
309 bound->ns_from_origin = -((int64_t) second) * NS_PER_S;
310 bound->ns_from_origin -= (int64_t) ns;
311 bound->is_set = true;
312 goto end;
313 }
314
315 /* Try `s.ns` format */
316 s_ret = sscanf(str, "%u.%u%c", &second, &ns, &dummy);
317 if (s_ret == 2) {
318 bound->ns_from_origin = ((int64_t) second) * NS_PER_S;
319 bound->ns_from_origin += (int64_t) ns;
320 bound->is_set = true;
321 goto end;
322 }
323
324 /* Try `-s` format */
325 s_ret = sscanf(str, "-%u%c", &second, &dummy);
326 if (s_ret == 1) {
327 bound->ns_from_origin = -((int64_t) second) * NS_PER_S;
328 bound->is_set = true;
329 goto end;
330 }
331
332 /* Try `s` format */
333 s_ret = sscanf(str, "%u%c", &second, &dummy);
334 if (s_ret == 1) {
335 bound->ns_from_origin = (int64_t) second * NS_PER_S;
336 bound->is_set = true;
337 goto end;
338 }
339
340 BT_LOGE("Invalid date/time format: param=\"%s\"", str);
341 ret = -1;
342
343end:
344 return ret;
345}
346
347/*
348 * Sets a trimmer bound's properties from a parameter string/integer
349 * value.
350 *
351 * Returns a negative value if anything goes wrong.
528debdf 352 */
44d3cbf0 353static
781751d8
PP
354int set_bound_from_param(const char *param_name, const bt_value *param,
355 struct trimmer_bound *bound, bool is_gmt)
528debdf
MD
356{
357 int ret;
922fc2dd 358 const char *arg;
781751d8 359 char tmp_arg[64];
922fc2dd
PP
360
361 if (bt_value_is_integer(param)) {
781751d8
PP
362 int64_t value = bt_value_integer_get(param);
363
364 /*
365 * Just convert it to a temporary string to handle
366 * everything the same way.
367 */
368 sprintf(tmp_arg, "%" PRId64, value);
369 arg = tmp_arg;
370 } else if (bt_value_is_string(param)) {
371 arg = bt_value_string_get(param);
372 } else {
922fc2dd
PP
373 BT_LOGE("`%s` parameter must be an integer or a string value.",
374 param_name);
781751d8
PP
375 ret = -1;
376 goto end;
922fc2dd
PP
377 }
378
781751d8
PP
379 ret = set_bound_from_str(arg, bound, is_gmt);
380
381end:
382 return ret;
383}
384
385static
386int validate_trimmer_bounds(struct trimmer_bound *begin,
387 struct trimmer_bound *end)
388{
389 int ret = 0;
390
391 BT_ASSERT(begin->is_set);
392 BT_ASSERT(end->is_set);
393
394 if (!begin->is_infinite && !end->is_infinite &&
395 begin->ns_from_origin > end->ns_from_origin) {
396 BT_LOGE("Trimming time range's beginning time is greater than end time: "
397 "begin-ns-from-origin=%" PRId64 ", "
398 "end-ns-from-origin=%" PRId64,
399 begin->ns_from_origin,
400 end->ns_from_origin);
401 ret = -1;
402 goto end;
528debdf
MD
403 }
404
781751d8
PP
405 if (!begin->is_infinite && begin->ns_from_origin == INT64_MIN) {
406 BT_LOGE("Invalid trimming time range's beginning time: "
407 "ns-from-origin=%" PRId64,
408 begin->ns_from_origin);
409 ret = -1;
410 goto end;
411 }
528debdf 412
781751d8
PP
413 if (!end->is_infinite && end->ns_from_origin == INT64_MIN) {
414 BT_LOGE("Invalid trimming time range's end time: "
415 "ns-from-origin=%" PRId64,
416 end->ns_from_origin);
417 ret = -1;
418 goto end;
419 }
528debdf 420
781751d8
PP
421end:
422 return ret;
528debdf
MD
423}
424
425static
781751d8
PP
426int init_trimmer_comp_from_params(struct trimmer_comp *trimmer_comp,
427 const bt_value *params)
44d3cbf0 428{
781751d8
PP
429 const bt_value *value;
430 int ret = 0;
44d3cbf0 431
8b45963b 432 BT_ASSERT(params);
781751d8 433 value = bt_value_map_borrow_entry_value_const(params, "gmt");
528debdf 434 if (value) {
781751d8 435 trimmer_comp->is_gmt = (bool) bt_value_bool_get(value);
528debdf
MD
436 }
437
781751d8 438 value = bt_value_map_borrow_entry_value_const(params, "begin");
528debdf 439 if (value) {
781751d8
PP
440 if (set_bound_from_param("begin", value,
441 &trimmer_comp->begin, trimmer_comp->is_gmt)) {
442 /* set_bound_from_param() logs errors */
443 ret = BT_SELF_COMPONENT_STATUS_ERROR;
922fc2dd 444 goto end;
44d3cbf0 445 }
781751d8
PP
446 } else {
447 trimmer_comp->begin.is_infinite = true;
448 trimmer_comp->begin.is_set = true;
44d3cbf0 449 }
528debdf 450
781751d8 451 value = bt_value_map_borrow_entry_value_const(params, "end");
528debdf 452 if (value) {
781751d8
PP
453 if (set_bound_from_param("end", value,
454 &trimmer_comp->end, trimmer_comp->is_gmt)) {
455 /* set_bound_from_param() logs errors */
456 ret = BT_SELF_COMPONENT_STATUS_ERROR;
922fc2dd 457 goto end;
528debdf 458 }
781751d8
PP
459 } else {
460 trimmer_comp->end.is_infinite = true;
461 trimmer_comp->end.is_set = true;
528debdf 462 }
922fc2dd 463
528debdf 464end:
781751d8
PP
465 if (trimmer_comp->begin.is_set && trimmer_comp->end.is_set) {
466 /* validate_trimmer_bounds() logs errors */
467 ret = validate_trimmer_bounds(&trimmer_comp->begin,
468 &trimmer_comp->end);
55595636 469 }
781751d8 470
528debdf 471 return ret;
44d3cbf0
JG
472}
473
781751d8
PP
474bt_self_component_status trimmer_init(bt_self_component_filter *self_comp,
475 const bt_value *params, void *init_data)
cab3f160 476{
781751d8
PP
477 int ret;
478 bt_self_component_status status;
479 struct trimmer_comp *trimmer_comp = create_trimmer_comp();
cab3f160 480
781751d8 481 if (!trimmer_comp) {
64e6ceea 482 status = BT_SELF_COMPONENT_STATUS_NOMEM;
781751d8 483 goto error;
cab3f160
JG
484 }
485
781751d8
PP
486 status = bt_self_component_filter_add_input_port(
487 self_comp, in_port_name, NULL, NULL);
488 if (status != BT_SELF_COMPONENT_STATUS_OK) {
b9d103be
PP
489 goto error;
490 }
491
781751d8
PP
492 status = bt_self_component_filter_add_output_port(
493 self_comp, "out", NULL, NULL);
494 if (status != BT_SELF_COMPONENT_STATUS_OK) {
b9d103be
PP
495 goto error;
496 }
497
781751d8
PP
498 ret = init_trimmer_comp_from_params(trimmer_comp, params);
499 if (ret) {
64e6ceea 500 status = BT_SELF_COMPONENT_STATUS_ERROR;
cab3f160
JG
501 goto error;
502 }
503
781751d8
PP
504 bt_self_component_set_data(
505 bt_self_component_filter_as_self_component(self_comp),
506 trimmer_comp);
507 goto end;
508
509error:
510 if (status == BT_SELF_COMPONENT_STATUS_OK) {
511 status = BT_SELF_COMPONENT_STATUS_ERROR;
512 }
513
514 if (trimmer_comp) {
515 destroy_trimmer_comp(trimmer_comp);
516 }
517
cab3f160 518end:
64e6ceea 519 return status;
781751d8
PP
520}
521
522static
523void destroy_trimmer_iterator(struct trimmer_iterator *trimmer_it)
524{
525 BT_ASSERT(trimmer_it);
526 bt_self_component_port_input_message_iterator_put_ref(
527 trimmer_it->upstream_iter);
528
529 if (trimmer_it->output_messages) {
530 g_queue_free(trimmer_it->output_messages);
531 }
532
533 if (trimmer_it->stream_states) {
534 g_hash_table_destroy(trimmer_it->stream_states);
535 }
536
537 g_free(trimmer_it);
538}
539
540static
541void destroy_trimmer_iterator_stream_state(
542 struct trimmer_iterator_stream_state *sstate)
543{
544 BT_ASSERT(sstate);
545 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
546 BT_MESSAGE_PUT_REF_AND_RESET(sstate->stream_beginning_msg);
547 g_free(sstate);
548}
549
550BT_HIDDEN
551bt_self_message_iterator_status trimmer_msg_iter_init(
552 bt_self_message_iterator *self_msg_iter,
553 bt_self_component_filter *self_comp,
554 bt_self_component_port_output *port)
555{
556 bt_self_message_iterator_status status =
557 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
558 struct trimmer_iterator *trimmer_it;
559
560 trimmer_it = g_new0(struct trimmer_iterator, 1);
561 if (!trimmer_it) {
562 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
563 goto end;
564 }
565
566 trimmer_it->trimmer_comp = bt_self_component_get_data(
567 bt_self_component_filter_as_self_component(self_comp));
568 BT_ASSERT(trimmer_it->trimmer_comp);
569
570 if (trimmer_it->trimmer_comp->begin.is_set &&
571 trimmer_it->trimmer_comp->end.is_set) {
572 /*
573 * Both trimming time range's bounds are set, so skip
574 * the
575 * `TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN`
576 * phase.
577 */
578 trimmer_it->state = TRIMMER_ITERATOR_STATE_SEEK_INITIALLY;
579 }
580
581 trimmer_it->begin = trimmer_it->trimmer_comp->begin;
582 trimmer_it->end = trimmer_it->trimmer_comp->end;
583 trimmer_it->upstream_iter =
584 bt_self_component_port_input_message_iterator_create(
585 bt_self_component_filter_borrow_input_port_by_name(
586 self_comp, in_port_name));
587 if (!trimmer_it->upstream_iter) {
588 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
589 goto end;
590 }
591
592 trimmer_it->output_messages = g_queue_new();
593 if (!trimmer_it->output_messages) {
594 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
595 goto end;
596 }
597
598 trimmer_it->stream_states = g_hash_table_new_full(g_direct_hash,
599 g_direct_equal, NULL,
600 (GDestroyNotify) destroy_trimmer_iterator_stream_state);
601 if (!trimmer_it->stream_states) {
602 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
603 goto end;
604 }
605
606 trimmer_it->self_msg_iter = self_msg_iter;
607 bt_self_message_iterator_set_data(self_msg_iter, trimmer_it);
608
609end:
610 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK && trimmer_it) {
611 destroy_trimmer_iterator(trimmer_it);
612 }
613
614 return status;
615}
616
617static inline
618int get_msg_ns_from_origin(const bt_message *msg, int64_t *ns_from_origin,
619 bool *skip)
620{
621 const bt_clock_class *clock_class = NULL;
622 const bt_clock_snapshot *clock_snapshot = NULL;
623 bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
624 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
625 int ret = 0;
626
627 BT_ASSERT(msg);
628 BT_ASSERT(ns_from_origin);
629 BT_ASSERT(skip);
630
631 switch (bt_message_get_type(msg)) {
632 case BT_MESSAGE_TYPE_EVENT:
633 clock_class =
634 bt_message_event_borrow_stream_class_default_clock_class_const(
635 msg);
636 if (unlikely(!clock_class)) {
637 goto error;
638 }
639
640 cs_state = bt_message_event_borrow_default_clock_snapshot_const(
641 msg, &clock_snapshot);
642 break;
643 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
644 clock_class =
645 bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
646 msg);
647 if (unlikely(!clock_class)) {
648 goto error;
649 }
650
651 cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
652 msg, &clock_snapshot);
653 break;
654 case BT_MESSAGE_TYPE_PACKET_END:
655 clock_class =
656 bt_message_packet_end_borrow_stream_class_default_clock_class_const(
657 msg);
658 if (unlikely(!clock_class)) {
659 goto error;
660 }
661
662 cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const(
663 msg, &clock_snapshot);
664 break;
665 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
666 clock_class =
667 bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
668 msg);
669 if (unlikely(!clock_class)) {
670 goto error;
671 }
672
673 cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
674 msg, &clock_snapshot);
675 break;
676 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
677 clock_class =
678 bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
679 msg);
680 if (unlikely(!clock_class)) {
681 goto error;
682 }
683
684 cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
685 msg, &clock_snapshot);
686 break;
687 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
688 clock_class =
689 bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
690 msg);
691 if (unlikely(!clock_class)) {
692 goto error;
693 }
694
695 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
696 msg, &clock_snapshot);
697 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN ||
698 sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
699 /* Lowest possible time to always include them */
700 *ns_from_origin = INT64_MIN;
701 goto no_clock_snapshot;
702 }
703
704 break;
705 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
706 clock_class =
707 bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
708 msg);
709 if (unlikely(!clock_class)) {
710 goto error;
711 }
712
713 sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
714 msg, &clock_snapshot);
715 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
716 /* Lowest time to always include it */
717 *ns_from_origin = INT64_MIN;
718 goto no_clock_snapshot;
719 } else if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
720 /* Greatest time to always exclude it */
721 *ns_from_origin = INT64_MAX;
722 goto no_clock_snapshot;
723 }
724
725 break;
726 case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
727 cs_state =
728 bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
729 msg, &clock_snapshot);
730 break;
731 default:
732 goto no_clock_snapshot;
733 }
734
735 if (unlikely(cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN)) {
736 BT_LOGE_STR("Unsupported unknown clock snapshot.");
737 ret = -1;
738 goto end;
739 }
740
741 ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot,
742 ns_from_origin);
743 if (unlikely(ret)) {
744 goto error;
745 }
746
747 goto end;
748
749no_clock_snapshot:
750 *skip = true;
751 goto end;
752
cab3f160 753error:
781751d8
PP
754 ret = -1;
755
756end:
757 return ret;
758}
759
760static inline
761void put_messages(bt_message_array_const msgs, uint64_t count)
762{
763 uint64_t i;
764
765 for (i = 0; i < count; i++) {
766 BT_MESSAGE_PUT_REF_AND_RESET(msgs[i]);
767 }
768}
769
770static inline
771int set_trimmer_iterator_bound(struct trimmer_bound *bound,
772 int64_t ns_from_origin, bool is_gmt)
773{
774 struct tm tm;
775 time_t time_seconds = (time_t) (ns_from_origin / NS_PER_S);
776 int ret = 0;
777
778 BT_ASSERT(!bound->is_set);
779 errno = 0;
780
781 /* We only need to extract the date from this time */
782 if (is_gmt) {
783 bt_gmtime_r(&time_seconds, &tm);
784 } else {
785 bt_localtime_r(&time_seconds, &tm);
786 }
787
788 if (errno) {
789 BT_LOGE_ERRNO("Cannot convert timestamp to date and time",
790 "ts=%" PRId64, (int64_t) time_seconds);
791 ret = -1;
792 goto end;
793 }
794
795 ret = set_bound_ns_from_origin(bound, tm.tm_year + 1900, tm.tm_mon + 1,
796 tm.tm_mday, bound->time.hour, bound->time.minute,
797 bound->time.second, bound->time.ns, is_gmt);
798
799end:
cab3f160
JG
800 return ret;
801}
781751d8
PP
802
803static
804bt_self_message_iterator_status state_set_trimmer_iterator_bounds(
805 struct trimmer_iterator *trimmer_it)
806{
807 bt_message_iterator_status upstream_iter_status =
808 BT_MESSAGE_ITERATOR_STATUS_OK;
809 struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
810 bt_message_array_const msgs;
811 uint64_t count = 0;
812 int64_t ns_from_origin = INT64_MIN;
813 uint64_t i;
814 int ret;
815
816 BT_ASSERT(!trimmer_it->begin.is_set ||
817 !trimmer_it->end.is_set);
818
819 while (true) {
820 upstream_iter_status =
821 bt_self_component_port_input_message_iterator_next(
822 trimmer_it->upstream_iter, &msgs, &count);
823 if (upstream_iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) {
824 goto end;
825 }
826
827 for (i = 0; i < count; i++) {
828 const bt_message *msg = msgs[i];
829 bool skip = false;
830 int ret;
831
832 ret = get_msg_ns_from_origin(msg, &ns_from_origin,
833 &skip);
834 if (ret) {
835 goto error;
836 }
837
838 if (skip) {
839 continue;
840 }
841
842 BT_ASSERT(ns_from_origin != INT64_MIN &&
843 ns_from_origin != INT64_MAX);
844 put_messages(msgs, count);
845 goto found;
846 }
847
848 put_messages(msgs, count);
849 }
850
851found:
852 if (!trimmer_it->begin.is_set) {
853 BT_ASSERT(!trimmer_it->begin.is_infinite);
854 ret = set_trimmer_iterator_bound(&trimmer_it->begin,
855 ns_from_origin, trimmer_comp->is_gmt);
856 if (ret) {
857 goto error;
858 }
859 }
860
861 if (!trimmer_it->end.is_set) {
862 BT_ASSERT(!trimmer_it->end.is_infinite);
863 ret = set_trimmer_iterator_bound(&trimmer_it->end,
864 ns_from_origin, trimmer_comp->is_gmt);
865 if (ret) {
866 goto error;
867 }
868 }
869
870 ret = validate_trimmer_bounds(&trimmer_it->begin,
871 &trimmer_it->end);
872 if (ret) {
873 goto error;
874 }
875
876 goto end;
877
878error:
879 put_messages(msgs, count);
880 upstream_iter_status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
881
882end:
883 return (int) upstream_iter_status;
884}
885
886static
887bt_self_message_iterator_status state_seek_initially(
888 struct trimmer_iterator *trimmer_it)
889{
890 bt_self_message_iterator_status status =
891 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
892
893 BT_ASSERT(trimmer_it->begin.is_set);
894
895 if (trimmer_it->begin.is_infinite) {
896 if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
897 trimmer_it->upstream_iter)) {
898 BT_LOGE_STR("Cannot make upstream message iterator initially seek its beginning.");
899 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
900 goto end;
901 }
902
903 status = (int) bt_self_component_port_input_message_iterator_seek_beginning(
904 trimmer_it->upstream_iter);
905 } else {
906 if (!bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
907 trimmer_it->upstream_iter,
908 trimmer_it->begin.ns_from_origin)) {
909 BT_LOGE("Cannot make upstream message iterator initially seek: "
910 "seek-ns-from-origin=%" PRId64,
911 trimmer_it->begin.ns_from_origin);
912 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
913 goto end;
914 }
915
916 status = (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
917 trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin);
918 }
919
920 if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
921 trimmer_it->state = TRIMMER_ITERATOR_STATE_TRIM;
922 }
923
924end:
925 return status;
926}
927
928static inline
929void push_message(struct trimmer_iterator *trimmer_it, const bt_message *msg)
930{
931 g_queue_push_head(trimmer_it->output_messages, (void *) msg);
932}
933
934static inline
935const bt_message *pop_message(struct trimmer_iterator *trimmer_it)
936{
937 return g_queue_pop_tail(trimmer_it->output_messages);
938}
939
940static inline
941int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
942 int64_t ns_from_origin, uint64_t *raw_value)
943{
944
945 int64_t cc_offset_s;
946 uint64_t cc_offset_cycles;
947 uint64_t cc_freq;
948
949 bt_clock_class_get_offset(clock_class, &cc_offset_s, &cc_offset_cycles);
950 cc_freq = bt_clock_class_get_frequency(clock_class);
951 return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
952 cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
953}
954
955static inline
956bt_self_message_iterator_status end_stream(struct trimmer_iterator *trimmer_it,
957 struct trimmer_iterator_stream_state *sstate)
958{
959 bt_self_message_iterator_status status =
960 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
961 uint64_t raw_value;
962 const bt_clock_class *clock_class;
963 int ret;
964 bt_message *msg = NULL;
965
966 BT_ASSERT(!trimmer_it->end.is_infinite);
967
968 if (!sstate->stream) {
969 goto end;
970 }
971
972 if (sstate->cur_packet) {
973 /*
974 * The last message could not have been a stream
975 * activity end message if we have a current packet.
976 */
977 BT_ASSERT(!sstate->last_msg_is_stream_activity_end);
978
979 /*
980 * Create and push a packet end message, making its time
981 * the trimming range's end time.
982 */
983 clock_class = bt_stream_class_borrow_default_clock_class_const(
984 bt_stream_borrow_class_const(sstate->stream));
985 BT_ASSERT(clock_class);
986 ret = clock_raw_value_from_ns_from_origin(clock_class,
987 trimmer_it->end.ns_from_origin, &raw_value);
988 if (ret) {
989 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
990 goto end;
991 }
992
993 msg = bt_message_packet_end_create_with_default_clock_snapshot(
994 trimmer_it->self_msg_iter, sstate->cur_packet,
995 raw_value);
996 if (!msg) {
997 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
998 goto end;
999 }
1000
1001 push_message(trimmer_it, msg);
1002 msg = NULL;
1003 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1004
1005 /*
1006 * Because we generated a packet end message, set the
1007 * stream activity end message's time to use to the
1008 * trimming range's end time (this packet end message's
1009 * time).
1010 */
1011 sstate->stream_act_end_ns_from_origin =
1012 trimmer_it->end.ns_from_origin;
1013 }
1014
1015 if (!sstate->last_msg_is_stream_activity_end) {
1016 /* Create and push a stream activity end message */
1017 msg = bt_message_stream_activity_end_create(
1018 trimmer_it->self_msg_iter, sstate->stream);
1019 if (!msg) {
1020 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1021 goto end;
1022 }
1023
1024 clock_class = bt_stream_class_borrow_default_clock_class_const(
1025 bt_stream_borrow_class_const(sstate->stream));
1026 BT_ASSERT(clock_class);
c24092f2
PP
1027
1028 if (sstate->stream_act_end_ns_from_origin == INT64_MIN) {
1029 /*
1030 * We received at least what is necessary to
1031 * have a stream state (stream beginning and
1032 * stream activity beginning messages), but
1033 * nothing else: use the trimmer range's end
1034 * time.
1035 */
1036 sstate->stream_act_end_ns_from_origin =
1037 trimmer_it->end.ns_from_origin;
1038 }
1039
781751d8
PP
1040 ret = clock_raw_value_from_ns_from_origin(clock_class,
1041 sstate->stream_act_end_ns_from_origin, &raw_value);
1042 if (ret) {
1043 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1044 goto end;
1045 }
1046
1047 bt_message_stream_activity_end_set_default_clock_snapshot(
1048 msg, raw_value);
1049 push_message(trimmer_it, msg);
1050 msg = NULL;
1051 }
1052
1053 /* Create and push a stream end message */
1054 msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
1055 sstate->stream);
1056 if (!msg) {
1057 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1058 goto end;
1059 }
1060
1061 push_message(trimmer_it, msg);
1062 msg = NULL;
1063
1064 /*
1065 * Just to make sure that we don't use this stream state again
1066 * in the future without an obvious error.
1067 */
1068 sstate->stream = NULL;
1069
1070end:
1071 bt_message_put_ref(msg);
1072 return status;
1073}
1074
1075static inline
1076bt_self_message_iterator_status end_iterator_streams(
1077 struct trimmer_iterator *trimmer_it)
1078{
1079 bt_self_message_iterator_status status =
1080 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1081 GHashTableIter iter;
1082 gpointer key, sstate;
1083
1084 if (trimmer_it->end.is_infinite) {
1085 /*
1086 * An infinite trimming range's end time guarantees that
1087 * we received (and pushed) all the appropriate end
1088 * messages.
1089 */
1090 goto remove_all;
1091 }
1092
1093 /*
1094 * End each stream and then remove them from the hash table of
1095 * stream states to release unneeded references.
1096 */
1097 g_hash_table_iter_init(&iter, trimmer_it->stream_states);
1098
1099 while (g_hash_table_iter_next(&iter, &key, &sstate)) {
1100 status = end_stream(trimmer_it, sstate);
1101 if (status) {
1102 goto end;
1103 }
1104 }
1105
1106remove_all:
1107 g_hash_table_remove_all(trimmer_it->stream_states);
1108
1109end:
1110 return status;
1111}
1112
1113static inline
1114bt_self_message_iterator_status create_stream_beginning_activity_message(
1115 struct trimmer_iterator *trimmer_it,
1116 const bt_stream *stream,
1117 const bt_clock_class *clock_class, bt_message **msg)
1118{
1119 bt_self_message_iterator_status status =
1120 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1121
1122 BT_ASSERT(msg);
1123 BT_ASSERT(!trimmer_it->begin.is_infinite);
1124
1125 *msg = bt_message_stream_activity_beginning_create(
1126 trimmer_it->self_msg_iter, stream);
1127 if (!*msg) {
1128 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1129 goto end;
1130 }
1131
1132 if (clock_class) {
1133 int ret;
1134 uint64_t raw_value;
1135
1136 ret = clock_raw_value_from_ns_from_origin(clock_class,
1137 trimmer_it->begin.ns_from_origin, &raw_value);
1138 if (ret) {
1139 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1140 bt_message_put_ref(*msg);
1141 goto end;
1142 }
1143
1144 bt_message_stream_activity_beginning_set_default_clock_snapshot(
1145 *msg, raw_value);
1146 }
1147
1148end:
1149 return status;
1150}
1151
1152/*
1153 * Makes sure to initialize a stream state, pushing the appropriate
1154 * initial messages.
1155 *
1156 * `stream_act_beginning_msg` is an initial stream activity beginning
1157 * message to potentially use, depending on its clock snapshot state.
1158 * This function consumes `stream_act_beginning_msg` unconditionally.
1159 */
1160static inline
1161bt_self_message_iterator_status ensure_stream_state_is_inited(
1162 struct trimmer_iterator *trimmer_it,
1163 struct trimmer_iterator_stream_state *sstate,
1164 const bt_message *stream_act_beginning_msg)
1165{
1166 bt_self_message_iterator_status status =
1167 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1168 bt_message *new_msg = NULL;
1169 const bt_clock_class *clock_class =
1170 bt_stream_class_borrow_default_clock_class_const(
1171 bt_stream_borrow_class_const(sstate->stream));
1172
1173 BT_ASSERT(!sstate->inited);
1174
1175 if (!sstate->stream_beginning_msg) {
1176 /* No initial stream beginning message: create one */
1177 sstate->stream_beginning_msg =
1178 bt_message_stream_beginning_create(
1179 trimmer_it->self_msg_iter, sstate->stream);
1180 if (!sstate->stream_beginning_msg) {
1181 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1182 goto end;
1183 }
1184 }
1185
1186 /* Push initial stream beginning message */
1187 BT_ASSERT(sstate->stream_beginning_msg);
1188 push_message(trimmer_it, sstate->stream_beginning_msg);
1189 sstate->stream_beginning_msg = NULL;
1190
1191 if (stream_act_beginning_msg) {
1192 /*
1193 * Initial stream activity beginning message exists: if
1194 * its time is -inf, then create and push a new one
1195 * having the trimming range's beginning time. Otherwise
1196 * push it as is (known and unknown).
1197 */
1198 const bt_clock_snapshot *cs;
1199 bt_message_stream_activity_clock_snapshot_state sa_cs_state;
1200
1201 sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
1202 stream_act_beginning_msg, &cs);
1203 if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE &&
1204 !trimmer_it->begin.is_infinite) {
1205 /*
1206 * -inf time: use trimming range's beginning
1207 * time (which is not -inf).
1208 */
1209 status = create_stream_beginning_activity_message(
1210 trimmer_it, sstate->stream, clock_class,
1211 &new_msg);
1212 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1213 goto end;
1214 }
1215
1216 push_message(trimmer_it, new_msg);
1217 new_msg = NULL;
1218 } else {
1219 /* Known/unknown: push as is */
1220 push_message(trimmer_it, stream_act_beginning_msg);
1221 stream_act_beginning_msg = NULL;
1222 }
1223 } else {
1224 BT_ASSERT(!trimmer_it->begin.is_infinite);
1225
1226 /*
1227 * No stream beginning activity message: create and push
1228 * a new message.
1229 */
1230 status = create_stream_beginning_activity_message(
1231 trimmer_it, sstate->stream, clock_class, &new_msg);
1232 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1233 goto end;
1234 }
1235
1236 push_message(trimmer_it, new_msg);
1237 new_msg = NULL;
1238 }
1239
1240 sstate->inited = true;
1241
1242end:
1243 bt_message_put_ref(new_msg);
1244 bt_message_put_ref(stream_act_beginning_msg);
1245 return status;
1246}
1247
1248static inline
1249bt_self_message_iterator_status ensure_cur_packet_exists(
1250 struct trimmer_iterator *trimmer_it,
1251 struct trimmer_iterator_stream_state *sstate,
1252 const bt_packet *packet)
1253{
1254 bt_self_message_iterator_status status =
1255 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1256 int ret;
1257 const bt_clock_class *clock_class =
1258 bt_stream_class_borrow_default_clock_class_const(
1259 bt_stream_borrow_class_const(sstate->stream));
1260 bt_message *msg = NULL;
1261 uint64_t raw_value;
1262
1263 BT_ASSERT(!trimmer_it->begin.is_infinite);
1264 BT_ASSERT(!sstate->cur_packet);
1265
1266 /*
1267 * Create and push an initial packet beginning message,
1268 * making its time the trimming range's beginning time.
1269 */
1270 ret = clock_raw_value_from_ns_from_origin(clock_class,
1271 trimmer_it->begin.ns_from_origin, &raw_value);
1272 if (ret) {
1273 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1274 goto end;
1275 }
1276
1277 msg = bt_message_packet_beginning_create_with_default_clock_snapshot(
1278 trimmer_it->self_msg_iter, packet, raw_value);
1279 if (!msg) {
1280 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1281 goto end;
1282 }
1283
1284 push_message(trimmer_it, msg);
1285 msg = NULL;
1286
1287 /* Set packet as this stream's current packet */
1288 sstate->cur_packet = packet;
1289 bt_packet_get_ref(sstate->cur_packet);
1290
1291end:
1292 bt_message_put_ref(msg);
1293 return status;
1294}
1295
1296/*
1297 * Handles a message which is associated to a given stream state. This
1298 * _could_ make the iterator's output message queue grow; this could
1299 * also consume the message without pushing anything to this queue, only
1300 * modifying the stream state.
1301 *
1302 * This function consumes the `msg` reference, _whatever the outcome_.
1303 *
1304 * `ns_from_origin` is the message's time, as given by
1305 * get_msg_ns_from_origin().
1306 *
1307 * This function sets `reached_end` if handling this message made the
1308 * iterator reach the end of the trimming range. Note that the output
1309 * message queue could contain messages even if this function sets
1310 * `reached_end`.
1311 */
1312static inline
1313bt_self_message_iterator_status handle_message_with_stream_state(
1314 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1315 struct trimmer_iterator_stream_state *sstate,
1316 int64_t ns_from_origin, bool *reached_end)
1317{
1318 bt_self_message_iterator_status status =
1319 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1320 bt_message_type msg_type = bt_message_get_type(msg);
1321 int ret;
1322
1323 switch (msg_type) {
1324 case BT_MESSAGE_TYPE_EVENT:
1325 if (unlikely(!trimmer_it->end.is_infinite &&
1326 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1327 status = end_iterator_streams(trimmer_it);
1328 *reached_end = true;
1329 break;
1330 }
1331
1332 if (unlikely(!sstate->inited)) {
1333 status = ensure_stream_state_is_inited(trimmer_it,
1334 sstate, NULL);
1335 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1336 goto end;
1337 }
1338 }
1339
1340 if (unlikely(!sstate->cur_packet)) {
1341 const bt_event *event =
1342 bt_message_event_borrow_event_const(msg);
1343 const bt_packet *packet = bt_event_borrow_packet_const(
1344 event);
1345
1346 status = ensure_cur_packet_exists(trimmer_it, sstate,
1347 packet);
1348 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1349 goto end;
1350 }
1351 }
1352
1353 BT_ASSERT(sstate->cur_packet);
1354 push_message(trimmer_it, msg);
1355 msg = NULL;
1356 break;
1357 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1358 if (unlikely(!trimmer_it->end.is_infinite &&
1359 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1360 status = end_iterator_streams(trimmer_it);
1361 *reached_end = true;
1362 break;
1363 }
1364
1365 if (unlikely(!sstate->inited)) {
1366 status = ensure_stream_state_is_inited(trimmer_it,
1367 sstate, NULL);
1368 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1369 goto end;
1370 }
1371 }
1372
1373 BT_ASSERT(!sstate->cur_packet);
1374 sstate->cur_packet =
1375 bt_message_packet_beginning_borrow_packet_const(msg);
1376 bt_packet_get_ref(sstate->cur_packet);
1377 push_message(trimmer_it, msg);
1378 msg = NULL;
1379 break;
1380 case BT_MESSAGE_TYPE_PACKET_END:
1381 sstate->stream_act_end_ns_from_origin = ns_from_origin;
1382
1383 if (unlikely(!trimmer_it->end.is_infinite &&
1384 ns_from_origin > trimmer_it->end.ns_from_origin)) {
1385 status = end_iterator_streams(trimmer_it);
1386 *reached_end = true;
1387 break;
1388 }
1389
1390 if (unlikely(!sstate->inited)) {
1391 status = ensure_stream_state_is_inited(trimmer_it,
1392 sstate, NULL);
1393 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1394 goto end;
1395 }
1396 }
1397
1398 if (unlikely(!sstate->cur_packet)) {
1399 const bt_packet *packet =
1400 bt_message_packet_end_borrow_packet_const(msg);
1401
1402 status = ensure_cur_packet_exists(trimmer_it, sstate,
1403 packet);
1404 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1405 goto end;
1406 }
1407 }
1408
1409 BT_ASSERT(sstate->cur_packet);
1410 BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
1411 push_message(trimmer_it, msg);
1412 msg = NULL;
1413 break;
1414 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1415 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1416 {
1417 /*
1418 * `ns_from_origin` is the message's time range's
1419 * beginning time here.
1420 */
1421 int64_t end_ns_from_origin;
1422 const bt_clock_snapshot *end_cs;
1423
1424 if (bt_message_get_type(msg) ==
1425 BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1426 /*
1427 * Safe to ignore the return value because we
1428 * know there's a default clock and it's always
1429 * known.
1430 */
1431 (void) bt_message_discarded_events_borrow_default_end_clock_snapshot_const(
1432 msg, &end_cs);
1433 } else {
1434 /*
1435 * Safe to ignore the return value because we
1436 * know there's a default clock and it's always
1437 * known.
1438 */
1439 (void) bt_message_discarded_packets_borrow_default_end_clock_snapshot_const(
1440 msg, &end_cs);
1441 }
1442
1443 if (bt_clock_snapshot_get_ns_from_origin(end_cs,
1444 &end_ns_from_origin)) {
1445 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1446 goto end;
1447 }
1448
1449 sstate->stream_act_end_ns_from_origin = end_ns_from_origin;
1450
1451 if (!trimmer_it->end.is_infinite &&
1452 ns_from_origin > trimmer_it->end.ns_from_origin) {
1453 status = end_iterator_streams(trimmer_it);
1454 *reached_end = true;
1455 break;
1456 }
1457
1458 if (!trimmer_it->end.is_infinite &&
1459 end_ns_from_origin > trimmer_it->end.ns_from_origin) {
1460 /*
1461 * This message's end time is outside the
1462 * trimming time range: replace it with a new
1463 * message having an end time equal to the
1464 * trimming time range's end and without a
1465 * count.
1466 */
1467 const bt_clock_class *clock_class =
1468 bt_clock_snapshot_borrow_clock_class_const(
1469 end_cs);
1470 const bt_clock_snapshot *begin_cs;
1471 bt_message *new_msg;
1472 uint64_t end_raw_value;
1473
1474 ret = clock_raw_value_from_ns_from_origin(clock_class,
1475 trimmer_it->end.ns_from_origin, &end_raw_value);
1476 if (ret) {
1477 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1478 goto end;
1479 }
1480
1481 if (msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
1482 (void) bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
1483 msg, &begin_cs);
1484 new_msg = bt_message_discarded_events_create_with_default_clock_snapshots(
1485 trimmer_it->self_msg_iter,
1486 sstate->stream,
1487 bt_clock_snapshot_get_value(begin_cs),
1488 end_raw_value);
1489 } else {
1490 (void) bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
1491 msg, &begin_cs);
1492 new_msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
1493 trimmer_it->self_msg_iter,
1494 sstate->stream,
1495 bt_clock_snapshot_get_value(begin_cs),
1496 end_raw_value);
1497 }
1498
1499 if (!new_msg) {
1500 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1501 goto end;
1502 }
1503
1504 /* Replace the original message */
1505 BT_MESSAGE_MOVE_REF(msg, new_msg);
1506 }
1507
1508 if (unlikely(!sstate->inited)) {
1509 status = ensure_stream_state_is_inited(trimmer_it,
1510 sstate, NULL);
1511 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1512 goto end;
1513 }
1514 }
1515
1516 push_message(trimmer_it, msg);
1517 msg = NULL;
1518 break;
1519 }
1520 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
1521 if (!trimmer_it->end.is_infinite &&
1522 ns_from_origin > trimmer_it->end.ns_from_origin) {
1523 /*
1524 * This only happens when the message's time is
1525 * known and is greater than the trimming
1526 * range's end time. Unknown and -inf times are
1527 * always less than
1528 * `trimmer_it->end.ns_from_origin`.
1529 */
1530 status = end_iterator_streams(trimmer_it);
1531 *reached_end = true;
1532 break;
1533 }
1534
1535 if (!sstate->inited) {
1536 status = ensure_stream_state_is_inited(trimmer_it,
1537 sstate, msg);
1538 msg = NULL;
1539 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1540 goto end;
1541 }
1542 } else {
1543 push_message(trimmer_it, msg);
1544 msg = NULL;
1545 }
1546
1547 break;
1548 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
1549 if (trimmer_it->end.is_infinite) {
1550 push_message(trimmer_it, msg);
1551 msg = NULL;
1552 break;
1553 }
1554
1555 if (ns_from_origin == INT64_MIN) {
1556 /* Unknown: push as is if stream state is inited */
1557 if (sstate->inited) {
1558 push_message(trimmer_it, msg);
1559 msg = NULL;
1560 sstate->last_msg_is_stream_activity_end = true;
1561 }
1562 } else if (ns_from_origin == INT64_MAX) {
1563 /* Infinite: use trimming range's end time */
1564 sstate->stream_act_end_ns_from_origin =
1565 trimmer_it->end.ns_from_origin;
1566 } else {
1567 /* Known: check if outside of trimming range */
1568 if (ns_from_origin > trimmer_it->end.ns_from_origin) {
1569 sstate->stream_act_end_ns_from_origin =
1570 trimmer_it->end.ns_from_origin;
1571 status = end_iterator_streams(trimmer_it);
1572 *reached_end = true;
1573 break;
1574 }
1575
1576 if (!sstate->inited) {
1577 /*
1578 * First message for this stream is a
1579 * stream activity end: we can't deduce
1580 * anything about the stream activity
1581 * beginning's time, and using this
1582 * message's time would make a useless
1583 * pair of stream activity beginning/end
1584 * with the same time. Just skip this
1585 * message and wait for something
1586 * useful.
1587 */
1588 break;
1589 }
1590
1591 push_message(trimmer_it, msg);
1592 msg = NULL;
1593 sstate->last_msg_is_stream_activity_end = true;
1594 sstate->stream_act_end_ns_from_origin = ns_from_origin;
1595 }
1596
1597 break;
1598 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1599 /*
1600 * We don't know what follows at this point, so just
1601 * keep this message until we know what to do with it
1602 * (it will be used in ensure_stream_state_is_inited()).
1603 */
1604 BT_ASSERT(!sstate->inited);
1605 BT_MESSAGE_MOVE_REF(sstate->stream_beginning_msg, msg);
1606 break;
1607 case BT_MESSAGE_TYPE_STREAM_END:
1608 if (sstate->inited) {
1609 /*
1610 * This is the end of an inited stream: end this
1611 * stream if its stream activity end message
1612 * time is not the trimming range's end time
1613 * (which means the final stream activity end
1614 * message had an infinite time). end_stream()
1615 * will generate its own stream end message.
1616 */
1617 if (trimmer_it->end.is_infinite) {
1618 push_message(trimmer_it, msg);
1619 msg = NULL;
1620 g_hash_table_remove(trimmer_it->stream_states,
1621 sstate->stream);
1622 } else if (sstate->stream_act_end_ns_from_origin <
1623 trimmer_it->end.ns_from_origin) {
1624 status = end_stream(trimmer_it, sstate);
1625 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1626 goto end;
1627 }
1628
1629 /* We won't need this stream state again */
1630 g_hash_table_remove(trimmer_it->stream_states,
1631 sstate->stream);
1632 }
1633 } else {
1634 /* We dont't need this stream state anymore */
1635 g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
1636 }
1637
1638 break;
1639 default:
1640 break;
1641 }
1642
1643end:
1644 /* We release the message's reference whatever the outcome */
1645 bt_message_put_ref(msg);
1646 return BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1647}
1648
1649/*
1650 * Handles an input message. This _could_ make the iterator's output
1651 * message queue grow; this could also consume the message without
1652 * pushing anything to this queue, only modifying the stream state.
1653 *
1654 * This function consumes the `msg` reference, _whatever the outcome_.
1655 *
1656 * This function sets `reached_end` if handling this message made the
1657 * iterator reach the end of the trimming range. Note that the output
1658 * message queue could contain messages even if this function sets
1659 * `reached_end`.
1660 */
1661static inline
1662bt_self_message_iterator_status handle_message(
1663 struct trimmer_iterator *trimmer_it, const bt_message *msg,
1664 bool *reached_end)
1665{
1666 bt_self_message_iterator_status status;
1667 const bt_stream *stream = NULL;
1668 int64_t ns_from_origin = INT64_MIN;
1669 bool skip;
1670 int ret;
1671 struct trimmer_iterator_stream_state *sstate = NULL;
1672
1673 /* Find message's associated stream */
1674 switch (bt_message_get_type(msg)) {
1675 case BT_MESSAGE_TYPE_EVENT:
1676 stream = bt_event_borrow_stream_const(
1677 bt_message_event_borrow_event_const(msg));
1678 break;
1679 case BT_MESSAGE_TYPE_PACKET_BEGINNING:
1680 stream = bt_packet_borrow_stream_const(
1681 bt_message_packet_beginning_borrow_packet_const(msg));
1682 break;
1683 case BT_MESSAGE_TYPE_PACKET_END:
1684 stream = bt_packet_borrow_stream_const(
1685 bt_message_packet_end_borrow_packet_const(msg));
1686 break;
1687 case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
1688 stream = bt_message_discarded_events_borrow_stream_const(msg);
1689 break;
1690 case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
1691 stream = bt_message_discarded_packets_borrow_stream_const(msg);
1692 break;
1693 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
1694 stream = bt_message_stream_activity_beginning_borrow_stream_const(msg);
1695 break;
1696 case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
1697 stream = bt_message_stream_activity_end_borrow_stream_const(msg);
1698 break;
1699 case BT_MESSAGE_TYPE_STREAM_BEGINNING:
1700 stream = bt_message_stream_beginning_borrow_stream_const(msg);
1701 break;
1702 case BT_MESSAGE_TYPE_STREAM_END:
1703 stream = bt_message_stream_end_borrow_stream_const(msg);
1704 break;
1705 default:
1706 break;
1707 }
1708
1709 if (likely(stream)) {
1710 /* Find stream state */
1711 sstate = g_hash_table_lookup(trimmer_it->stream_states,
1712 stream);
1713 if (unlikely(!sstate)) {
1714 /* No stream state yet: create one now */
1715 const bt_stream_class *sc;
1716
1717 /*
1718 * Validate right now that the stream's class
1719 * has a registered default clock class so that
1720 * an existing stream state guarantees existing
1721 * default clock snapshots for its associated
1722 * messages.
1723 *
1724 * Also check that clock snapshots are always
1725 * known.
1726 */
1727 sc = bt_stream_borrow_class_const(stream);
1728 if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
1729 BT_LOGE("Unsupported stream: stream class does "
1730 "not have a default clock class: "
1731 "stream-addr=%p, "
1732 "stream-id=%" PRIu64 ", "
1733 "stream-name=\"%s\"",
1734 stream, bt_stream_get_id(stream),
1735 bt_stream_get_name(stream));
1736 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1737 goto end;
1738 }
1739
1740 if (!bt_stream_class_default_clock_is_always_known(sc)) {
1741 BT_LOGE("Unsupported stream: clock does not "
1742 "always have a known value: "
1743 "stream-addr=%p, "
1744 "stream-id=%" PRIu64 ", "
1745 "stream-name=\"%s\"",
1746 stream, bt_stream_get_id(stream),
1747 bt_stream_get_name(stream));
1748 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1749 goto end;
1750 }
1751
1752 sstate = g_new0(struct trimmer_iterator_stream_state,
1753 1);
1754 if (!sstate) {
1755 status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
1756 goto end;
1757 }
1758
1759 sstate->stream = stream;
1760 sstate->stream_act_end_ns_from_origin = INT64_MIN;
1761 g_hash_table_insert(trimmer_it->stream_states,
1762 (void *) stream, sstate);
1763 }
1764 }
1765
1766 /* Retrieve the message's time */
1767 ret = get_msg_ns_from_origin(msg, &ns_from_origin, &skip);
1768 if (unlikely(ret)) {
1769 status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
1770 goto end;
1771 }
1772
1773 if (likely(sstate)) {
1774 /* Message associated to a stream */
1775 status = handle_message_with_stream_state(trimmer_it, msg,
1776 sstate, ns_from_origin, reached_end);
1777
1778 /*
1779 * handle_message_with_stream_state() unconditionally
1780 * consumes `msg`.
1781 */
1782 msg = NULL;
1783 } else {
1784 /*
1785 * Message not associated to a stream (message iterator
1786 * inactivity).
1787 */
1788 if (unlikely(ns_from_origin > trimmer_it->end.ns_from_origin)) {
1789 BT_MESSAGE_PUT_REF_AND_RESET(msg);
1790 status = end_iterator_streams(trimmer_it);
1791 *reached_end = true;
1792 } else {
1793 push_message(trimmer_it, msg);
64e6ceea 1794 status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
781751d8
PP
1795 msg = NULL;
1796 }
1797 }
1798
1799end:
1800 /* We release the message's reference whatever the outcome */
1801 bt_message_put_ref(msg);
1802 return status;
1803}
1804
1805static inline
1806void fill_message_array_from_output_messages(
1807 struct trimmer_iterator *trimmer_it,
1808 bt_message_array_const msgs, uint64_t capacity, uint64_t *count)
1809{
1810 *count = 0;
1811
1812 /*
1813 * Move auto-seek messages to the output array (which is this
1814 * iterator's base message array).
1815 */
1816 while (capacity > 0 && !g_queue_is_empty(trimmer_it->output_messages)) {
1817 msgs[*count] = pop_message(trimmer_it);
1818 capacity--;
1819 (*count)++;
1820 }
1821
1822 BT_ASSERT(*count > 0);
1823}
1824
1825static inline
1826bt_self_message_iterator_status state_ending(
1827 struct trimmer_iterator *trimmer_it,
1828 bt_message_array_const msgs, uint64_t capacity,
1829 uint64_t *count)
1830{
1831 bt_self_message_iterator_status status =
1832 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1833
1834 if (g_queue_is_empty(trimmer_it->output_messages)) {
1835 trimmer_it->state = TRIMMER_ITERATOR_STATE_ENDED;
1836 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1837 goto end;
1838 }
1839
1840 fill_message_array_from_output_messages(trimmer_it, msgs,
1841 capacity, count);
1842
1843end:
1844 return status;
1845}
1846
1847static inline
1848bt_self_message_iterator_status state_trim(struct trimmer_iterator *trimmer_it,
1849 bt_message_array_const msgs, uint64_t capacity,
1850 uint64_t *count)
1851{
1852 bt_self_message_iterator_status status =
1853 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1854 bt_message_array_const my_msgs;
1855 uint64_t my_count;
1856 uint64_t i;
1857 bool reached_end = false;
1858
1859 while (g_queue_is_empty(trimmer_it->output_messages)) {
1860 status = (int) bt_self_component_port_input_message_iterator_next(
1861 trimmer_it->upstream_iter, &my_msgs, &my_count);
1862 if (unlikely(status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
1863 if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) {
1864 status = end_iterator_streams(trimmer_it);
1865 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1866 goto end;
1867 }
1868
1869 trimmer_it->state =
1870 TRIMMER_ITERATOR_STATE_ENDING;
1871 status = state_ending(trimmer_it, msgs,
1872 capacity, count);
1873 }
1874
1875 goto end;
1876 }
1877
1878 BT_ASSERT(my_count > 0);
1879
1880 for (i = 0; i < my_count; i++) {
1881 status = handle_message(trimmer_it, my_msgs[i],
1882 &reached_end);
1883
1884 /*
1885 * handle_message() unconditionally consumes the
1886 * message reference.
1887 */
1888 my_msgs[i] = NULL;
1889
1890 if (unlikely(status !=
1891 BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
1892 put_messages(my_msgs, my_count);
1893 goto end;
1894 }
1895
1896 if (unlikely(reached_end)) {
1897 /*
1898 * This message's time was passed the
1899 * trimming time range's end time: we
1900 * are done. Their might still be
1901 * messages in the output message queue,
1902 * so move to the "ending" state and
1903 * apply it immediately since
1904 * state_trim() is called within the
1905 * "next" method.
1906 */
1907 put_messages(my_msgs, my_count);
1908 trimmer_it->state =
1909 TRIMMER_ITERATOR_STATE_ENDING;
1910 status = state_ending(trimmer_it, msgs,
1911 capacity, count);
1912 goto end;
1913 }
1914 }
1915 }
1916
1917 /*
1918 * There's at least one message in the output message queue:
1919 * move the messages to the output message array.
1920 */
1921 BT_ASSERT(!g_queue_is_empty(trimmer_it->output_messages));
1922 fill_message_array_from_output_messages(trimmer_it, msgs,
1923 capacity, count);
1924
1925end:
1926 return status;
1927}
1928
1929BT_HIDDEN
1930bt_self_message_iterator_status trimmer_msg_iter_next(
1931 bt_self_message_iterator *self_msg_iter,
1932 bt_message_array_const msgs, uint64_t capacity,
1933 uint64_t *count)
1934{
1935 struct trimmer_iterator *trimmer_it =
1936 bt_self_message_iterator_get_data(self_msg_iter);
1937 bt_self_message_iterator_status status =
1938 BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
1939
1940 BT_ASSERT(trimmer_it);
1941
1942 if (likely(trimmer_it->state == TRIMMER_ITERATOR_STATE_TRIM)) {
1943 status = state_trim(trimmer_it, msgs, capacity, count);
1944 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1945 goto end;
1946 }
1947 } else {
1948 switch (trimmer_it->state) {
1949 case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN:
1950 status = state_set_trimmer_iterator_bounds(trimmer_it);
1951 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1952 goto end;
1953 }
1954
1955 status = state_seek_initially(trimmer_it);
1956 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1957 goto end;
1958 }
1959
1960 status = state_trim(trimmer_it, msgs, capacity, count);
1961 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1962 goto end;
1963 }
1964
1965 break;
1966 case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY:
1967 status = state_seek_initially(trimmer_it);
1968 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1969 goto end;
1970 }
1971
1972 status = state_trim(trimmer_it, msgs, capacity, count);
1973 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1974 goto end;
1975 }
1976
1977 break;
1978 case TRIMMER_ITERATOR_STATE_ENDING:
1979 status = state_ending(trimmer_it, msgs, capacity,
1980 count);
1981 if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
1982 goto end;
1983 }
1984
1985 break;
1986 case TRIMMER_ITERATOR_STATE_ENDED:
1987 status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
1988 break;
1989 default:
1990 abort();
1991 }
1992 }
1993
1994end:
1995 return status;
1996}
1997
1998BT_HIDDEN
1999void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
2000{
2001 struct trimmer_iterator *trimmer_it =
2002 bt_self_message_iterator_get_data(self_msg_iter);
2003
2004 BT_ASSERT(trimmer_it);
2005 destroy_trimmer_iterator(trimmer_it);
2006}
This page took 0.114424 seconds and 4 git commands to generate.