Fix : add the missing seek begin
[babeltrace.git] / lib / iterator.c
CommitLineData
6f3077a2
JD
1/*
2 * iterator.c
3 *
4 * Babeltrace Library
5 *
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 *
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 */
20
21#include <stdlib.h>
22#include <babeltrace/babeltrace.h>
95d36295 23#include <babeltrace/context.h>
08c22d05 24#include <babeltrace/context-internal.h>
6f3077a2 25#include <babeltrace/iterator-internal.h>
6204d33c 26#include <babeltrace/iterator.h>
6f3077a2 27#include <babeltrace/prio_heap.h>
e4195791 28#include <babeltrace/ctf/metadata.h>
9843982d 29#include <babeltrace/ctf/events.h>
90fcbacc 30#include <inttypes.h>
6f3077a2 31
55c21ea5
JD
/*
 * Forward declaration: bt_iter_set_pos() calls this helper before its
 * definition, which appears further down in this file.
 */
static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
		const struct bt_iter_pos *begin_pos,
		unsigned long stream_id);
35
6f3077a2
JD
/*
 * Saved read position of one file stream. Entries are filled by
 * bt_iter_get_pos() and replayed by bt_iter_set_pos().
 */
struct stream_saved_pos {
	/*
	 * The file stream this entry belongs to. Keeping the pointer allows
	 * checking, for each stream, that the trace collection being
	 * restored matches the one the position was saved from.
	 */
	struct ctf_file_stream *file_stream;
	/* Current index in the packet index. */
	size_t cur_index;
	/* Offset from base, in bits. EOF for end of file. */
	ssize_t offset;
	/* Timestamp of the stream at the time the position was saved. */
	uint64_t current_timestamp;
};
46
e8c92a62 47struct bt_saved_pos {
6f3077a2
JD
48 struct trace_collection *tc;
49 GArray *stream_saved_pos; /* Contains struct stream_saved_pos */
50};
51
52static int stream_read_event(struct ctf_file_stream *sin)
53{
54 int ret;
55
56 ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent);
57 if (ret == EOF)
58 return EOF;
59 else if (ret) {
3394d22e 60 fprintf(stderr, "[error] Reading event failed.\n");
6f3077a2
JD
61 return ret;
62 }
63 return 0;
64}
65
66/*
67 * returns true if a < b, false otherwise.
68 */
69int stream_compare(void *a, void *b)
70{
71 struct ctf_file_stream *s_a = a, *s_b = b;
72
73 if (s_a->parent.timestamp < s_b->parent.timestamp)
74 return 1;
75 else
76 return 0;
77}
78
90fcbacc
JD
79void bt_iter_free_pos(struct bt_iter_pos *iter_pos)
80{
6cba487f
MD
81 if (!iter_pos)
82 return;
83
84 if (iter_pos->u.restore) {
85 if (iter_pos->u.restore->stream_saved_pos) {
86 g_array_free(
87 iter_pos->u.restore->stream_saved_pos,
88 TRUE);
90fcbacc 89 }
6cba487f 90 g_free(iter_pos->u.restore);
90fcbacc 91 }
6cba487f 92 g_free(iter_pos);
90fcbacc
JD
93}
94
dc81689e
JD
95/*
96 * seek_file_stream_by_timestamp
97 *
98 * Browse a filestream by index, if an index contains the timestamp passed in
99 * argument, seek inside the corresponding packet it until we find the event we
100 * are looking for (either the exact timestamp or the event just after the
101 * timestamp).
102 *
103 * Return 0 if the seek succeded and EOF if we didn't find any packet
104 * containing the timestamp.
105 */
106static int seek_file_stream_by_timestamp(struct ctf_file_stream *cfs,
107 uint64_t timestamp)
108{
109 struct ctf_stream_pos *stream_pos;
110 struct packet_index *index;
111 int i, ret;
112
113 stream_pos = &cfs->pos;
114 for (i = 0; i < stream_pos->packet_index->len; i++) {
115 index = &g_array_index(stream_pos->packet_index,
116 struct packet_index, i);
48d5f76c 117 if (index->timestamp_end <= timestamp)
dc81689e
JD
118 continue;
119
20d0dcf9 120 stream_pos->packet_seek(&stream_pos->parent, i, SEEK_SET);
dc81689e
JD
121 while (cfs->parent.timestamp < timestamp) {
122 ret = stream_read_event(cfs);
123 if (ret < 0)
124 break;
125 }
126 return 0;
127 }
128 return EOF;
129}
130
131/*
132 * seek_ctf_trace_by_timestamp : for each file stream, seek to the event with
133 * the corresponding timestamp
134 *
135 * Return 0 on success.
136 * If the timestamp is not part of any file stream, return EOF to inform the
137 * user the timestamp is out of the scope
138 */
139static int seek_ctf_trace_by_timestamp(struct ctf_trace *tin,
140 uint64_t timestamp, struct ptr_heap *stream_heap)
141{
142 int i, j, ret;
143 int found = EOF;
144
145 /* for each stream_class */
146 for (i = 0; i < tin->streams->len; i++) {
147 struct ctf_stream_class *stream_class;
148
149 stream_class = g_ptr_array_index(tin->streams, i);
150 /* for each file_stream */
151 for (j = 0; j < stream_class->streams->len; j++) {
152 struct ctf_stream *stream;
153 struct ctf_file_stream *cfs;
154
155 stream = g_ptr_array_index(stream_class->streams, j);
156 cfs = container_of(stream, struct ctf_file_stream,
157 parent);
158 ret = seek_file_stream_by_timestamp(cfs, timestamp);
159 if (ret == 0) {
160 /* Add to heap */
161 ret = heap_insert(stream_heap, cfs);
162 if (ret)
163 goto error;
164 found = 0;
165 }
166 }
167 }
168
169 return found;
170
171error:
172 return -2;
173}
174
90fcbacc
JD
175int bt_iter_set_pos(struct bt_iter *iter, const struct bt_iter_pos *iter_pos)
176{
dc81689e 177 struct trace_collection *tc;
90fcbacc
JD
178 int i, ret;
179
dc81689e
JD
180 switch (iter_pos->type) {
181 case BT_SEEK_RESTORE:
182 if (!iter_pos->u.restore)
7344374e 183 return -EINVAL;
dc81689e 184
90fcbacc 185 heap_free(iter->stream_heap);
90fcbacc
JD
186 ret = heap_init(iter->stream_heap, 0, stream_compare);
187 if (ret < 0)
dc81689e 188 goto error;
90fcbacc
JD
189
190 for (i = 0; i < iter_pos->u.restore->stream_saved_pos->len;
191 i++) {
192 struct stream_saved_pos *saved_pos;
193 struct ctf_stream_pos *stream_pos;
90fcbacc
JD
194 struct ctf_stream *stream;
195
196 saved_pos = &g_array_index(
197 iter_pos->u.restore->stream_saved_pos,
198 struct stream_saved_pos, i);
199 stream = &saved_pos->file_stream->parent;
200 stream_pos = &saved_pos->file_stream->pos;
90fcbacc 201
06789ffd 202 stream_pos->packet_seek(&stream_pos->parent,
20d0dcf9 203 saved_pos->cur_index, SEEK_SET);
90fcbacc
JD
204
205 /*
206 * the timestamp needs to be restored after
06789ffd 207 * packet_seek, because this function resets
90fcbacc
JD
208 * the timestamp to the beginning of the packet
209 */
210 stream->timestamp = saved_pos->current_timestamp;
211 stream_pos->offset = saved_pos->offset;
212 stream_pos->last_offset = saved_pos->offset;
213
214 stream->prev_timestamp = 0;
215 stream->prev_timestamp_end = 0;
216 stream->consumed = 0;
217
218 printf_debug("restored to cur_index = %zd and "
219 "offset = %zd, timestamp = %" PRIu64 "\n",
220 stream_pos->cur_index,
221 stream_pos->offset, stream->timestamp);
222
223 stream_read_event(saved_pos->file_stream);
224
225 /* Add to heap */
226 ret = heap_insert(iter->stream_heap,
227 saved_pos->file_stream);
228 if (ret)
229 goto error;
230 }
dc81689e
JD
231 case BT_SEEK_TIME:
232 tc = iter->ctx->tc;
233
dc81689e
JD
234 heap_free(iter->stream_heap);
235 ret = heap_init(iter->stream_heap, 0, stream_compare);
236 if (ret < 0)
237 goto error;
238
239 /* for each trace in the trace_collection */
240 for (i = 0; i < tc->array->len; i++) {
241 struct ctf_trace *tin;
242 struct trace_descriptor *td_read;
243
244 td_read = g_ptr_array_index(tc->array, i);
245 tin = container_of(td_read, struct ctf_trace, parent);
246
247 ret = seek_ctf_trace_by_timestamp(tin,
248 iter_pos->u.seek_time,
249 iter->stream_heap);
250 if (ret < 0)
251 goto error;
252 }
253 return 0;
55c21ea5
JD
254 case BT_SEEK_BEGIN:
255 tc = iter->ctx->tc;
256 for (i = 0; i < tc->array->len; i++) {
257 struct ctf_trace *tin;
258 struct trace_descriptor *td_read;
259 int stream_id;
260
261 td_read = g_ptr_array_index(tc->array, i);
262 tin = container_of(td_read, struct ctf_trace, parent);
263
264 /* Populate heap with each stream */
265 for (stream_id = 0; stream_id < tin->streams->len;
266 stream_id++) {
267 struct ctf_stream_class *stream;
268 int filenr;
269
270 stream = g_ptr_array_index(tin->streams,
271 stream_id);
272 if (!stream)
273 continue;
274 for (filenr = 0; filenr < stream->streams->len;
275 filenr++) {
276 struct ctf_file_stream *file_stream;
277 file_stream = g_ptr_array_index(
278 stream->streams,
279 filenr);
280 ret = babeltrace_filestream_seek(
281 file_stream, iter_pos,
282 stream_id);
283 }
284 }
285 }
286 break;
dc81689e
JD
287 default:
288 /* not implemented */
7344374e 289 return -EINVAL;
90fcbacc
JD
290 }
291
292 return 0;
dc81689e 293
90fcbacc
JD
294error:
295 heap_free(iter->stream_heap);
dc81689e
JD
296 if (heap_init(iter->stream_heap, 0, stream_compare) < 0) {
297 heap_free(iter->stream_heap);
298 g_free(iter->stream_heap);
299 iter->stream_heap = NULL;
300 ret = -ENOMEM;
301 }
302 return ret;
90fcbacc
JD
303}
304
305struct bt_iter_pos *bt_iter_get_pos(struct bt_iter *iter)
306{
307 struct bt_iter_pos *pos;
308 struct trace_collection *tc = iter->ctx->tc;
309 int i, stream_class_id, stream_id;
310
311 pos = g_new0(struct bt_iter_pos, 1);
90fcbacc 312 pos->u.restore = g_new0(struct bt_saved_pos, 1);
90fcbacc
JD
313 pos->u.restore->tc = tc;
314 pos->u.restore->stream_saved_pos = g_array_new(FALSE, TRUE,
315 sizeof(struct stream_saved_pos));
316 if (!pos->u.restore->stream_saved_pos)
317 goto error;
318
319 for (i = 0; i < tc->array->len; i++) {
320 struct ctf_trace *tin;
321 struct trace_descriptor *td_read;
322
323 td_read = g_ptr_array_index(tc->array, i);
324 tin = container_of(td_read, struct ctf_trace, parent);
325
326 for (stream_class_id = 0; stream_class_id < tin->streams->len;
327 stream_class_id++) {
328 struct ctf_stream_class *stream_class;
329
330 stream_class = g_ptr_array_index(tin->streams,
331 stream_class_id);
332 for (stream_id = 0;
333 stream_id < stream_class->streams->len;
334 stream_id++) {
335 struct ctf_stream *stream;
336 struct ctf_file_stream *cfs;
337 struct stream_saved_pos saved_pos;
338
339 stream = g_ptr_array_index(
340 stream_class->streams,
341 stream_id);
342 cfs = container_of(stream,
343 struct ctf_file_stream,
344 parent);
345
346 saved_pos.file_stream = cfs;
347 saved_pos.cur_index = cfs->pos.cur_index;
348
349 /*
350 * It is possible that an event was read during
351 * the last restore, never consumed and its
352 * position saved again. For this case, we
353 * need to check if the event really was
354 * consumed by the caller otherwise it is lost.
355 */
356 if (stream->consumed)
357 saved_pos.offset = cfs->pos.offset;
358 else
359 saved_pos.offset = cfs->pos.last_offset;
360
361 saved_pos.current_timestamp = stream->timestamp;
362
363 g_array_append_val(
364 pos->u.restore->stream_saved_pos,
365 saved_pos);
366
363402af 367 printf_debug("stream : %" PRIu64 ", cur_index : %zd, "
90fcbacc
JD
368 "offset : %zd, "
369 "timestamp = %" PRIu64 "\n",
370 stream->stream_id, saved_pos.cur_index,
371 saved_pos.offset,
372 saved_pos.current_timestamp);
373 }
374 }
375 }
376
377 return pos;
378
379error:
380 return NULL;
381}
382
dc81689e
JD
383struct bt_iter_pos *bt_iter_create_time_pos(struct bt_iter *iter,
384 uint64_t timestamp)
385{
386 struct bt_iter_pos *pos;
387
388 pos = g_new0(struct bt_iter_pos, 1);
389 pos->type = BT_SEEK_TIME;
390 pos->u.seek_time = timestamp;
391 return pos;
392}
393
6f3077a2
JD
394/*
395 * babeltrace_filestream_seek: seek a filestream to given position.
396 *
397 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
398 */
399static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
e8c92a62 400 const struct bt_iter_pos *begin_pos,
6f3077a2
JD
401 unsigned long stream_id)
402{
403 int ret = 0;
404
405 switch (begin_pos->type) {
406 case BT_SEEK_CUR:
407 /*
408 * just insert into the heap we should already know
409 * the timestamps
410 */
411 break;
412 case BT_SEEK_BEGIN:
06789ffd 413 file_stream->pos.packet_seek(&file_stream->pos.parent,
d6425aaf 414 0, SEEK_SET);
6f3077a2
JD
415 ret = stream_read_event(file_stream);
416 break;
417 case BT_SEEK_TIME:
418 case BT_SEEK_RESTORE:
419 case BT_SEEK_END:
420 default:
421 assert(0); /* Not yet defined */
422 }
423
424 return ret;
425}
426
427/*
e8c92a62 428 * bt_iter_seek: seek iterator to given position.
6f3077a2 429 */
e8c92a62
JD
430int bt_iter_seek(struct bt_iter *iter,
431 const struct bt_iter_pos *begin_pos)
6f3077a2
JD
432{
433 int i, stream_id;
434 int ret = 0;
95d36295 435 struct trace_collection *tc = iter->ctx->tc;
6f3077a2
JD
436
437 for (i = 0; i < tc->array->len; i++) {
438 struct ctf_trace *tin;
439 struct trace_descriptor *td_read;
440
441 td_read = g_ptr_array_index(tc->array, i);
442 tin = container_of(td_read, struct ctf_trace, parent);
443
444 /* Populate heap with each stream */
445 for (stream_id = 0; stream_id < tin->streams->len;
446 stream_id++) {
447 struct ctf_stream_class *stream;
448 int filenr;
449
450 stream = g_ptr_array_index(tin->streams, stream_id);
451 for (filenr = 0; filenr < stream->streams->len;
452 filenr++) {
453 struct ctf_file_stream *file_stream;
454
455 file_stream = g_ptr_array_index(stream->streams,
456 filenr);
457 ret = babeltrace_filestream_seek(file_stream, begin_pos,
458 stream_id);
459 if (ret < 0)
460 goto end;
461 }
462 }
463 }
464end:
465 return ret;
466}
467
e4195791
MD
468int bt_iter_init(struct bt_iter *iter,
469 struct bt_context *ctx,
04ae3991
MD
470 const struct bt_iter_pos *begin_pos,
471 const struct bt_iter_pos *end_pos)
6f3077a2
JD
472{
473 int i, stream_id;
474 int ret = 0;
6f3077a2 475
e003e871
JD
476 if (ctx->current_iterator) {
477 ret = -1;
478 goto error_ctx;
479 }
480
6f3077a2 481 iter->stream_heap = g_new(struct ptr_heap, 1);
6f3077a2 482 iter->end_pos = end_pos;
6cba487f 483 bt_context_get(ctx);
95d36295 484 iter->ctx = ctx;
6f3077a2
JD
485
486 ret = heap_init(iter->stream_heap, 0, stream_compare);
487 if (ret < 0)
488 goto error_heap_init;
489
95d36295 490 for (i = 0; i < ctx->tc->array->len; i++) {
6f3077a2
JD
491 struct ctf_trace *tin;
492 struct trace_descriptor *td_read;
493
95d36295 494 td_read = g_ptr_array_index(ctx->tc->array, i);
6f3077a2
JD
495 tin = container_of(td_read, struct ctf_trace, parent);
496
497 /* Populate heap with each stream */
498 for (stream_id = 0; stream_id < tin->streams->len;
499 stream_id++) {
500 struct ctf_stream_class *stream;
501 int filenr;
502
503 stream = g_ptr_array_index(tin->streams, stream_id);
504 if (!stream)
505 continue;
506 for (filenr = 0; filenr < stream->streams->len;
507 filenr++) {
508 struct ctf_file_stream *file_stream;
509
510 file_stream = g_ptr_array_index(stream->streams,
511 filenr);
512
513 if (begin_pos) {
b42d4e4e
JD
514 ret = babeltrace_filestream_seek(
515 file_stream,
516 begin_pos,
6f3077a2 517 stream_id);
b42d4e4e
JD
518 } else {
519 struct bt_iter_pos pos;
520 pos.type = BT_SEEK_BEGIN;
521 ret = babeltrace_filestream_seek(
522 file_stream, &pos,
523 stream_id);
524 }
525 if (ret == EOF) {
526 ret = 0;
527 continue;
528 } else if (ret) {
529 goto error;
6f3077a2
JD
530 }
531 /* Add to heap */
532 ret = heap_insert(iter->stream_heap, file_stream);
533 if (ret)
534 goto error;
535 }
536 }
537 }
538
e003e871 539 ctx->current_iterator = iter;
e4195791 540 return 0;
6f3077a2
JD
541
542error:
543 heap_free(iter->stream_heap);
544error_heap_init:
545 g_free(iter->stream_heap);
e003e871 546error_ctx:
e4195791 547 return ret;
6f3077a2
JD
548}
549
e4195791 550struct bt_iter *bt_iter_create(struct bt_context *ctx,
04ae3991
MD
551 const struct bt_iter_pos *begin_pos,
552 const struct bt_iter_pos *end_pos)
e4195791
MD
553{
554 struct bt_iter *iter;
555 int ret;
556
557 iter = g_new0(struct bt_iter, 1);
558 ret = bt_iter_init(iter, ctx, begin_pos, end_pos);
559 if (ret) {
560 g_free(iter);
561 return NULL;
562 }
563 return iter;
564}
565
566void bt_iter_fini(struct bt_iter *iter)
6f3077a2 567{
dc81689e
JD
568 if (iter->stream_heap) {
569 heap_free(iter->stream_heap);
570 g_free(iter->stream_heap);
571 }
e003e871 572 iter->ctx->current_iterator = NULL;
95d36295 573 bt_context_put(iter->ctx);
e4195791 574}
95d36295 575
e4195791
MD
/*
 * bt_iter_destroy: finalize an iterator with bt_iter_fini() and free it.
 *
 * Accepts NULL, in which case nothing is done (same convention as
 * bt_iter_free_pos()).
 */
void bt_iter_destroy(struct bt_iter *iter)
{
	if (!iter)
		return;

	bt_iter_fini(iter);
	g_free(iter);
}
581
e8c92a62 582int bt_iter_next(struct bt_iter *iter)
6f3077a2
JD
583{
584 struct ctf_file_stream *file_stream, *removed;
585 int ret;
586
587 file_stream = heap_maximum(iter->stream_heap);
588 if (!file_stream) {
589 /* end of file for all streams */
590 ret = 0;
591 goto end;
592 }
593
594 ret = stream_read_event(file_stream);
595 if (ret == EOF) {
596 removed = heap_remove(iter->stream_heap);
597 assert(removed == file_stream);
598 ret = 0;
599 goto end;
600 } else if (ret) {
601 goto end;
602 }
603 /* Reinsert the file stream into the heap, and rebalance. */
604 removed = heap_replace_max(iter->stream_heap, file_stream);
605 assert(removed == file_stream);
606
607end:
608 return ret;
609}
This page took 0.048623 seconds and 4 git commands to generate.