babeltrace lib cleanup, folded with open/remove trace functions
[babeltrace.git] / lib / iterator.c
1 /*
2 * iterator.c
3 *
4 * Babeltrace Library
5 *
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 *
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 */
20
21 #include <stdlib.h>
22 #include <babeltrace/babeltrace.h>
23 #include <babeltrace/callbacks-internal.h>
24 #include <babeltrace/context.h>
25 #include <babeltrace/ctf/metadata.h>
26 #include <babeltrace/iterator-internal.h>
27 #include <babeltrace/iterator.h>
28 #include <babeltrace/prio_heap.h>
29 #include <inttypes.h>
30
/*
 * Saved position of a single file stream.  One of these is recorded per
 * stream by bt_iter_get_pos() and replayed by bt_iter_set_pos().
 */
struct stream_saved_pos {
	/*
	 * Stream this entry was saved from.  The pointer is kept so that a
	 * restore can check that the trace collection it restores to matches
	 * the one the position was saved from, for each stream.
	 */
	struct ctf_file_stream *file_stream;

	/* Current index in the packet index. */
	size_t cur_index;

	/* Offset from base, in bits.  EOF for end of file. */
	ssize_t offset;

	/* Timestamp of the stream at the time the position was saved. */
	uint64_t current_timestamp;
};
41
42 struct bt_saved_pos {
43 struct trace_collection *tc;
44 GArray *stream_saved_pos; /* Contains struct stream_saved_pos */
45 };
46
47 static int stream_read_event(struct ctf_file_stream *sin)
48 {
49 int ret;
50
51 ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent);
52 if (ret == EOF)
53 return EOF;
54 else if (ret) {
55 fprintf(stderr, "[error] Reading event failed.\n");
56 return ret;
57 }
58 return 0;
59 }
60
61 /*
62 * returns true if a < b, false otherwise.
63 */
64 int stream_compare(void *a, void *b)
65 {
66 struct ctf_file_stream *s_a = a, *s_b = b;
67
68 if (s_a->parent.timestamp < s_b->parent.timestamp)
69 return 1;
70 else
71 return 0;
72 }
73
74 void bt_iter_free_pos(struct bt_iter_pos *iter_pos)
75 {
76 if (!iter_pos)
77 return;
78
79 if (iter_pos->u.restore) {
80 if (iter_pos->u.restore->stream_saved_pos) {
81 g_array_free(
82 iter_pos->u.restore->stream_saved_pos,
83 TRUE);
84 }
85 g_free(iter_pos->u.restore);
86 }
87 g_free(iter_pos);
88 }
89
90 int bt_iter_set_pos(struct bt_iter *iter, const struct bt_iter_pos *iter_pos)
91 {
92 int i, ret;
93
94 if (iter_pos->u.restore) {
95 /* clear and recreate the heap */
96 heap_free(iter->stream_heap);
97 g_free(iter->stream_heap);
98 iter->stream_heap = g_new(struct ptr_heap, 1);
99 ret = heap_init(iter->stream_heap, 0, stream_compare);
100 if (ret < 0)
101 goto error_heap_init;
102
103 for (i = 0; i < iter_pos->u.restore->stream_saved_pos->len;
104 i++) {
105 struct stream_saved_pos *saved_pos;
106 struct ctf_stream_pos *stream_pos;
107 struct packet_index *index;
108 struct ctf_stream *stream;
109
110 saved_pos = &g_array_index(
111 iter_pos->u.restore->stream_saved_pos,
112 struct stream_saved_pos, i);
113 stream = &saved_pos->file_stream->parent;
114 stream_pos = &saved_pos->file_stream->pos;
115 index = &g_array_index(stream_pos->packet_index,
116 struct packet_index,
117 saved_pos->cur_index);
118
119 stream_pos->cur_index = saved_pos->cur_index;
120 stream_pos->move_pos_slow(stream_pos, index->offset,
121 SEEK_SET);
122
123 /*
124 * the timestamp needs to be restored after
125 * move_pos_slow, because this function resets
126 * the timestamp to the beginning of the packet
127 */
128 stream->timestamp = saved_pos->current_timestamp;
129 stream_pos->offset = saved_pos->offset;
130 stream_pos->last_offset = saved_pos->offset;
131
132 stream->prev_timestamp = 0;
133 stream->prev_timestamp_end = 0;
134 stream->consumed = 0;
135
136 printf_debug("restored to cur_index = %zd and "
137 "offset = %zd, timestamp = %" PRIu64 "\n",
138 stream_pos->cur_index,
139 stream_pos->offset, stream->timestamp);
140
141 stream_read_event(saved_pos->file_stream);
142
143 /* Add to heap */
144 ret = heap_insert(iter->stream_heap,
145 saved_pos->file_stream);
146 if (ret)
147 goto error;
148 }
149 } else if (iter_pos->u.seek_time) {
150 /* not yet implemented */
151 return -1;
152 } else {
153 /* nowhere to seek to */
154 return -1;
155 }
156
157 return 0;
158 error:
159 heap_free(iter->stream_heap);
160 error_heap_init:
161 g_free(iter->stream_heap);
162 return -1;
163 }
164
165 struct bt_iter_pos *bt_iter_get_pos(struct bt_iter *iter)
166 {
167 struct bt_iter_pos *pos;
168 struct trace_collection *tc = iter->ctx->tc;
169 int i, stream_class_id, stream_id;
170
171 pos = g_new0(struct bt_iter_pos, 1);
172 pos->u.restore = g_new0(struct bt_saved_pos, 1);
173 pos->u.restore->tc = tc;
174 pos->u.restore->stream_saved_pos = g_array_new(FALSE, TRUE,
175 sizeof(struct stream_saved_pos));
176 if (!pos->u.restore->stream_saved_pos)
177 goto error;
178
179 for (i = 0; i < tc->array->len; i++) {
180 struct ctf_trace *tin;
181 struct trace_descriptor *td_read;
182
183 td_read = g_ptr_array_index(tc->array, i);
184 tin = container_of(td_read, struct ctf_trace, parent);
185
186 for (stream_class_id = 0; stream_class_id < tin->streams->len;
187 stream_class_id++) {
188 struct ctf_stream_class *stream_class;
189
190 stream_class = g_ptr_array_index(tin->streams,
191 stream_class_id);
192 for (stream_id = 0;
193 stream_id < stream_class->streams->len;
194 stream_id++) {
195 struct ctf_stream *stream;
196 struct ctf_file_stream *cfs;
197 struct stream_saved_pos saved_pos;
198
199 stream = g_ptr_array_index(
200 stream_class->streams,
201 stream_id);
202 cfs = container_of(stream,
203 struct ctf_file_stream,
204 parent);
205
206 saved_pos.file_stream = cfs;
207 saved_pos.cur_index = cfs->pos.cur_index;
208
209 /*
210 * It is possible that an event was read during
211 * the last restore, never consumed and its
212 * position saved again. For this case, we
213 * need to check if the event really was
214 * consumed by the caller otherwise it is lost.
215 */
216 if (stream->consumed)
217 saved_pos.offset = cfs->pos.offset;
218 else
219 saved_pos.offset = cfs->pos.last_offset;
220
221 saved_pos.current_timestamp = stream->timestamp;
222
223 g_array_append_val(
224 pos->u.restore->stream_saved_pos,
225 saved_pos);
226
227 printf_debug("stream : %lu, cur_index : %zd, "
228 "offset : %zd, "
229 "timestamp = %" PRIu64 "\n",
230 stream->stream_id, saved_pos.cur_index,
231 saved_pos.offset,
232 saved_pos.current_timestamp);
233 }
234 }
235 }
236
237 return pos;
238
239 error:
240 return NULL;
241 }
242
243 /*
244 * babeltrace_filestream_seek: seek a filestream to given position.
245 *
246 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
247 */
248 static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
249 const struct bt_iter_pos *begin_pos,
250 unsigned long stream_id)
251 {
252 int ret = 0;
253
254 switch (begin_pos->type) {
255 case BT_SEEK_CUR:
256 /*
257 * just insert into the heap we should already know
258 * the timestamps
259 */
260 break;
261 case BT_SEEK_BEGIN:
262 file_stream->pos.move_pos_slow(&file_stream->pos, 0, SEEK_SET);
263 ret = stream_read_event(file_stream);
264 break;
265 case BT_SEEK_TIME:
266 case BT_SEEK_RESTORE:
267 case BT_SEEK_END:
268 default:
269 assert(0); /* Not yet defined */
270 }
271
272 return ret;
273 }
274
275 /*
276 * bt_iter_seek: seek iterator to given position.
277 */
278 int bt_iter_seek(struct bt_iter *iter,
279 const struct bt_iter_pos *begin_pos)
280 {
281 int i, stream_id;
282 int ret = 0;
283 struct trace_collection *tc = iter->ctx->tc;
284
285 for (i = 0; i < tc->array->len; i++) {
286 struct ctf_trace *tin;
287 struct trace_descriptor *td_read;
288
289 td_read = g_ptr_array_index(tc->array, i);
290 tin = container_of(td_read, struct ctf_trace, parent);
291
292 /* Populate heap with each stream */
293 for (stream_id = 0; stream_id < tin->streams->len;
294 stream_id++) {
295 struct ctf_stream_class *stream;
296 int filenr;
297
298 stream = g_ptr_array_index(tin->streams, stream_id);
299 for (filenr = 0; filenr < stream->streams->len;
300 filenr++) {
301 struct ctf_file_stream *file_stream;
302
303 file_stream = g_ptr_array_index(stream->streams,
304 filenr);
305 ret = babeltrace_filestream_seek(file_stream, begin_pos,
306 stream_id);
307 if (ret < 0)
308 goto end;
309 }
310 }
311 }
312 end:
313 return ret;
314 }
315
316 struct bt_iter *bt_iter_create(struct bt_context *ctx,
317 struct bt_iter_pos *begin_pos,
318 struct bt_iter_pos *end_pos)
319 {
320 int i, stream_id;
321 int ret = 0;
322 struct bt_iter *iter;
323
324 iter = g_new0(struct bt_iter, 1);
325 iter->stream_heap = g_new(struct ptr_heap, 1);
326 iter->end_pos = end_pos;
327 iter->callbacks = g_array_new(0, 1, sizeof(struct bt_stream_callbacks));
328 iter->recalculate_dep_graph = 0;
329 iter->main_callbacks.callback = NULL;
330 iter->dep_gc = g_ptr_array_new();
331 bt_context_get(ctx);
332 iter->ctx = ctx;
333
334 ret = heap_init(iter->stream_heap, 0, stream_compare);
335 if (ret < 0)
336 goto error_heap_init;
337
338 for (i = 0; i < ctx->tc->array->len; i++) {
339 struct ctf_trace *tin;
340 struct trace_descriptor *td_read;
341
342 td_read = g_ptr_array_index(ctx->tc->array, i);
343 tin = container_of(td_read, struct ctf_trace, parent);
344
345 /* Populate heap with each stream */
346 for (stream_id = 0; stream_id < tin->streams->len;
347 stream_id++) {
348 struct ctf_stream_class *stream;
349 int filenr;
350
351 stream = g_ptr_array_index(tin->streams, stream_id);
352 if (!stream)
353 continue;
354 for (filenr = 0; filenr < stream->streams->len;
355 filenr++) {
356 struct ctf_file_stream *file_stream;
357
358 file_stream = g_ptr_array_index(stream->streams,
359 filenr);
360
361 if (begin_pos) {
362 ret = babeltrace_filestream_seek(file_stream, begin_pos,
363 stream_id);
364 if (ret == EOF) {
365 ret = 0;
366 continue;
367 } else if (ret) {
368 goto error;
369 }
370 }
371 /* Add to heap */
372 ret = heap_insert(iter->stream_heap, file_stream);
373 if (ret)
374 goto error;
375 }
376 }
377 }
378
379 return iter;
380
381 error:
382 heap_free(iter->stream_heap);
383 error_heap_init:
384 g_free(iter->stream_heap);
385 g_free(iter);
386 return NULL;
387 }
388
389 void bt_iter_destroy(struct bt_iter *iter)
390 {
391 struct bt_stream_callbacks *bt_stream_cb;
392 struct bt_callback_chain *bt_chain;
393 int i, j;
394
395 heap_free(iter->stream_heap);
396 g_free(iter->stream_heap);
397
398 /* free all events callbacks */
399 if (iter->main_callbacks.callback)
400 g_array_free(iter->main_callbacks.callback, TRUE);
401
402 /* free per-event callbacks */
403 for (i = 0; i < iter->callbacks->len; i++) {
404 bt_stream_cb = &g_array_index(iter->callbacks,
405 struct bt_stream_callbacks, i);
406 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks)
407 continue;
408 for (j = 0; j < bt_stream_cb->per_id_callbacks->len; j++) {
409 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
410 struct bt_callback_chain, j);
411 if (bt_chain->callback) {
412 g_array_free(bt_chain->callback, TRUE);
413 }
414 }
415 g_array_free(bt_stream_cb->per_id_callbacks, TRUE);
416 }
417
418 bt_context_put(iter->ctx);
419
420 g_free(iter);
421 }
422
423 int bt_iter_next(struct bt_iter *iter)
424 {
425 struct ctf_file_stream *file_stream, *removed;
426 int ret;
427
428 file_stream = heap_maximum(iter->stream_heap);
429 if (!file_stream) {
430 /* end of file for all streams */
431 ret = 0;
432 goto end;
433 }
434
435 ret = stream_read_event(file_stream);
436 if (ret == EOF) {
437 removed = heap_remove(iter->stream_heap);
438 assert(removed == file_stream);
439 ret = 0;
440 goto end;
441 } else if (ret) {
442 goto end;
443 }
444 /* Reinsert the file stream into the heap, and rebalance. */
445 removed = heap_replace_max(iter->stream_heap, file_stream);
446 assert(removed == file_stream);
447
448 end:
449 return ret;
450 }
451
452 int bt_iter_read_event(struct bt_iter *iter,
453 struct ctf_stream **stream,
454 struct ctf_stream_event **event)
455 {
456 struct ctf_file_stream *file_stream;
457 int ret = 0;
458
459 file_stream = heap_maximum(iter->stream_heap);
460 if (!file_stream) {
461 /* end of file for all streams */
462 ret = EOF;
463 goto end;
464 }
465 *stream = &file_stream->parent;
466 *event = g_ptr_array_index((*stream)->events_by_id, (*stream)->event_id);
467
468 if ((*stream)->stream_id > iter->callbacks->len)
469 goto end;
470
471 process_callbacks(iter, *stream);
472
473 end:
474 return ret;
475 }
This page took 0.059789 seconds and 5 git commands to generate.