API : iterator get and set position
[babeltrace.git] / lib / iterator.c
1 /*
2 * iterator.c
3 *
4 * Babeltrace Library
5 *
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 *
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 */
20
#include <errno.h>
#include <stdlib.h>
#include <babeltrace/babeltrace.h>
#include <babeltrace/callbacks-internal.h>
#include <babeltrace/context.h>
#include <babeltrace/ctf/metadata.h>
#include <babeltrace/iterator-internal.h>
#include <babeltrace/iterator.h>
#include <babeltrace/prio_heap.h>
#include <inttypes.h>
30
/*
 * Saved read position of a single CTF file stream.
 */
struct stream_saved_pos {
	/*
	 * File stream this position belongs to. Kept so that a restore can
	 * check, stream by stream, that it targets the trace collection the
	 * position was saved from.
	 */
	struct ctf_file_stream *file_stream;
	size_t cur_index;		/* current index in packet index */
	ssize_t offset;			/* offset from base, in bits. EOF for end of file. */
	uint64_t current_timestamp;	/* stream timestamp when the position was saved */
};
41
42 struct bt_saved_pos {
43 struct trace_collection *tc;
44 GArray *stream_saved_pos; /* Contains struct stream_saved_pos */
45 };
46
47 static int stream_read_event(struct ctf_file_stream *sin)
48 {
49 int ret;
50
51 ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent);
52 if (ret == EOF)
53 return EOF;
54 else if (ret) {
55 fprintf(stderr, "[error] Reading event failed.\n");
56 return ret;
57 }
58 return 0;
59 }
60
61 /*
62 * returns true if a < b, false otherwise.
63 */
64 int stream_compare(void *a, void *b)
65 {
66 struct ctf_file_stream *s_a = a, *s_b = b;
67
68 if (s_a->parent.timestamp < s_b->parent.timestamp)
69 return 1;
70 else
71 return 0;
72 }
73
74 void bt_iter_free_pos(struct bt_iter_pos *iter_pos)
75 {
76 if (iter_pos) {
77 if (iter_pos->u.restore) {
78 if (iter_pos->u.restore->stream_saved_pos) {
79 g_array_free(
80 iter_pos->u.restore->stream_saved_pos,
81 TRUE);
82 }
83 g_free(iter_pos->u.restore);
84 }
85 g_free(iter_pos);
86 }
87 }
88
89 int bt_iter_set_pos(struct bt_iter *iter, const struct bt_iter_pos *iter_pos)
90 {
91 int i, ret;
92
93 if (iter_pos->u.restore) {
94 /* clear and recreate the heap */
95 heap_free(iter->stream_heap);
96 g_free(iter->stream_heap);
97 iter->stream_heap = g_new(struct ptr_heap, 1);
98 ret = heap_init(iter->stream_heap, 0, stream_compare);
99 if (ret < 0)
100 goto error_heap_init;
101
102 for (i = 0; i < iter_pos->u.restore->stream_saved_pos->len;
103 i++) {
104 struct stream_saved_pos *saved_pos;
105 struct ctf_stream_pos *stream_pos;
106 struct packet_index *index;
107 struct ctf_stream *stream;
108
109 saved_pos = &g_array_index(
110 iter_pos->u.restore->stream_saved_pos,
111 struct stream_saved_pos, i);
112 stream = &saved_pos->file_stream->parent;
113 stream_pos = &saved_pos->file_stream->pos;
114 index = &g_array_index(stream_pos->packet_index,
115 struct packet_index,
116 saved_pos->cur_index);
117
118 stream_pos->cur_index = saved_pos->cur_index;
119 stream_pos->move_pos_slow(stream_pos, index->offset,
120 SEEK_SET);
121
122 /*
123 * the timestamp needs to be restored after
124 * move_pos_slow, because this function resets
125 * the timestamp to the beginning of the packet
126 */
127 stream->timestamp = saved_pos->current_timestamp;
128 stream_pos->offset = saved_pos->offset;
129 stream_pos->last_offset = saved_pos->offset;
130
131 stream->prev_timestamp = 0;
132 stream->prev_timestamp_end = 0;
133 stream->consumed = 0;
134
135 printf_debug("restored to cur_index = %zd and "
136 "offset = %zd, timestamp = %" PRIu64 "\n",
137 stream_pos->cur_index,
138 stream_pos->offset, stream->timestamp);
139
140 stream_read_event(saved_pos->file_stream);
141
142 /* Add to heap */
143 ret = heap_insert(iter->stream_heap,
144 saved_pos->file_stream);
145 if (ret)
146 goto error;
147 }
148 } else if (iter_pos->u.seek_time) {
149 /* not yet implemented */
150 return -1;
151 } else {
152 /* nowhere to seek to */
153 return -1;
154 }
155
156 return 0;
157 error:
158 heap_free(iter->stream_heap);
159 error_heap_init:
160 g_free(iter->stream_heap);
161 return -1;
162 }
163
164 struct bt_iter_pos *bt_iter_get_pos(struct bt_iter *iter)
165 {
166 struct bt_iter_pos *pos;
167 struct trace_collection *tc = iter->ctx->tc;
168 int i, stream_class_id, stream_id;
169
170 pos = g_new0(struct bt_iter_pos, 1);
171 if (!pos) {
172 perror("allocating bt_iter_pos");
173 goto error;
174 }
175
176 pos->u.restore = g_new0(struct bt_saved_pos, 1);
177 if (!pos->u.restore) {
178 perror("allocating bt_saved_pos");
179 goto error;
180 }
181
182 pos->u.restore->tc = tc;
183 pos->u.restore->stream_saved_pos = g_array_new(FALSE, TRUE,
184 sizeof(struct stream_saved_pos));
185 if (!pos->u.restore->stream_saved_pos)
186 goto error;
187
188 for (i = 0; i < tc->array->len; i++) {
189 struct ctf_trace *tin;
190 struct trace_descriptor *td_read;
191
192 td_read = g_ptr_array_index(tc->array, i);
193 tin = container_of(td_read, struct ctf_trace, parent);
194
195 for (stream_class_id = 0; stream_class_id < tin->streams->len;
196 stream_class_id++) {
197 struct ctf_stream_class *stream_class;
198
199 stream_class = g_ptr_array_index(tin->streams,
200 stream_class_id);
201 for (stream_id = 0;
202 stream_id < stream_class->streams->len;
203 stream_id++) {
204 struct ctf_stream *stream;
205 struct ctf_file_stream *cfs;
206 struct stream_saved_pos saved_pos;
207
208 stream = g_ptr_array_index(
209 stream_class->streams,
210 stream_id);
211 cfs = container_of(stream,
212 struct ctf_file_stream,
213 parent);
214
215 saved_pos.file_stream = cfs;
216 saved_pos.cur_index = cfs->pos.cur_index;
217
218 /*
219 * It is possible that an event was read during
220 * the last restore, never consumed and its
221 * position saved again. For this case, we
222 * need to check if the event really was
223 * consumed by the caller otherwise it is lost.
224 */
225 if (stream->consumed)
226 saved_pos.offset = cfs->pos.offset;
227 else
228 saved_pos.offset = cfs->pos.last_offset;
229
230 saved_pos.current_timestamp = stream->timestamp;
231
232 g_array_append_val(
233 pos->u.restore->stream_saved_pos,
234 saved_pos);
235
236 printf_debug("stream : %lu, cur_index : %zd, "
237 "offset : %zd, "
238 "timestamp = %" PRIu64 "\n",
239 stream->stream_id, saved_pos.cur_index,
240 saved_pos.offset,
241 saved_pos.current_timestamp);
242 }
243 }
244 }
245
246 return pos;
247
248 error:
249 return NULL;
250 }
251
252 /*
253 * babeltrace_filestream_seek: seek a filestream to given position.
254 *
255 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
256 */
257 static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
258 const struct bt_iter_pos *begin_pos,
259 unsigned long stream_id)
260 {
261 int ret = 0;
262
263 switch (begin_pos->type) {
264 case BT_SEEK_CUR:
265 /*
266 * just insert into the heap we should already know
267 * the timestamps
268 */
269 break;
270 case BT_SEEK_BEGIN:
271 file_stream->pos.move_pos_slow(&file_stream->pos, 0, SEEK_SET);
272 ret = stream_read_event(file_stream);
273 break;
274 case BT_SEEK_TIME:
275 case BT_SEEK_RESTORE:
276 case BT_SEEK_END:
277 default:
278 assert(0); /* Not yet defined */
279 }
280
281 return ret;
282 }
283
284 /*
285 * bt_iter_seek: seek iterator to given position.
286 */
287 int bt_iter_seek(struct bt_iter *iter,
288 const struct bt_iter_pos *begin_pos)
289 {
290 int i, stream_id;
291 int ret = 0;
292 struct trace_collection *tc = iter->ctx->tc;
293
294 for (i = 0; i < tc->array->len; i++) {
295 struct ctf_trace *tin;
296 struct trace_descriptor *td_read;
297
298 td_read = g_ptr_array_index(tc->array, i);
299 tin = container_of(td_read, struct ctf_trace, parent);
300
301 /* Populate heap with each stream */
302 for (stream_id = 0; stream_id < tin->streams->len;
303 stream_id++) {
304 struct ctf_stream_class *stream;
305 int filenr;
306
307 stream = g_ptr_array_index(tin->streams, stream_id);
308 for (filenr = 0; filenr < stream->streams->len;
309 filenr++) {
310 struct ctf_file_stream *file_stream;
311
312 file_stream = g_ptr_array_index(stream->streams,
313 filenr);
314 ret = babeltrace_filestream_seek(file_stream, begin_pos,
315 stream_id);
316 if (ret < 0)
317 goto end;
318 }
319 }
320 }
321 end:
322 return ret;
323 }
324
325 struct bt_iter *bt_iter_create(struct bt_context *ctx,
326 struct bt_iter_pos *begin_pos,
327 struct bt_iter_pos *end_pos)
328 {
329 int i, stream_id;
330 int ret = 0;
331 struct bt_iter *iter;
332
333 iter = g_new0(struct bt_iter, 1);
334 if (!iter)
335 goto error_malloc;
336 iter->stream_heap = g_new(struct ptr_heap, 1);
337 iter->end_pos = end_pos;
338 iter->callbacks = g_array_new(0, 1, sizeof(struct bt_stream_callbacks));
339 iter->recalculate_dep_graph = 0;
340 iter->main_callbacks.callback = NULL;
341 iter->dep_gc = g_ptr_array_new();
342 if (bt_context_get(ctx) != 0)
343 goto error_ctx;
344 iter->ctx = ctx;
345
346 ret = heap_init(iter->stream_heap, 0, stream_compare);
347 if (ret < 0)
348 goto error_heap_init;
349
350 for (i = 0; i < ctx->tc->array->len; i++) {
351 struct ctf_trace *tin;
352 struct trace_descriptor *td_read;
353
354 td_read = g_ptr_array_index(ctx->tc->array, i);
355 tin = container_of(td_read, struct ctf_trace, parent);
356
357 /* Populate heap with each stream */
358 for (stream_id = 0; stream_id < tin->streams->len;
359 stream_id++) {
360 struct ctf_stream_class *stream;
361 int filenr;
362
363 stream = g_ptr_array_index(tin->streams, stream_id);
364 if (!stream)
365 continue;
366 for (filenr = 0; filenr < stream->streams->len;
367 filenr++) {
368 struct ctf_file_stream *file_stream;
369
370 file_stream = g_ptr_array_index(stream->streams,
371 filenr);
372
373 if (begin_pos) {
374 ret = babeltrace_filestream_seek(file_stream, begin_pos,
375 stream_id);
376 if (ret == EOF) {
377 ret = 0;
378 continue;
379 } else if (ret) {
380 goto error;
381 }
382 }
383 /* Add to heap */
384 ret = heap_insert(iter->stream_heap, file_stream);
385 if (ret)
386 goto error;
387 }
388 }
389 }
390
391 return iter;
392
393 error:
394 heap_free(iter->stream_heap);
395 error_heap_init:
396 g_free(iter->stream_heap);
397 error_ctx:
398 g_free(iter);
399 error_malloc:
400 return NULL;
401 }
402
403 void bt_iter_destroy(struct bt_iter *iter)
404 {
405 struct bt_stream_callbacks *bt_stream_cb;
406 struct bt_callback_chain *bt_chain;
407 int i, j;
408
409 heap_free(iter->stream_heap);
410 g_free(iter->stream_heap);
411
412 /* free all events callbacks */
413 if (iter->main_callbacks.callback)
414 g_array_free(iter->main_callbacks.callback, TRUE);
415
416 /* free per-event callbacks */
417 for (i = 0; i < iter->callbacks->len; i++) {
418 bt_stream_cb = &g_array_index(iter->callbacks,
419 struct bt_stream_callbacks, i);
420 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks)
421 continue;
422 for (j = 0; j < bt_stream_cb->per_id_callbacks->len; j++) {
423 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
424 struct bt_callback_chain, j);
425 if (bt_chain->callback) {
426 g_array_free(bt_chain->callback, TRUE);
427 }
428 }
429 g_array_free(bt_stream_cb->per_id_callbacks, TRUE);
430 }
431
432 bt_context_put(iter->ctx);
433
434 g_free(iter);
435 }
436
437 int bt_iter_next(struct bt_iter *iter)
438 {
439 struct ctf_file_stream *file_stream, *removed;
440 int ret;
441
442 file_stream = heap_maximum(iter->stream_heap);
443 if (!file_stream) {
444 /* end of file for all streams */
445 ret = 0;
446 goto end;
447 }
448
449 ret = stream_read_event(file_stream);
450 if (ret == EOF) {
451 removed = heap_remove(iter->stream_heap);
452 assert(removed == file_stream);
453 ret = 0;
454 goto end;
455 } else if (ret) {
456 goto end;
457 }
458 /* Reinsert the file stream into the heap, and rebalance. */
459 removed = heap_replace_max(iter->stream_heap, file_stream);
460 assert(removed == file_stream);
461
462 end:
463 return ret;
464 }
465
466 int bt_iter_read_event(struct bt_iter *iter,
467 struct ctf_stream **stream,
468 struct ctf_stream_event **event)
469 {
470 struct ctf_file_stream *file_stream;
471 int ret = 0;
472
473 file_stream = heap_maximum(iter->stream_heap);
474 if (!file_stream) {
475 /* end of file for all streams */
476 ret = EOF;
477 goto end;
478 }
479 *stream = &file_stream->parent;
480 *event = g_ptr_array_index((*stream)->events_by_id, (*stream)->event_id);
481
482 if ((*stream)->stream_id > iter->callbacks->len)
483 goto end;
484
485 process_callbacks(iter, *stream);
486
487 end:
488 return ret;
489 }
This page took 0.040122 seconds and 5 git commands to generate.