Fix type cast warning
[babeltrace.git] / converter / babeltrace-lib.c
1 /*
2 * babeltrace-lib.c
3 *
4 * Babeltrace Trace Converter Library
5 *
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 *
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 */
20
21 #include <stdlib.h>
22 #include <errno.h>
23 #include <stdio.h>
24 #include <inttypes.h>
25 #include <babeltrace/babeltrace-internal.h>
26 #include <babeltrace/format.h>
27 #include <babeltrace/ctf/types.h>
28 #include <babeltrace/ctf/metadata.h>
29 #include <babeltrace/ctf-text/types.h>
30 #include <babeltrace/prio_heap.h>
31 #include <babeltrace/babeltrace.h>
32 #include <babeltrace/types.h>
33 #include <babeltrace/ctf/types.h>
34 #include <babeltrace/ctf-ir/metadata.h>
35 #include <stdarg.h>
36
/*
 * struct stream_saved_pos: saved read position of one stream.
 */
struct stream_saved_pos {
	/*
	 * File stream the position was saved from. Used to check, for
	 * each stream, that the trace collection we restore to matches
	 * the one we saved from.
	 */
	struct ctf_file_stream *file_stream;
	/* Current index in packet index. */
	size_t cur_index;
	/* Offset from base, in bits. EOF for end of file. */
	ssize_t offset;
};
46
47 struct babeltrace_saved_pos {
48 struct trace_collection *tc;
49 GArray *stream_saved_pos; /* Contains struct stream_saved_pos */
50 };
51
/*
 * struct bt_callback: one registered callback, along with the data
 * and dependency sets it was registered with.
 */
struct bt_callback {
	/*
	 * Callback order priority. Lower first. Dynamically assigned
	 * from dependency graph.
	 */
	int prio;
	/* Opaque pointer supplied when the callback was registered. */
	void *private_data;
	int flags;
	struct bt_dependencies *depends;
	struct bt_dependencies *weak_depends;
	struct bt_dependencies *provides;
	/* Function to invoke. */
	enum bt_cb_ret (*callback)(void *private_data, void *caller_data);
};
61
62 struct bt_callback_chain {
63 GArray *callback; /* Array of struct bt_callback, ordered by priority */
64 };
65
66 /*
67 * per id callbacks need to be per stream class because event ID vs
68 * event name mapping can vary from stream to stream.
69 */
70 struct bt_stream_callbacks {
71 GArray *per_id_callbacks; /* Array of struct bt_callback_chain */
72 };
73
74 /*
75 * struct babeltrace_iter: data structure representing an iterator on a trace
76 * collection.
77 */
78 struct babeltrace_iter {
79 struct ptr_heap *stream_heap;
80 struct trace_collection *tc;
81 struct trace_collection_pos *end_pos;
82 GArray *callbacks; /* Array of struct bt_stream_callbacks */
83 struct bt_callback_chain main_callbacks; /* For all events */
84 /*
85 * Flag indicating if dependency graph needs to be recalculated.
86 * Set by babeltrace_iter_add_callback(), and checked (and
87 * cleared) by upon entry into babeltrace_iter_read_event().
88 * babeltrace_iter_read_event() is responsible for calling dep
89 * graph calculation if it sees this flag set.
90 */
91 int recalculate_dep_graph;
92 /*
93 * Array of pointers to struct bt_dependencies, for garbage
94 * collection. We're not using a linked list here because each
95 * struct bt_dependencies can belong to more than one
96 * babeltrace_iter.
97 */
98 GPtrArray *dep_gc;
99 };
100
101 struct bt_dependencies {
102 GArray *deps; /* Array of GQuarks */
103 int refcount; /* free when decremented to 0 */
104 };
105
106 static
107 struct bt_dependencies *_babeltrace_dependencies_create(const char *first,
108 va_list ap)
109 {
110 const char *iter;
111 struct bt_dependencies *dep;
112
113 dep = g_new0(struct bt_dependencies, 1);
114 dep->refcount = 1;
115 dep->deps = g_array_new(FALSE, TRUE, sizeof(GQuark));
116 iter = first;
117 while (iter) {
118 GQuark q = g_quark_from_string(iter);
119 g_array_append_val(dep->deps, q);
120 iter = va_arg(ap, const char *);
121 }
122 return dep;
123 }
124
/*
 * babeltrace_dependencies_create: variadic front-end building a
 * dependency set.
 *
 * Arguments are const char * strings; the list must end with a NULL
 * pointer. Returns the object built by
 * _babeltrace_dependencies_create().
 */
struct bt_dependencies *babeltrace_dependencies_create(const char *first, ...)
{
	struct bt_dependencies *created;
	va_list args;

	va_start(args, first);
	created = _babeltrace_dependencies_create(first, args);
	va_end(args);

