Fix parallel build for lib/
[babeltrace.git] / lib / iterator.c
CommitLineData
6f3077a2
JD
1/*
2 * iterator.c
3 *
4 * Babeltrace Library
5 *
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 *
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 */
20
21#include <stdlib.h>
22#include <babeltrace/babeltrace.h>
23#include <babeltrace/callbacks-internal.h>
24#include <babeltrace/ctf/metadata.h>
25#include <babeltrace/iterator-internal.h>
26#include <babeltrace/prio_heap.h>
27
/*
 * Saved read position of a single file stream.
 */
struct stream_saved_pos {
	/*
	 * Stream this position was saved from. On restore, this pointer is
	 * used to check, for each stream, that the trace collection being
	 * restored to matches the one the position was saved from.
	 */
	struct ctf_file_stream *file_stream;

	/* Current index in the packet index. */
	size_t cur_index;

	/* Offset from base, in bits. EOF for end of file. */
	ssize_t offset;
};
37
38struct babeltrace_saved_pos {
39 struct trace_collection *tc;
40 GArray *stream_saved_pos; /* Contains struct stream_saved_pos */
41};
42
43static int stream_read_event(struct ctf_file_stream *sin)
44{
45 int ret;
46
47 ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent);
48 if (ret == EOF)
49 return EOF;
50 else if (ret) {
51 fprintf(stdout, "[error] Reading event failed.\n");
52 return ret;
53 }
54 return 0;
55}
56
57/*
58 * returns true if a < b, false otherwise.
59 */
60int stream_compare(void *a, void *b)
61{
62 struct ctf_file_stream *s_a = a, *s_b = b;
63
64 if (s_a->parent.timestamp < s_b->parent.timestamp)
65 return 1;
66 else
67 return 0;
68}
69
70/*
71 * babeltrace_filestream_seek: seek a filestream to given position.
72 *
73 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
74 */
75static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
76 const struct trace_collection_pos *begin_pos,
77 unsigned long stream_id)
78{
79 int ret = 0;
80
81 switch (begin_pos->type) {
82 case BT_SEEK_CUR:
83 /*
84 * just insert into the heap we should already know
85 * the timestamps
86 */
87 break;
88 case BT_SEEK_BEGIN:
89 file_stream->pos.move_pos_slow(&file_stream->pos, 0, SEEK_SET);
90 ret = stream_read_event(file_stream);
91 break;
92 case BT_SEEK_TIME:
93 case BT_SEEK_RESTORE:
94 case BT_SEEK_END:
95 default:
96 assert(0); /* Not yet defined */
97 }
98
99 return ret;
100}
101
102/*
103 * babeltrace_iter_seek: seek iterator to given position.
104 */
105int babeltrace_iter_seek(struct babeltrace_iter *iter,
106 const struct trace_collection_pos *begin_pos)
107{
108 int i, stream_id;
109 int ret = 0;
110 struct trace_collection *tc = iter->tc;
111
112 for (i = 0; i < tc->array->len; i++) {
113 struct ctf_trace *tin;
114 struct trace_descriptor *td_read;
115
116 td_read = g_ptr_array_index(tc->array, i);
117 tin = container_of(td_read, struct ctf_trace, parent);
118
119 /* Populate heap with each stream */
120 for (stream_id = 0; stream_id < tin->streams->len;
121 stream_id++) {
122 struct ctf_stream_class *stream;
123 int filenr;
124
125 stream = g_ptr_array_index(tin->streams, stream_id);
126 for (filenr = 0; filenr < stream->streams->len;
127 filenr++) {
128 struct ctf_file_stream *file_stream;
129
130 file_stream = g_ptr_array_index(stream->streams,
131 filenr);
132 ret = babeltrace_filestream_seek(file_stream, begin_pos,
133 stream_id);
134 if (ret < 0)
135 goto end;
136 }
137 }
138 }
139end:
140 return ret;
141}
142
143struct babeltrace_iter *babeltrace_iter_create(struct trace_collection *tc,
144 struct trace_collection_pos *begin_pos,
145 struct trace_collection_pos *end_pos)
146{
147 int i, stream_id;
148 int ret = 0;
149 struct babeltrace_iter *iter;
150
151 iter = malloc(sizeof(struct babeltrace_iter));
152 if (!iter)
153 goto error_malloc;
154 iter->stream_heap = g_new(struct ptr_heap, 1);
155 iter->tc = tc;
156 iter->end_pos = end_pos;
157 iter->callbacks = g_array_new(0, 1, sizeof(struct bt_stream_callbacks));
158 iter->recalculate_dep_graph = 0;
159 iter->main_callbacks.callback = NULL;
160 iter->dep_gc = g_ptr_array_new();
161
162 ret = heap_init(iter->stream_heap, 0, stream_compare);
163 if (ret < 0)
164 goto error_heap_init;
165
166 for (i = 0; i < tc->array->len; i++) {
167 struct ctf_trace *tin;
168 struct trace_descriptor *td_read;
169
170 td_read = g_ptr_array_index(tc->array, i);
171 tin = container_of(td_read, struct ctf_trace, parent);
172
173 /* Populate heap with each stream */
174 for (stream_id = 0; stream_id < tin->streams->len;
175 stream_id++) {
176 struct ctf_stream_class *stream;
177 int filenr;
178
179 stream = g_ptr_array_index(tin->streams, stream_id);
180 if (!stream)
181 continue;
182 for (filenr = 0; filenr < stream->streams->len;
183 filenr++) {
184 struct ctf_file_stream *file_stream;
185
186 file_stream = g_ptr_array_index(stream->streams,
187 filenr);
188
189 if (begin_pos) {
190 ret = babeltrace_filestream_seek(file_stream, begin_pos,
191 stream_id);
192 if (ret == EOF) {
193 ret = 0;
194 continue;
195 } else if (ret) {
196 goto error;
197 }
198 }
199 /* Add to heap */
200 ret = heap_insert(iter->stream_heap, file_stream);
201 if (ret)
202 goto error;
203 }
204 }
205 }
206
207 return iter;
208
209error:
210 heap_free(iter->stream_heap);
211error_heap_init:
212 g_free(iter->stream_heap);
213 free(iter);
214error_malloc:
215 return NULL;
216}
217
218void babeltrace_iter_destroy(struct babeltrace_iter *iter)
219{
220 struct bt_stream_callbacks *bt_stream_cb;
221 struct bt_callback_chain *bt_chain;
222 int i, j;
223
224 heap_free(iter->stream_heap);
225 g_free(iter->stream_heap);
226
227 /* free all events callbacks */
228 if (iter->main_callbacks.callback)
229 g_array_free(iter->main_callbacks.callback, TRUE);
230
231 /* free per-event callbacks */
232 for (i = 0; i < iter->callbacks->len; i++) {
233 bt_stream_cb = &g_array_index(iter->callbacks,
234 struct bt_stream_callbacks, i);
235 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks)
236 continue;
237 for (j = 0; j < bt_stream_cb->per_id_callbacks->len; j++) {
238 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
239 struct bt_callback_chain, j);
240 if (bt_chain->callback) {
241 g_array_free(bt_chain->callback, TRUE);
242 }
243 }
244 g_array_free(bt_stream_cb->per_id_callbacks, TRUE);
245 }
246
247 free(iter);
248}
249
250int babeltrace_iter_next(struct babeltrace_iter *iter)
251{
252 struct ctf_file_stream *file_stream, *removed;
253 int ret;
254
255 file_stream = heap_maximum(iter->stream_heap);
256 if (!file_stream) {
257 /* end of file for all streams */
258 ret = 0;
259 goto end;
260 }
261
262 ret = stream_read_event(file_stream);
263 if (ret == EOF) {
264 removed = heap_remove(iter->stream_heap);
265 assert(removed == file_stream);
266 ret = 0;
267 goto end;
268 } else if (ret) {
269 goto end;
270 }
271 /* Reinsert the file stream into the heap, and rebalance. */
272 removed = heap_replace_max(iter->stream_heap, file_stream);
273 assert(removed == file_stream);
274
275end:
276 return ret;
277}
278
279int babeltrace_iter_read_event(struct babeltrace_iter *iter,
280 struct ctf_stream **stream,
281 struct ctf_stream_event **event)
282{
283 struct ctf_file_stream *file_stream;
284 int ret = 0;
285
286 file_stream = heap_maximum(iter->stream_heap);
287 if (!file_stream) {
288 /* end of file for all streams */
289 ret = EOF;
290 goto end;
291 }
292 *stream = &file_stream->parent;
293 *event = g_ptr_array_index((*stream)->events_by_id, (*stream)->event_id);
294
295 if ((*stream)->stream_id > iter->callbacks->len)
296 goto end;
297
298 process_callbacks(iter, *stream);
299
300end:
301 return ret;
302}
This page took 0.049336 seconds and 4 git commands to generate.