Fix: emf uri: surround by " "
[babeltrace.git] / lib / iterator.c
... / ...
CommitLineData
1/*
2 * iterator.c
3 *
4 * Babeltrace Library
5 *
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 *
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 */
20
21#include <stdlib.h>
22#include <babeltrace/babeltrace.h>
23#include <babeltrace/context.h>
24#include <babeltrace/context-internal.h>
25#include <babeltrace/iterator-internal.h>
26#include <babeltrace/iterator.h>
27#include <babeltrace/prio_heap.h>
28#include <babeltrace/ctf/metadata.h>
29#include <babeltrace/ctf/events.h>
30#include <inttypes.h>
31
/*
 * Forward declaration: the definition appears later in this file, after
 * bt_iter_set_pos() which already calls it.
 */
static int babeltrace_filestream_seek(
		struct ctf_file_stream *file_stream,
		const struct bt_iter_pos *begin_pos,
		unsigned long stream_id);
35
/*
 * Position of a single file stream, as recorded by bt_iter_get_pos() and
 * replayed by bt_iter_set_pos() for BT_SEEK_RESTORE.
 */
struct stream_saved_pos {
	/*
	 * File stream this entry was saved from. Keeping the pointer is
	 * meant to allow checking, stream by stream, that the trace
	 * collection being restored to is the one that was saved from.
	 */
	struct ctf_file_stream *file_stream;
	/* Current index in the packet index. */
	size_t cur_index;
	/* Offset from base, in bits. EOF for end of file. */
	ssize_t offset;
	/* Stream real timestamp at the time the position was saved. */
	uint64_t current_real_timestamp;
	/* Stream cycles timestamp at the time the position was saved. */
	uint64_t current_cycles_timestamp;
};
47
48struct bt_saved_pos {
49 struct trace_collection *tc;
50 GArray *stream_saved_pos; /* Contains struct stream_saved_pos */
51};
52
53static int stream_read_event(struct ctf_file_stream *sin)
54{
55 int ret;
56
57 ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent);
58 if (ret == EOF)
59 return EOF;
60 else if (ret) {
61 fprintf(stderr, "[error] Reading event failed.\n");
62 return ret;
63 }
64 return 0;
65}
66
67/*
68 * returns true if a < b, false otherwise.
69 */
70static int stream_compare(void *a, void *b)
71{
72 struct ctf_file_stream *s_a = a, *s_b = b;
73
74 if (s_a->parent.real_timestamp < s_b->parent.real_timestamp)
75 return 1;
76 else
77 return 0;
78}
79
80void bt_iter_free_pos(struct bt_iter_pos *iter_pos)
81{
82 if (!iter_pos)
83 return;
84
85 if (iter_pos->type == BT_SEEK_RESTORE && iter_pos->u.restore) {
86 if (iter_pos->u.restore->stream_saved_pos) {
87 g_array_free(
88 iter_pos->u.restore->stream_saved_pos,
89 TRUE);
90 }
91 g_free(iter_pos->u.restore);
92 }
93 g_free(iter_pos);
94}
95
96/*
97 * seek_file_stream_by_timestamp
98 *
99 * Browse a filestream by index, if an index contains the timestamp passed in
100 * argument, seek inside the corresponding packet it until we find the event we
101 * are looking for (either the exact timestamp or the event just after the
102 * timestamp).
103 *
104 * Return 0 if the seek succeded, EOF if we didn't find any packet
105 * containing the timestamp, or a positive integer for error.
106 *
107 * TODO: this should be turned into a binary search! It is currently
108 * doing a linear search in the packets. This is a O(n) operation on a
109 * very frequent code path.
110 */
111static int seek_file_stream_by_timestamp(struct ctf_file_stream *cfs,
112 uint64_t timestamp)
113{
114 struct ctf_stream_pos *stream_pos;
115 struct packet_index *index;
116 int i, ret;
117
118 stream_pos = &cfs->pos;
119 for (i = 0; i < stream_pos->packet_real_index->len; i++) {
120 index = &g_array_index(stream_pos->packet_real_index,
121 struct packet_index, i);
122 if (index->timestamp_end < timestamp)
123 continue;
124
125 stream_pos->packet_seek(&stream_pos->parent, i, SEEK_SET);
126 do {
127 ret = stream_read_event(cfs);
128 } while (cfs->parent.real_timestamp < timestamp && ret == 0);
129
130 /* Can return either EOF, 0, or error (> 0). */
131 return ret;
132 }
133 /*
134 * Cannot find the timestamp within the stream packets, return
135 * EOF.
136 */
137 return EOF;
138}
139
140/*
141 * seek_ctf_trace_by_timestamp : for each file stream, seek to the event with
142 * the corresponding timestamp
143 *
144 * Return 0 on success.
145 * If the timestamp is not part of any file stream, return EOF to inform the
146 * user the timestamp is out of the scope.
147 * On other errors, return positive value.
148 */
149static int seek_ctf_trace_by_timestamp(struct ctf_trace *tin,
150 uint64_t timestamp, struct ptr_heap *stream_heap)
151{
152 int i, j, ret;
153 int found = 0;
154
155 /* for each stream_class */
156 for (i = 0; i < tin->streams->len; i++) {
157 struct ctf_stream_declaration *stream_class;
158
159 stream_class = g_ptr_array_index(tin->streams, i);
160 if (!stream_class)
161 continue;
162 /* for each file_stream */
163 for (j = 0; j < stream_class->streams->len; j++) {
164 struct ctf_stream_definition *stream;
165 struct ctf_file_stream *cfs;
166
167 stream = g_ptr_array_index(stream_class->streams, j);
168 if (!stream)
169 continue;
170 cfs = container_of(stream, struct ctf_file_stream,
171 parent);
172 ret = seek_file_stream_by_timestamp(cfs, timestamp);
173 if (ret == 0) {
174 /* Add to heap */
175 ret = heap_insert(stream_heap, cfs);
176 if (ret) {
177 /* Return positive error. */
178 return -ret;
179 }
180 found = 1;
181 } else if (ret > 0) {
182 /*
183 * Error in seek (not EOF), failure.
184 */
185 return ret;
186 }
187 /* on EOF just do not put stream into heap. */
188 }
189 }
190
191 return found ? 0 : EOF;
192}
193
194/*
195 * Find timestamp of last event in the stream.
196 *
197 * Return value: 0 if OK, positive error value on error, EOF if no
198 * events were found.
199 */
200static int find_max_timestamp_ctf_file_stream(struct ctf_file_stream *cfs,
201 uint64_t *timestamp_end)
202{
203 int ret, count = 0, i;
204 uint64_t timestamp = 0;
205 struct ctf_stream_pos *stream_pos;
206
207 stream_pos = &cfs->pos;
208 /*
209 * We start by the last packet, and iterate backwards until we
210 * either find at least one event, or we reach the first packet
211 * (some packets can be empty).
212 */
213 for (i = stream_pos->packet_real_index->len - 1; i >= 0; i--) {
214 stream_pos->packet_seek(&stream_pos->parent, i, SEEK_SET);
215 count = 0;
216 /* read each event until we reach the end of the stream */
217 do {
218 ret = stream_read_event(cfs);
219 if (ret == 0) {
220 count++;
221 timestamp = cfs->parent.real_timestamp;
222 }
223 } while (ret == 0);
224
225 /* Error */
226 if (ret > 0)
227 goto end;
228 assert(ret == EOF);
229 if (count)
230 break;
231 }
232
233 if (count) {
234 *timestamp_end = timestamp;
235 ret = 0;
236 } else {
237 /* Return EOF if no events were found */
238 ret = EOF;
239 }
240end:
241 return ret;
242}
243
244/*
245 * Find the stream within a stream class that contains the event with
246 * the largest timestamp, and save that timestamp.
247 *
248 * Return 0 if OK, EOF if no events were found in the streams, or
249 * positive value on error.
250 */
251static int find_max_timestamp_ctf_stream_class(
252 struct ctf_stream_declaration *stream_class,
253 struct ctf_file_stream **cfsp,
254 uint64_t *max_timestamp)
255{
256 int ret = EOF, i;
257
258 for (i = 0; i < stream_class->streams->len; i++) {
259 struct ctf_stream_definition *stream;
260 struct ctf_file_stream *cfs;
261 uint64_t current_max_ts = 0;
262
263 stream = g_ptr_array_index(stream_class->streams, i);
264 if (!stream)
265 continue;
266 cfs = container_of(stream, struct ctf_file_stream, parent);
267 ret = find_max_timestamp_ctf_file_stream(cfs, &current_max_ts);
268 if (ret == EOF)
269 continue;
270 if (ret != 0)
271 break;
272 if (current_max_ts >= *max_timestamp) {
273 *max_timestamp = current_max_ts;
274 *cfsp = cfs;
275 }
276 }
277 assert(ret >= 0 || ret == EOF);
278 return ret;
279}
280
281/*
282 * seek_last_ctf_trace_collection: seek trace collection to last event.
283 *
284 * Return 0 if OK, EOF if no events were found, or positive error value
285 * on error.
286 */
287static int seek_last_ctf_trace_collection(struct trace_collection *tc,
288 struct ctf_file_stream **cfsp)
289{
290 int i, j, ret;
291 int found = 0;
292 uint64_t max_timestamp = 0;
293
294 if (!tc)
295 return 1;
296
297 /* For each trace in the trace_collection */
298 for (i = 0; i < tc->array->len; i++) {
299 struct ctf_trace *tin;
300 struct trace_descriptor *td_read;
301
302 td_read = g_ptr_array_index(tc->array, i);
303 if (!td_read)
304 continue;
305 tin = container_of(td_read, struct ctf_trace, parent);
306 /* For each stream_class in the trace */
307 for (j = 0; j < tin->streams->len; j++) {
308 struct ctf_stream_declaration *stream_class;
309
310 stream_class = g_ptr_array_index(tin->streams, j);
311 if (!stream_class)
312 continue;
313 ret = find_max_timestamp_ctf_stream_class(stream_class,
314 cfsp, &max_timestamp);
315 if (ret > 0)
316 goto end;
317 if (ret == 0)
318 found = 1;
319 assert(ret == EOF || ret == 0);
320 }
321 }
322 /*
323 * Now we know in which file stream the last event is located,
324 * and we know its timestamp.
325 */
326 if (!found) {
327 ret = EOF;
328 } else {
329 ret = seek_file_stream_by_timestamp(*cfsp, max_timestamp);
330 assert(ret == 0);
331 }
332end:
333 return ret;
334}
335
336int bt_iter_set_pos(struct bt_iter *iter, const struct bt_iter_pos *iter_pos)
337{
338 struct trace_collection *tc;
339 int i, ret;
340
341 if (!iter || !iter_pos)
342 return -EINVAL;
343
344 switch (iter_pos->type) {
345 case BT_SEEK_RESTORE:
346 if (!iter_pos->u.restore)
347 return -EINVAL;
348
349 heap_free(iter->stream_heap);
350 ret = heap_init(iter->stream_heap, 0, stream_compare);
351 if (ret < 0)
352 goto error_heap_init;
353
354 for (i = 0; i < iter_pos->u.restore->stream_saved_pos->len;
355 i++) {
356 struct stream_saved_pos *saved_pos;
357 struct ctf_stream_pos *stream_pos;
358 struct ctf_stream_definition *stream;
359
360 saved_pos = &g_array_index(
361 iter_pos->u.restore->stream_saved_pos,
362 struct stream_saved_pos, i);
363 stream = &saved_pos->file_stream->parent;
364 stream_pos = &saved_pos->file_stream->pos;
365
366 stream_pos->packet_seek(&stream_pos->parent,
367 saved_pos->cur_index, SEEK_SET);
368
369 /*
370 * the timestamp needs to be restored after
371 * packet_seek, because this function resets
372 * the timestamp to the beginning of the packet
373 */
374 stream->real_timestamp = saved_pos->current_real_timestamp;
375 stream->cycles_timestamp = saved_pos->current_cycles_timestamp;
376 stream_pos->offset = saved_pos->offset;
377 stream_pos->last_offset = LAST_OFFSET_POISON;
378
379 stream->prev_real_timestamp = 0;
380 stream->prev_real_timestamp_end = 0;
381 stream->prev_cycles_timestamp = 0;
382 stream->prev_cycles_timestamp_end = 0;
383
384 printf_debug("restored to cur_index = %zd and "
385 "offset = %zd, timestamp = %" PRIu64 "\n",
386 stream_pos->cur_index,
387 stream_pos->offset, stream->real_timestamp);
388
389 ret = stream_read_event(saved_pos->file_stream);
390 if (ret != 0) {
391 goto error;
392 }
393
394 /* Add to heap */
395 ret = heap_insert(iter->stream_heap,
396 saved_pos->file_stream);
397 if (ret)
398 goto error;
399 }
400 return 0;
401 case BT_SEEK_TIME:
402 tc = iter->ctx->tc;
403
404 heap_free(iter->stream_heap);
405 ret = heap_init(iter->stream_heap, 0, stream_compare);
406 if (ret < 0)
407 goto error_heap_init;
408
409 /* for each trace in the trace_collection */
410 for (i = 0; i < tc->array->len; i++) {
411 struct ctf_trace *tin;
412 struct trace_descriptor *td_read;
413
414 td_read = g_ptr_array_index(tc->array, i);
415 if (!td_read)
416 continue;
417 tin = container_of(td_read, struct ctf_trace, parent);
418
419 ret = seek_ctf_trace_by_timestamp(tin,
420 iter_pos->u.seek_time,
421 iter->stream_heap);
422 /*
423 * Positive errors are failure. Negative value
424 * is EOF (for which we continue with other
425 * traces). 0 is success. Note: on EOF, it just
426 * means that no stream has been added to the
427 * iterator for that trace, which is fine.
428 */
429 if (ret != 0 && ret != EOF)
430 goto error;
431 }
432 return 0;
433 case BT_SEEK_BEGIN:
434 tc = iter->ctx->tc;
435 heap_free(iter->stream_heap);
436 ret = heap_init(iter->stream_heap, 0, stream_compare);
437 if (ret < 0)
438 goto error_heap_init;
439
440 for (i = 0; i < tc->array->len; i++) {
441 struct ctf_trace *tin;
442 struct trace_descriptor *td_read;
443 int stream_id;
444
445 td_read = g_ptr_array_index(tc->array, i);
446 if (!td_read)
447 continue;
448 tin = container_of(td_read, struct ctf_trace, parent);
449
450 /* Populate heap with each stream */
451 for (stream_id = 0; stream_id < tin->streams->len;
452 stream_id++) {
453 struct ctf_stream_declaration *stream;
454 int filenr;
455
456 stream = g_ptr_array_index(tin->streams,
457 stream_id);
458 if (!stream)
459 continue;
460 for (filenr = 0; filenr < stream->streams->len;
461 filenr++) {
462 struct ctf_file_stream *file_stream;
463 file_stream = g_ptr_array_index(
464 stream->streams,
465 filenr);
466 if (!file_stream)
467 continue;
468 ret = babeltrace_filestream_seek(
469 file_stream, iter_pos,
470 stream_id);
471 if (ret != 0 && ret != EOF) {
472 goto error;
473 }
474 ret = heap_insert(iter->stream_heap, file_stream);
475 if (ret)
476 goto error;
477 }
478 }
479 }
480 break;
481 case BT_SEEK_LAST:
482 {
483 struct ctf_file_stream *cfs = NULL;
484
485 tc = iter->ctx->tc;
486 ret = seek_last_ctf_trace_collection(tc, &cfs);
487 if (ret != 0 || !cfs)
488 goto error;
489 /* remove all streams from the heap */
490 heap_free(iter->stream_heap);
491 /* Create a new empty heap */
492 ret = heap_init(iter->stream_heap, 0, stream_compare);
493 if (ret < 0)
494 goto error;
495 /* Insert the stream that contains the last event */
496 ret = heap_insert(iter->stream_heap, cfs);
497 if (ret)
498 goto error;
499 break;
500 }
501 default:
502 /* not implemented */
503 return -EINVAL;
504 }
505
506 return 0;
507
508error:
509 heap_free(iter->stream_heap);
510error_heap_init:
511 if (heap_init(iter->stream_heap, 0, stream_compare) < 0) {
512 heap_free(iter->stream_heap);
513 g_free(iter->stream_heap);
514 iter->stream_heap = NULL;
515 ret = -ENOMEM;
516 }
517
518 return ret;
519}
520
521struct bt_iter_pos *bt_iter_get_pos(struct bt_iter *iter)
522{
523 struct bt_iter_pos *pos;
524 struct trace_collection *tc;
525 struct ctf_file_stream *file_stream = NULL, *removed;
526 struct ptr_heap iter_heap_copy;
527 int ret;
528
529 if (!iter)
530 return NULL;
531
532 tc = iter->ctx->tc;
533 pos = g_new0(struct bt_iter_pos, 1);
534 pos->type = BT_SEEK_RESTORE;
535 pos->u.restore = g_new0(struct bt_saved_pos, 1);
536 pos->u.restore->tc = tc;
537 pos->u.restore->stream_saved_pos = g_array_new(FALSE, TRUE,
538 sizeof(struct stream_saved_pos));
539 if (!pos->u.restore->stream_saved_pos)
540 goto error;
541
542 ret = heap_copy(&iter_heap_copy, iter->stream_heap);
543 if (ret < 0)
544 goto error_heap;
545
546 /* iterate over each stream in the heap */
547 file_stream = heap_maximum(&iter_heap_copy);
548 while (file_stream != NULL) {
549 struct stream_saved_pos saved_pos;
550
551 assert(file_stream->pos.last_offset != LAST_OFFSET_POISON);
552 saved_pos.offset = file_stream->pos.last_offset;
553 saved_pos.file_stream = file_stream;
554 saved_pos.cur_index = file_stream->pos.cur_index;
555
556 saved_pos.current_real_timestamp = file_stream->parent.real_timestamp;
557 saved_pos.current_cycles_timestamp = file_stream->parent.cycles_timestamp;
558
559 g_array_append_val(
560 pos->u.restore->stream_saved_pos,
561 saved_pos);
562
563 printf_debug("stream : %" PRIu64 ", cur_index : %zd, "
564 "offset : %zd, "
565 "timestamp = %" PRIu64 "\n",
566 file_stream->parent.stream_id,
567 saved_pos.cur_index, saved_pos.offset,
568 saved_pos.current_real_timestamp);
569
570 /* remove the stream from the heap copy */
571 removed = heap_remove(&iter_heap_copy);
572 assert(removed == file_stream);
573
574 file_stream = heap_maximum(&iter_heap_copy);
575 }
576 heap_free(&iter_heap_copy);
577 return pos;
578
579error_heap:
580 g_array_free(pos->u.restore->stream_saved_pos, TRUE);
581error:
582 g_free(pos);
583 return NULL;
584}
585
586struct bt_iter_pos *bt_iter_create_time_pos(struct bt_iter *iter,
587 uint64_t timestamp)
588{
589 struct bt_iter_pos *pos;
590
591 if (!iter)
592 return NULL;
593
594 pos = g_new0(struct bt_iter_pos, 1);
595 pos->type = BT_SEEK_TIME;
596 pos->u.seek_time = timestamp;
597 return pos;
598}
599
600/*
601 * babeltrace_filestream_seek: seek a filestream to given position.
602 *
603 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
604 */
605static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
606 const struct bt_iter_pos *begin_pos,
607 unsigned long stream_id)
608{
609 int ret = 0;
610
611 if (!file_stream || !begin_pos)
612 return -EINVAL;
613
614 switch (begin_pos->type) {
615 case BT_SEEK_CUR:
616 /*
617 * just insert into the heap we should already know
618 * the timestamps
619 */
620 break;
621 case BT_SEEK_BEGIN:
622 file_stream->pos.packet_seek(&file_stream->pos.parent,
623 0, SEEK_SET);
624 ret = stream_read_event(file_stream);
625 break;
626 case BT_SEEK_TIME:
627 case BT_SEEK_RESTORE:
628 default:
629 assert(0); /* Not yet defined */
630 }
631
632 return ret;
633}
634
635int bt_iter_init(struct bt_iter *iter,
636 struct bt_context *ctx,
637 const struct bt_iter_pos *begin_pos,
638 const struct bt_iter_pos *end_pos)
639{
640 int i, stream_id;
641 int ret = 0;
642
643 if (!iter || !ctx)
644 return -EINVAL;
645
646 if (ctx->current_iterator) {
647 ret = -1;
648 goto error_ctx;
649 }
650
651 iter->stream_heap = g_new(struct ptr_heap, 1);
652 iter->end_pos = end_pos;
653 bt_context_get(ctx);
654 iter->ctx = ctx;
655
656 ret = heap_init(iter->stream_heap, 0, stream_compare);
657 if (ret < 0)
658 goto error_heap_init;
659
660 for (i = 0; i < ctx->tc->array->len; i++) {
661 struct ctf_trace *tin;
662 struct trace_descriptor *td_read;
663
664 td_read = g_ptr_array_index(ctx->tc->array, i);
665 if (!td_read)
666 continue;
667 tin = container_of(td_read, struct ctf_trace, parent);
668
669 /* Populate heap with each stream */
670 for (stream_id = 0; stream_id < tin->streams->len;
671 stream_id++) {
672 struct ctf_stream_declaration *stream;
673 int filenr;
674
675 stream = g_ptr_array_index(tin->streams, stream_id);
676 if (!stream)
677 continue;
678 for (filenr = 0; filenr < stream->streams->len;
679 filenr++) {
680 struct ctf_file_stream *file_stream;
681
682 file_stream = g_ptr_array_index(stream->streams,
683 filenr);
684 if (!file_stream)
685 continue;
686 if (begin_pos) {
687 ret = babeltrace_filestream_seek(
688 file_stream,
689 begin_pos,
690 stream_id);
691 } else {
692 struct bt_iter_pos pos;
693 pos.type = BT_SEEK_BEGIN;
694 ret = babeltrace_filestream_seek(
695 file_stream, &pos,
696 stream_id);
697 }
698 if (ret == EOF) {
699 ret = 0;
700 continue;
701 } else if (ret) {
702 goto error;
703 }
704 /* Add to heap */
705 ret = heap_insert(iter->stream_heap, file_stream);
706 if (ret)
707 goto error;
708 }
709 }
710 }
711
712 ctx->current_iterator = iter;
713 return 0;
714
715error:
716 heap_free(iter->stream_heap);
717error_heap_init:
718 g_free(iter->stream_heap);
719 iter->stream_heap = NULL;
720error_ctx:
721 return ret;
722}
723
724struct bt_iter *bt_iter_create(struct bt_context *ctx,
725 const struct bt_iter_pos *begin_pos,
726 const struct bt_iter_pos *end_pos)
727{
728 struct bt_iter *iter;
729 int ret;
730
731 if (!ctx)
732 return NULL;
733
734 iter = g_new0(struct bt_iter, 1);
735 ret = bt_iter_init(iter, ctx, begin_pos, end_pos);
736 if (ret) {
737 g_free(iter);
738 return NULL;
739 }
740 return iter;
741}
742
743void bt_iter_fini(struct bt_iter *iter)
744{
745 assert(iter);
746 if (iter->stream_heap) {
747 heap_free(iter->stream_heap);
748 g_free(iter->stream_heap);
749 }
750 iter->ctx->current_iterator = NULL;
751 bt_context_put(iter->ctx);
752}
753
/*
 * Finalize an iterator with bt_iter_fini(), then free the bt_iter
 * structure itself. iter must not be NULL.
 */
void bt_iter_destroy(struct bt_iter *iter)
{
	assert(iter);

	/* Release what the iterator owns before the iterator itself. */
	bt_iter_fini(iter);
	g_free(iter);
}
760
761int bt_iter_next(struct bt_iter *iter)
762{
763 struct ctf_file_stream *file_stream, *removed;
764 int ret;
765
766 if (!iter)
767 return -EINVAL;
768
769 file_stream = heap_maximum(iter->stream_heap);
770 if (!file_stream) {
771 /* end of file for all streams */
772 ret = 0;
773 goto end;
774 }
775
776 ret = stream_read_event(file_stream);
777 if (ret == EOF) {
778 removed = heap_remove(iter->stream_heap);
779 assert(removed == file_stream);
780 ret = 0;
781 goto end;
782 } else if (ret) {
783 goto end;
784 }
785 /* Reinsert the file stream into the heap, and rebalance. */
786 removed = heap_replace_max(iter->stream_heap, file_stream);
787 assert(removed == file_stream);
788
789end:
790 return ret;
791}
This page took 0.025342 seconds and 4 git commands to generate.