	return created;
}
135
136 /*
137 * babeltrace_iter_add_callback: Add a callback to iterator.
138 */
139 int babeltrace_iter_add_callback(struct babeltrace_iter *iter,
140 bt_event_name event, void *private_data, int flags,
141 enum bt_cb_ret (*callback)(void *private_data, void *caller_data),
142 struct bt_dependencies *depends,
143 struct bt_dependencies *weak_depends,
144 struct bt_dependencies *provides)
145 {
146 int i, stream_id;
147 gpointer *event_id_ptr;
148 unsigned long event_id;
149 struct trace_collection *tc = iter->tc;
150
151 for (i = 0; i < tc->array->len; i++) {
152 struct ctf_trace *tin;
153 struct trace_descriptor *td_read;
154
155 td_read = g_ptr_array_index(tc->array, i);
156 tin = container_of(td_read, struct ctf_trace, parent);
157
158 for (stream_id = 0; stream_id < tin->streams->len; stream_id++) {
159 struct ctf_stream_class *stream;
160 struct bt_stream_callbacks *bt_stream_cb = NULL;
161 struct bt_callback_chain *bt_chain = NULL;
162 struct bt_callback new_callback;
163
164 stream = g_ptr_array_index(tin->streams, stream_id);
165
166 /* find or create the bt_stream_callbacks for this stream */
167 if (iter->callbacks->len >= stream_id) {
168 bt_stream_cb = &g_array_index(iter->callbacks,
169 struct bt_stream_callbacks, stream->stream_id);
170 } else {
171 g_array_set_size(iter->callbacks, stream->stream_id);
172 }
173 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks) {
174 struct bt_stream_callbacks new_stream_cb;
175 new_stream_cb.per_id_callbacks = g_array_new(1, 1,
176 sizeof(struct bt_callback_chain));
177 g_array_insert_val(iter->callbacks, stream->stream_id, new_stream_cb);
178 bt_stream_cb = &g_array_index(iter->callbacks,
179 struct bt_stream_callbacks, stream->stream_id);
180 }
181
182 if (event) {
183 /* find the event id */
184 event_id_ptr = g_hash_table_lookup(stream->event_quark_to_id,
185 (gconstpointer) (unsigned long) event);
186 /* event not found in this stream class */
187 if (!event_id_ptr) {
188 printf("event not found\n");
189 continue;
190 }
191 event_id = (uint64_t)(unsigned long)*event_id_ptr;
192
193 /* find or create the bt_callback_chain for this event */
194 if (bt_stream_cb->per_id_callbacks->len >= event_id) {
195 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
196 struct bt_callback_chain, event_id);
197 } else {
198 g_array_set_size(bt_stream_cb->per_id_callbacks, event_id);
199 }
200 if (!bt_chain || !bt_chain->callback) {
201 struct bt_callback_chain new_chain;
202 new_chain.callback = g_array_new(1, 1, sizeof(struct bt_callback));
203 g_array_insert_val(bt_stream_cb->per_id_callbacks, event_id,
204 new_chain);
205 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
206 struct bt_callback_chain, event_id);
207 }
208 } else {
209 /* callback for all events */
210 if (!iter->main_callbacks.callback) {
211 iter->main_callbacks.callback = g_array_new(1, 1,
212 sizeof(struct bt_callback));
213 }
214 bt_chain = &iter->main_callbacks;
215 }
216
217 new_callback.private_data = private_data;
218 new_callback.flags = flags;
219 new_callback.callback = callback;
220 new_callback.depends = depends;
221 new_callback.weak_depends = weak_depends;
222 new_callback.provides = provides;
223
224 /* TODO : take care of priority, for now just FIFO */
225 g_array_append_val(bt_chain->callback, new_callback);
226 }
227 }
228
229 return 0;
230 }
231
232 static int stream_read_event(struct ctf_file_stream *sin)
233 {
234 int ret;
235
236 ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent);
237 if (ret == EOF)
238 return EOF;
239 else if (ret) {
240 fprintf(stdout, "[error] Reading event failed.\n");
241 return ret;
242 }
243 return 0;
244 }
245
246 /*
247 * returns true if a < b, false otherwise.
248 */
249 int stream_compare(void *a, void *b)
250 {
251 struct ctf_file_stream *s_a = a, *s_b = b;
252
253 if (s_a->parent.timestamp < s_b->parent.timestamp)
254 return 1;
255 else
256 return 0;
257 }
258
259 /*
260 * babeltrace_filestream_seek: seek a filestream to given position.
261 *
262 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
263 */
264 static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
265 const struct trace_collection_pos *begin_pos,
266 unsigned long stream_id)
267 {
268 int ret = 0;
269
270 switch (begin_pos->type) {
271 case BT_SEEK_CUR:
272 /*
273 * just insert into the heap we should already know
274 * the timestamps
275 */
276 break;
277 case BT_SEEK_BEGIN:
278 ctf_move_pos_slow(&file_stream->pos, 0, SEEK_SET);
279 ret = stream_read_event(file_stream);
280 break;
281 case BT_SEEK_TIME:
282 case BT_SEEK_RESTORE:
283 case BT_SEEK_END:
284 default:
285 assert(0); /* Not yet defined */
286 }
287
288 return ret;
289 }
290
291 /*
292 * babeltrace_iter_seek: seek iterator to given position.
293 */
294 int babeltrace_iter_seek(struct babeltrace_iter *iter,
295 const struct trace_collection_pos *begin_pos)
296 {
297 int i, stream_id;
298 int ret = 0;
299 struct trace_collection *tc = iter->tc;
300
301 for (i = 0; i < tc->array->len; i++) {
302 struct ctf_trace *tin;
303 struct trace_descriptor *td_read;
304
305 td_read = g_ptr_array_index(tc->array, i);
306 tin = container_of(td_read, struct ctf_trace, parent);
307
308 /* Populate heap with each stream */
309 for (stream_id = 0; stream_id < tin->streams->len;
310 stream_id++) {
311 struct ctf_stream_class *stream;
312 int filenr;
313
314 stream = g_ptr_array_index(tin->streams, stream_id);
315 for (filenr = 0; filenr < stream->streams->len;
316 filenr++) {
317 struct ctf_file_stream *file_stream;
318
319 file_stream = g_ptr_array_index(stream->streams,
320 filenr);
321 ret = babeltrace_filestream_seek(file_stream, begin_pos,
322 stream_id);
323 if (ret < 0)
324 goto end;
325 }
326 }
327 }
328 end:
329 return ret;
330 }
331
332 struct babeltrace_iter *babeltrace_iter_create(struct trace_collection *tc,
333 struct trace_collection_pos *begin_pos,
334 struct trace_collection_pos *end_pos)
335 {
336 int i, stream_id;
337 int ret = 0;
338 struct babeltrace_iter *iter;
339
340 iter = malloc(sizeof(struct babeltrace_iter));
341 if (!iter)
342 goto error_malloc;
343 iter->stream_heap = g_new(struct ptr_heap, 1);
344 iter->tc = tc;
345 iter->end_pos = end_pos;
346 iter->callbacks = g_array_new(0, 1, sizeof(struct bt_stream_callbacks));
347 iter->recalculate_dep_graph = 0;
348 iter->main_callbacks.callback = NULL;
349 iter->dep_gc = g_ptr_array_new();
350
351 ret = heap_init(iter->stream_heap, 0, stream_compare);
352 if (ret < 0)
353 goto error_heap_init;
354
355 for (i = 0; i < tc->array->len; i++) {
356 struct ctf_trace *tin;
357 struct trace_descriptor *td_read;
358
359 td_read = g_ptr_array_index(tc->array, i);
360 tin = container_of(td_read, struct ctf_trace, parent);
361
362 /* Populate heap with each stream */
363 for (stream_id = 0; stream_id < tin->streams->len;
364 stream_id++) {
365 struct ctf_stream_class *stream;
366 int filenr;
367
368 stream = g_ptr_array_index(tin->streams, stream_id);
369 if (!stream)
370 continue;
371 for (filenr = 0; filenr < stream->streams->len;
372 filenr++) {
373 struct ctf_file_stream *file_stream;
374
375 file_stream = g_ptr_array_index(stream->streams,
376 filenr);
377
378 if (begin_pos) {
379 ret = babeltrace_filestream_seek(file_stream, begin_pos,
380 stream_id);
381 if (ret == EOF) {
382 ret = 0;
383 continue;
384 } else if (ret) {
385 goto error;
386 }
387 }
388 /* Add to heap */
389 ret = heap_insert(iter->stream_heap, file_stream);
390 if (ret)
391 goto error;
392 }
393 }
394 }
395
396 return iter;
397
398 error:
399 heap_free(iter->stream_heap);
400 error_heap_init:
401 g_free(iter->stream_heap);
402 free(iter);
403 error_malloc:
404 return NULL;
405 }
406
407 void babeltrace_iter_destroy(struct babeltrace_iter *iter)
408 {
409 struct bt_stream_callbacks *bt_stream_cb;
410 struct bt_callback_chain *bt_chain;
411 int i, j;
412
413 heap_free(iter->stream_heap);
414 g_free(iter->stream_heap);
415
416 /* free all events callbacks */
417 if (iter->main_callbacks.callback)
418 g_array_free(iter->main_callbacks.callback, TRUE);
419
420 /* free per-event callbacks */
421 for (i = 0; i < iter->callbacks->len; i++) {
422 bt_stream_cb = &g_array_index(iter->callbacks,
423 struct bt_stream_callbacks, i);
424 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks)
425 continue;
426 for (j = 0; j < bt_stream_cb->per_id_callbacks->len; j++) {
427 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
428 struct bt_callback_chain, j);
429 g_array_free(bt_chain->callback, TRUE);
430 }
431 g_array_free(bt_stream_cb->per_id_callbacks, TRUE);
432 }
433
434 free(iter);
435 }
436
437 int babeltrace_iter_next(struct babeltrace_iter *iter)
438 {
439 struct ctf_file_stream *file_stream, *removed;
440 int ret;
441
442 file_stream = heap_maximum(iter->stream_heap);
443 if (!file_stream) {
444 /* end of file for all streams */
445 ret = 0;
446 goto end;
447 }
448
449 ret = stream_read_event(file_stream);
450 if (ret == EOF) {
451 removed = heap_remove(iter->stream_heap);
452 assert(removed == file_stream);
453 ret = 0;
454 goto end;
455 } else if (ret) {
456 goto end;
457 }
458 /* Reinsert the file stream into the heap, and rebalance. */
459 removed = heap_replace_max(iter->stream_heap, file_stream);
460 assert(removed == file_stream);
461
462 end:
463 return ret;
464 }
465
466 static
467 void process_callbacks(struct babeltrace_iter *iter,
468 struct ctf_stream *stream)
469 {
470 struct bt_stream_callbacks *bt_stream_cb;
471 struct bt_callback_chain *bt_chain;
472 struct bt_callback *cb;
473 int i;
474 enum bt_cb_ret ret;
475
476 /* process all events callback first */
477 if (iter->main_callbacks.callback) {
478 for (i = 0; i < iter->main_callbacks.callback->len; i++) {
479 cb = &g_array_index(iter->main_callbacks.callback, struct bt_callback, i);
480 if (!cb)
481 goto end;
482 ret = cb->callback(NULL, NULL);
483 switch (ret) {
484 case BT_CB_OK_STOP:
485 case BT_CB_ERROR_STOP:
486 goto end;
487 default:
488 break;
489 }
490 }
491 }
492
493 /* process per event callbacks */
494 bt_stream_cb = &g_array_index(iter->callbacks,
495 struct bt_stream_callbacks, stream->stream_id);
496 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks)
497 goto end;
498
499 if (stream->event_id > bt_stream_cb->per_id_callbacks->len)
500 goto end;
501 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
502 struct bt_callback_chain, stream->event_id);
503 if (!bt_chain || !bt_chain->callback)
504 goto end;
505
506 for (i = 0; i < bt_chain->callback->len; i++) {
507 cb = &g_array_index(bt_chain->callback, struct bt_callback, i);
508 if (!cb)
509 goto end;
510 ret = cb->callback(NULL, NULL);
511 switch (ret) {
512 case BT_CB_OK_STOP:
513 case BT_CB_ERROR_STOP:
514 goto end;
515 default:
516 break;
517 }
518 }
519
520 end:
521 return;
522 }
523
524 int babeltrace_iter_read_event(struct babeltrace_iter *iter,
525 struct ctf_stream **stream,
526 struct ctf_stream_event **event)
527 {
528 struct ctf_file_stream *file_stream;
529 int ret = 0;
530
531 file_stream = heap_maximum(iter->stream_heap);
532 if (!file_stream) {
533 /* end of file for all streams */
534 ret = EOF;
535 goto end;
536 }
537 *stream = &file_stream->parent;
538 *event = g_ptr_array_index((*stream)->events_by_id, (*stream)->event_id);
539
540 if ((*stream)->stream_id > iter->callbacks->len)
541 goto end;
542
543 process_callbacks(iter, *stream);
544
545 end:
546 return ret;
547 }
548
549 int convert_trace(struct trace_descriptor *td_write,
550 struct trace_collection *trace_collection_read)
551 {
552 struct babeltrace_iter *iter;
553 struct ctf_stream *stream;
554 struct ctf_stream_event *event;
555 struct ctf_text_stream_pos *sout;
556 struct trace_collection_pos begin_pos;
557 int ret = 0;
558
559 sout = container_of(td_write, struct ctf_text_stream_pos,
560 trace_descriptor);
561
562 begin_pos.type = BT_SEEK_BEGIN;
563 iter = babeltrace_iter_create(trace_collection_read, &begin_pos, NULL);
564 while (babeltrace_iter_read_event(iter, &stream, &event) == 0) {
565 ret = sout->parent.event_cb(&sout->parent, stream);
566 if (ret) {
567 fprintf(stdout, "[error] Writing event failed.\n");
568 goto end;
569 }
570 ret = babeltrace_iter_next(iter);
571 if (ret < 0)
572 goto end;
573 }
574 end:
575 babeltrace_iter_destroy(iter);
576 return ret;
577 }
This page took 0.040916 seconds and 5 git commands to generate.