API : split iterator headers from babeltrace.h
[babeltrace.git] / lib / iterator.c
CommitLineData
6f3077a2
JD
1/*
2 * iterator.c
3 *
4 * Babeltrace Library
5 *
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 *
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 */
20
#include <errno.h>
#include <stdlib.h>
#include <babeltrace/babeltrace.h>
#include <babeltrace/callbacks-internal.h>
#include <babeltrace/context.h>
#include <babeltrace/ctf/metadata.h>
#include <babeltrace/iterator-internal.h>
#include <babeltrace/iterator.h>
#include <babeltrace/prio_heap.h>
29
/*
 * Saved read position of a single file stream.
 */
struct stream_saved_pos {
	/*
	 * File stream this position was saved from. Kept so that a
	 * restore can check, stream by stream, that the trace
	 * collection being restored to matches the one saved from.
	 */
	struct ctf_file_stream *file_stream;
	/* Current index in the packet index. */
	size_t cur_index;
	/* Offset from base, in bits. EOF for end of file. */
	ssize_t offset;
};
39
40struct babeltrace_saved_pos {
41 struct trace_collection *tc;
42 GArray *stream_saved_pos; /* Contains struct stream_saved_pos */
43};
44
45static int stream_read_event(struct ctf_file_stream *sin)
46{
47 int ret;
48
49 ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent);
50 if (ret == EOF)
51 return EOF;
52 else if (ret) {
53 fprintf(stdout, "[error] Reading event failed.\n");
54 return ret;
55 }
56 return 0;
57}
58
59/*
60 * returns true if a < b, false otherwise.
61 */
62int stream_compare(void *a, void *b)
63{
64 struct ctf_file_stream *s_a = a, *s_b = b;
65
66 if (s_a->parent.timestamp < s_b->parent.timestamp)
67 return 1;
68 else
69 return 0;
70}
71
72/*
73 * babeltrace_filestream_seek: seek a filestream to given position.
74 *
75 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
76 */
77static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
78 const struct trace_collection_pos *begin_pos,
79 unsigned long stream_id)
80{
81 int ret = 0;
82
83 switch (begin_pos->type) {
84 case BT_SEEK_CUR:
85 /*
86 * just insert into the heap we should already know
87 * the timestamps
88 */
89 break;
90 case BT_SEEK_BEGIN:
91 file_stream->pos.move_pos_slow(&file_stream->pos, 0, SEEK_SET);
92 ret = stream_read_event(file_stream);
93 break;
94 case BT_SEEK_TIME:
95 case BT_SEEK_RESTORE:
96 case BT_SEEK_END:
97 default:
98 assert(0); /* Not yet defined */
99 }
100
101 return ret;
102}
103
104/*
105 * babeltrace_iter_seek: seek iterator to given position.
106 */
107int babeltrace_iter_seek(struct babeltrace_iter *iter,
108 const struct trace_collection_pos *begin_pos)
109{
110 int i, stream_id;
111 int ret = 0;
95d36295 112 struct trace_collection *tc = iter->ctx->tc;
6f3077a2
JD
113
114 for (i = 0; i < tc->array->len; i++) {
115 struct ctf_trace *tin;
116 struct trace_descriptor *td_read;
117
118 td_read = g_ptr_array_index(tc->array, i);
119 tin = container_of(td_read, struct ctf_trace, parent);
120
121 /* Populate heap with each stream */
122 for (stream_id = 0; stream_id < tin->streams->len;
123 stream_id++) {
124 struct ctf_stream_class *stream;
125 int filenr;
126
127 stream = g_ptr_array_index(tin->streams, stream_id);
128 for (filenr = 0; filenr < stream->streams->len;
129 filenr++) {
130 struct ctf_file_stream *file_stream;
131
132 file_stream = g_ptr_array_index(stream->streams,
133 filenr);
134 ret = babeltrace_filestream_seek(file_stream, begin_pos,
135 stream_id);
136 if (ret < 0)
137 goto end;
138 }
139 }
140 }
141end:
142 return ret;
143}
144
95d36295 145struct babeltrace_iter *babeltrace_iter_create(struct bt_context *ctx,
6f3077a2
JD
146 struct trace_collection_pos *begin_pos,
147 struct trace_collection_pos *end_pos)
148{
149 int i, stream_id;
150 int ret = 0;
151 struct babeltrace_iter *iter;
152
153 iter = malloc(sizeof(struct babeltrace_iter));
154 if (!iter)
155 goto error_malloc;
156 iter->stream_heap = g_new(struct ptr_heap, 1);
6f3077a2
JD
157 iter->end_pos = end_pos;
158 iter->callbacks = g_array_new(0, 1, sizeof(struct bt_stream_callbacks));
159 iter->recalculate_dep_graph = 0;
160 iter->main_callbacks.callback = NULL;
161 iter->dep_gc = g_ptr_array_new();
95d36295
JD
162 if (bt_context_get(ctx) != 0)
163 goto error_ctx;
164 iter->ctx = ctx;
6f3077a2
JD
165
166 ret = heap_init(iter->stream_heap, 0, stream_compare);
167 if (ret < 0)
168 goto error_heap_init;
169
95d36295 170 for (i = 0; i < ctx->tc->array->len; i++) {
6f3077a2
JD
171 struct ctf_trace *tin;
172 struct trace_descriptor *td_read;
173
95d36295 174 td_read = g_ptr_array_index(ctx->tc->array, i);
6f3077a2
JD
175 tin = container_of(td_read, struct ctf_trace, parent);
176
177 /* Populate heap with each stream */
178 for (stream_id = 0; stream_id < tin->streams->len;
179 stream_id++) {
180 struct ctf_stream_class *stream;
181 int filenr;
182
183 stream = g_ptr_array_index(tin->streams, stream_id);
184 if (!stream)
185 continue;
186 for (filenr = 0; filenr < stream->streams->len;
187 filenr++) {
188 struct ctf_file_stream *file_stream;
189
190 file_stream = g_ptr_array_index(stream->streams,
191 filenr);
192
193 if (begin_pos) {
194 ret = babeltrace_filestream_seek(file_stream, begin_pos,
195 stream_id);
196 if (ret == EOF) {
197 ret = 0;
198 continue;
199 } else if (ret) {
200 goto error;
201 }
202 }
203 /* Add to heap */
204 ret = heap_insert(iter->stream_heap, file_stream);
205 if (ret)
206 goto error;
207 }
208 }
209 }
210
211 return iter;
212
213error:
214 heap_free(iter->stream_heap);
215error_heap_init:
216 g_free(iter->stream_heap);
95d36295 217error_ctx:
6f3077a2
JD
218 free(iter);
219error_malloc:
220 return NULL;
221}
222
223void babeltrace_iter_destroy(struct babeltrace_iter *iter)
224{
225 struct bt_stream_callbacks *bt_stream_cb;
226 struct bt_callback_chain *bt_chain;
227 int i, j;
228
229 heap_free(iter->stream_heap);
230 g_free(iter->stream_heap);
231
232 /* free all events callbacks */
233 if (iter->main_callbacks.callback)
234 g_array_free(iter->main_callbacks.callback, TRUE);
235
236 /* free per-event callbacks */
237 for (i = 0; i < iter->callbacks->len; i++) {
238 bt_stream_cb = &g_array_index(iter->callbacks,
239 struct bt_stream_callbacks, i);
240 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks)
241 continue;
242 for (j = 0; j < bt_stream_cb->per_id_callbacks->len; j++) {
243 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
244 struct bt_callback_chain, j);
245 if (bt_chain->callback) {
246 g_array_free(bt_chain->callback, TRUE);
247 }
248 }
249 g_array_free(bt_stream_cb->per_id_callbacks, TRUE);
250 }
251
95d36295
JD
252 bt_context_put(iter->ctx);
253
6f3077a2
JD
254 free(iter);
255}
256
257int babeltrace_iter_next(struct babeltrace_iter *iter)
258{
259 struct ctf_file_stream *file_stream, *removed;
260 int ret;
261
262 file_stream = heap_maximum(iter->stream_heap);
263 if (!file_stream) {
264 /* end of file for all streams */
265 ret = 0;
266 goto end;
267 }
268
269 ret = stream_read_event(file_stream);
270 if (ret == EOF) {
271 removed = heap_remove(iter->stream_heap);
272 assert(removed == file_stream);
273 ret = 0;
274 goto end;
275 } else if (ret) {
276 goto end;
277 }
278 /* Reinsert the file stream into the heap, and rebalance. */
279 removed = heap_replace_max(iter->stream_heap, file_stream);
280 assert(removed == file_stream);
281
282end:
283 return ret;
284}
285
286int babeltrace_iter_read_event(struct babeltrace_iter *iter,
287 struct ctf_stream **stream,
288 struct ctf_stream_event **event)
289{
290 struct ctf_file_stream *file_stream;
291 int ret = 0;
292
293 file_stream = heap_maximum(iter->stream_heap);
294 if (!file_stream) {
295 /* end of file for all streams */
296 ret = EOF;
297 goto end;
298 }
299 *stream = &file_stream->parent;
300 *event = g_ptr_array_index((*stream)->events_by_id, (*stream)->event_id);
301
302 if ((*stream)->stream_id > iter->callbacks->len)
303 goto end;
304
305 process_callbacks(iter, *stream);
306
307end:
308 return ret;
309}
This page took 0.034536 seconds and 4 git commands to generate.