Move the add_traces_recursive out of the library
[babeltrace.git] / lib / iterator.c
CommitLineData
6f3077a2
JD
1/*
2 * iterator.c
3 *
4 * Babeltrace Library
5 *
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 *
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 */
20
21#include <stdlib.h>
22#include <babeltrace/babeltrace.h>
23#include <babeltrace/callbacks-internal.h>
95d36295 24#include <babeltrace/context.h>
08c22d05 25#include <babeltrace/context-internal.h>
6f3077a2
JD
26#include <babeltrace/ctf/metadata.h>
27#include <babeltrace/iterator-internal.h>
6204d33c 28#include <babeltrace/iterator.h>
6f3077a2 29#include <babeltrace/prio_heap.h>
90fcbacc 30#include <inttypes.h>
6f3077a2
JD
31
32struct stream_saved_pos {
33 /*
34 * Use file_stream pointer to check if the trace collection we
35 * restore to match the one we saved from, for each stream.
36 */
37 struct ctf_file_stream *file_stream;
38 size_t cur_index; /* current index in packet index */
39 ssize_t offset; /* offset from base, in bits. EOF for end of file. */
90fcbacc 40 uint64_t current_timestamp;
6f3077a2
JD
41};
42
e8c92a62 43struct bt_saved_pos {
6f3077a2
JD
44 struct trace_collection *tc;
45 GArray *stream_saved_pos; /* Contains struct stream_saved_pos */
46};
47
48static int stream_read_event(struct ctf_file_stream *sin)
49{
50 int ret;
51
52 ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent);
53 if (ret == EOF)
54 return EOF;
55 else if (ret) {
3394d22e 56 fprintf(stderr, "[error] Reading event failed.\n");
6f3077a2
JD
57 return ret;
58 }
59 return 0;
60}
61
62/*
63 * returns true if a < b, false otherwise.
64 */
65int stream_compare(void *a, void *b)
66{
67 struct ctf_file_stream *s_a = a, *s_b = b;
68
69 if (s_a->parent.timestamp < s_b->parent.timestamp)
70 return 1;
71 else
72 return 0;
73}
74
90fcbacc
JD
75void bt_iter_free_pos(struct bt_iter_pos *iter_pos)
76{
6cba487f
MD
77 if (!iter_pos)
78 return;
79
80 if (iter_pos->u.restore) {
81 if (iter_pos->u.restore->stream_saved_pos) {
82 g_array_free(
83 iter_pos->u.restore->stream_saved_pos,
84 TRUE);
90fcbacc 85 }
6cba487f 86 g_free(iter_pos->u.restore);
90fcbacc 87 }
6cba487f 88 g_free(iter_pos);
90fcbacc
JD
89}
90
91int bt_iter_set_pos(struct bt_iter *iter, const struct bt_iter_pos *iter_pos)
92{
93 int i, ret;
94
95 if (iter_pos->u.restore) {
96 /* clear and recreate the heap */
97 heap_free(iter->stream_heap);
98 g_free(iter->stream_heap);
99 iter->stream_heap = g_new(struct ptr_heap, 1);
100 ret = heap_init(iter->stream_heap, 0, stream_compare);
101 if (ret < 0)
102 goto error_heap_init;
103
104 for (i = 0; i < iter_pos->u.restore->stream_saved_pos->len;
105 i++) {
106 struct stream_saved_pos *saved_pos;
107 struct ctf_stream_pos *stream_pos;
108 struct packet_index *index;
109 struct ctf_stream *stream;
110
111 saved_pos = &g_array_index(
112 iter_pos->u.restore->stream_saved_pos,
113 struct stream_saved_pos, i);
114 stream = &saved_pos->file_stream->parent;
115 stream_pos = &saved_pos->file_stream->pos;
116 index = &g_array_index(stream_pos->packet_index,
117 struct packet_index,
118 saved_pos->cur_index);
119
120 stream_pos->cur_index = saved_pos->cur_index;
121 stream_pos->move_pos_slow(stream_pos, index->offset,
122 SEEK_SET);
123
124 /*
125 * the timestamp needs to be restored after
126 * move_pos_slow, because this function resets
127 * the timestamp to the beginning of the packet
128 */
129 stream->timestamp = saved_pos->current_timestamp;
130 stream_pos->offset = saved_pos->offset;
131 stream_pos->last_offset = saved_pos->offset;
132
133 stream->prev_timestamp = 0;
134 stream->prev_timestamp_end = 0;
135 stream->consumed = 0;
136
137 printf_debug("restored to cur_index = %zd and "
138 "offset = %zd, timestamp = %" PRIu64 "\n",
139 stream_pos->cur_index,
140 stream_pos->offset, stream->timestamp);
141
142 stream_read_event(saved_pos->file_stream);
143
144 /* Add to heap */
145 ret = heap_insert(iter->stream_heap,
146 saved_pos->file_stream);
147 if (ret)
148 goto error;
149 }
150 } else if (iter_pos->u.seek_time) {
151 /* not yet implemented */
152 return -1;
153 } else {
154 /* nowhere to seek to */
155 return -1;
156 }
157
158 return 0;
159error:
160 heap_free(iter->stream_heap);
161error_heap_init:
162 g_free(iter->stream_heap);
163 return -1;
164}
165
166struct bt_iter_pos *bt_iter_get_pos(struct bt_iter *iter)
167{
168 struct bt_iter_pos *pos;
169 struct trace_collection *tc = iter->ctx->tc;
170 int i, stream_class_id, stream_id;
171
172 pos = g_new0(struct bt_iter_pos, 1);
90fcbacc 173 pos->u.restore = g_new0(struct bt_saved_pos, 1);
90fcbacc
JD
174 pos->u.restore->tc = tc;
175 pos->u.restore->stream_saved_pos = g_array_new(FALSE, TRUE,
176 sizeof(struct stream_saved_pos));
177 if (!pos->u.restore->stream_saved_pos)
178 goto error;
179
180 for (i = 0; i < tc->array->len; i++) {
181 struct ctf_trace *tin;
182 struct trace_descriptor *td_read;
183
184 td_read = g_ptr_array_index(tc->array, i);
185 tin = container_of(td_read, struct ctf_trace, parent);
186
187 for (stream_class_id = 0; stream_class_id < tin->streams->len;
188 stream_class_id++) {
189 struct ctf_stream_class *stream_class;
190
191 stream_class = g_ptr_array_index(tin->streams,
192 stream_class_id);
193 for (stream_id = 0;
194 stream_id < stream_class->streams->len;
195 stream_id++) {
196 struct ctf_stream *stream;
197 struct ctf_file_stream *cfs;
198 struct stream_saved_pos saved_pos;
199
200 stream = g_ptr_array_index(
201 stream_class->streams,
202 stream_id);
203 cfs = container_of(stream,
204 struct ctf_file_stream,
205 parent);
206
207 saved_pos.file_stream = cfs;
208 saved_pos.cur_index = cfs->pos.cur_index;
209
210 /*
211 * It is possible that an event was read during
212 * the last restore, never consumed and its
213 * position saved again. For this case, we
214 * need to check if the event really was
215 * consumed by the caller otherwise it is lost.
216 */
217 if (stream->consumed)
218 saved_pos.offset = cfs->pos.offset;
219 else
220 saved_pos.offset = cfs->pos.last_offset;
221
222 saved_pos.current_timestamp = stream->timestamp;
223
224 g_array_append_val(
225 pos->u.restore->stream_saved_pos,
226 saved_pos);
227
363402af 228 printf_debug("stream : %" PRIu64 ", cur_index : %zd, "
90fcbacc
JD
229 "offset : %zd, "
230 "timestamp = %" PRIu64 "\n",
231 stream->stream_id, saved_pos.cur_index,
232 saved_pos.offset,
233 saved_pos.current_timestamp);
234 }
235 }
236 }
237
238 return pos;
239
240error:
241 return NULL;
242}
243
6f3077a2
JD
244/*
245 * babeltrace_filestream_seek: seek a filestream to given position.
246 *
247 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
248 */
249static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
e8c92a62 250 const struct bt_iter_pos *begin_pos,
6f3077a2
JD
251 unsigned long stream_id)
252{
253 int ret = 0;
254
255 switch (begin_pos->type) {
256 case BT_SEEK_CUR:
257 /*
258 * just insert into the heap we should already know
259 * the timestamps
260 */
261 break;
262 case BT_SEEK_BEGIN:
263 file_stream->pos.move_pos_slow(&file_stream->pos, 0, SEEK_SET);
264 ret = stream_read_event(file_stream);
265 break;
266 case BT_SEEK_TIME:
267 case BT_SEEK_RESTORE:
268 case BT_SEEK_END:
269 default:
270 assert(0); /* Not yet defined */
271 }
272
273 return ret;
274}
275
276/*
e8c92a62 277 * bt_iter_seek: seek iterator to given position.
6f3077a2 278 */
e8c92a62
JD
279int bt_iter_seek(struct bt_iter *iter,
280 const struct bt_iter_pos *begin_pos)
6f3077a2
JD
281{
282 int i, stream_id;
283 int ret = 0;
95d36295 284 struct trace_collection *tc = iter->ctx->tc;
6f3077a2
JD
285
286 for (i = 0; i < tc->array->len; i++) {
287 struct ctf_trace *tin;
288 struct trace_descriptor *td_read;
289
290 td_read = g_ptr_array_index(tc->array, i);
291 tin = container_of(td_read, struct ctf_trace, parent);
292
293 /* Populate heap with each stream */
294 for (stream_id = 0; stream_id < tin->streams->len;
295 stream_id++) {
296 struct ctf_stream_class *stream;
297 int filenr;
298
299 stream = g_ptr_array_index(tin->streams, stream_id);
300 for (filenr = 0; filenr < stream->streams->len;
301 filenr++) {
302 struct ctf_file_stream *file_stream;
303
304 file_stream = g_ptr_array_index(stream->streams,
305 filenr);
306 ret = babeltrace_filestream_seek(file_stream, begin_pos,
307 stream_id);
308 if (ret < 0)
309 goto end;
310 }
311 }
312 }
313end:
314 return ret;
315}
316
e8c92a62
JD
317struct bt_iter *bt_iter_create(struct bt_context *ctx,
318 struct bt_iter_pos *begin_pos,
319 struct bt_iter_pos *end_pos)
6f3077a2
JD
320{
321 int i, stream_id;
322 int ret = 0;
e8c92a62 323 struct bt_iter *iter;
6f3077a2 324
90fcbacc 325 iter = g_new0(struct bt_iter, 1);
6f3077a2 326 iter->stream_heap = g_new(struct ptr_heap, 1);
6f3077a2
JD
327 iter->end_pos = end_pos;
328 iter->callbacks = g_array_new(0, 1, sizeof(struct bt_stream_callbacks));
329 iter->recalculate_dep_graph = 0;
330 iter->main_callbacks.callback = NULL;
331 iter->dep_gc = g_ptr_array_new();
6cba487f 332 bt_context_get(ctx);
95d36295 333 iter->ctx = ctx;
6f3077a2
JD
334
335 ret = heap_init(iter->stream_heap, 0, stream_compare);
336 if (ret < 0)
337 goto error_heap_init;
338
95d36295 339 for (i = 0; i < ctx->tc->array->len; i++) {
6f3077a2
JD
340 struct ctf_trace *tin;
341 struct trace_descriptor *td_read;
342
95d36295 343 td_read = g_ptr_array_index(ctx->tc->array, i);
6f3077a2
JD
344 tin = container_of(td_read, struct ctf_trace, parent);
345
346 /* Populate heap with each stream */
347 for (stream_id = 0; stream_id < tin->streams->len;
348 stream_id++) {
349 struct ctf_stream_class *stream;
350 int filenr;
351
352 stream = g_ptr_array_index(tin->streams, stream_id);
353 if (!stream)
354 continue;
355 for (filenr = 0; filenr < stream->streams->len;
356 filenr++) {
357 struct ctf_file_stream *file_stream;
358
359 file_stream = g_ptr_array_index(stream->streams,
360 filenr);
361
362 if (begin_pos) {
363 ret = babeltrace_filestream_seek(file_stream, begin_pos,
364 stream_id);
365 if (ret == EOF) {
366 ret = 0;
367 continue;
368 } else if (ret) {
369 goto error;
370 }
371 }
372 /* Add to heap */
373 ret = heap_insert(iter->stream_heap, file_stream);
374 if (ret)
375 goto error;
376 }
377 }
378 }
379
380 return iter;
381
382error:
383 heap_free(iter->stream_heap);
384error_heap_init:
385 g_free(iter->stream_heap);
90fcbacc 386 g_free(iter);
6f3077a2
JD
387 return NULL;
388}
389
e8c92a62 390void bt_iter_destroy(struct bt_iter *iter)
6f3077a2
JD
391{
392 struct bt_stream_callbacks *bt_stream_cb;
393 struct bt_callback_chain *bt_chain;
394 int i, j;
395
396 heap_free(iter->stream_heap);
397 g_free(iter->stream_heap);
398
399 /* free all events callbacks */
400 if (iter->main_callbacks.callback)
401 g_array_free(iter->main_callbacks.callback, TRUE);
402
403 /* free per-event callbacks */
404 for (i = 0; i < iter->callbacks->len; i++) {
405 bt_stream_cb = &g_array_index(iter->callbacks,
406 struct bt_stream_callbacks, i);
407 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks)
408 continue;
409 for (j = 0; j < bt_stream_cb->per_id_callbacks->len; j++) {
410 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
411 struct bt_callback_chain, j);
412 if (bt_chain->callback) {
413 g_array_free(bt_chain->callback, TRUE);
414 }
415 }
416 g_array_free(bt_stream_cb->per_id_callbacks, TRUE);
417 }
418
95d36295
JD
419 bt_context_put(iter->ctx);
420
90fcbacc 421 g_free(iter);
6f3077a2
JD
422}
423
e8c92a62 424int bt_iter_next(struct bt_iter *iter)
6f3077a2
JD
425{
426 struct ctf_file_stream *file_stream, *removed;
427 int ret;
428
429 file_stream = heap_maximum(iter->stream_heap);
430 if (!file_stream) {
431 /* end of file for all streams */
432 ret = 0;
433 goto end;
434 }
435
436 ret = stream_read_event(file_stream);
437 if (ret == EOF) {
438 removed = heap_remove(iter->stream_heap);
439 assert(removed == file_stream);
440 ret = 0;
441 goto end;
442 } else if (ret) {
443 goto end;
444 }
445 /* Reinsert the file stream into the heap, and rebalance. */
446 removed = heap_replace_max(iter->stream_heap, file_stream);
447 assert(removed == file_stream);
448
449end:
450 return ret;
451}
452
e8c92a62 453int bt_iter_read_event(struct bt_iter *iter,
6f3077a2
JD
454 struct ctf_stream **stream,
455 struct ctf_stream_event **event)
456{
457 struct ctf_file_stream *file_stream;
458 int ret = 0;
459
460 file_stream = heap_maximum(iter->stream_heap);
461 if (!file_stream) {
462 /* end of file for all streams */
463 ret = EOF;
464 goto end;
465 }
466 *stream = &file_stream->parent;
467 *event = g_ptr_array_index((*stream)->events_by_id, (*stream)->event_id);
468
469 if ((*stream)->stream_id > iter->callbacks->len)
470 goto end;
471
472 process_callbacks(iter, *stream);
473
474end:
475 return ret;
476}
This page took 0.067557 seconds and 4 git commands to generate.