callbacks: use correct type for element size
[babeltrace.git] / converter / babeltrace-lib.c
1 /*
2 * babeltrace-lib.c
3 *
4 * Babeltrace Trace Converter Library
5 *
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 *
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 */
20
21 #include <stdlib.h>
22 #include <errno.h>
23 #include <stdio.h>
24 #include <inttypes.h>
25 #include <babeltrace/babeltrace-internal.h>
26 #include <babeltrace/format.h>
27 #include <babeltrace/ctf/types.h>
28 #include <babeltrace/ctf/metadata.h>
29 #include <babeltrace/ctf-text/types.h>
30 #include <babeltrace/prio_heap.h>
31 #include <babeltrace/babeltrace.h>
32 #include <babeltrace/types.h>
33 #include <babeltrace/ctf/types.h>
34 #include <babeltrace/ctf-ir/metadata.h>
35 #include <stdarg.h>
36
37 int babeltrace_verbose, babeltrace_debug;
38
39 struct stream_saved_pos {
40 /*
41 * Use file_stream pointer to check if the trace collection we
42 * restore to match the one we saved from, for each stream.
43 */
44 struct ctf_file_stream *file_stream;
45 size_t cur_index; /* current index in packet index */
46 ssize_t offset; /* offset from base, in bits. EOF for end of file. */
47 };
48
49 struct babeltrace_saved_pos {
50 struct trace_collection *tc;
51 GArray *stream_saved_pos; /* Contains struct stream_saved_pos */
52 };
53
54 struct bt_callback {
55 int prio; /* Callback order priority. Lower first. Dynamically assigned from dependency graph. */
56 void *private_data;
57 int flags;
58 struct bt_dependencies *depends;
59 struct bt_dependencies *weak_depends;
60 struct bt_dependencies *provides;
61 enum bt_cb_ret (*callback)(struct bt_ctf_data *ctf_data,
62 void *private_data);
63 };
64
65 struct bt_callback_chain {
66 GArray *callback; /* Array of struct bt_callback, ordered by priority */
67 };
68
69 /*
70 * per id callbacks need to be per stream class because event ID vs
71 * event name mapping can vary from stream to stream.
72 */
73 struct bt_stream_callbacks {
74 GArray *per_id_callbacks; /* Array of struct bt_callback_chain */
75 };
76
77 /*
78 * struct babeltrace_iter: data structure representing an iterator on a trace
79 * collection.
80 */
81 struct babeltrace_iter {
82 struct ptr_heap *stream_heap;
83 struct trace_collection *tc;
84 struct trace_collection_pos *end_pos;
85 GArray *callbacks; /* Array of struct bt_stream_callbacks */
86 struct bt_callback_chain main_callbacks; /* For all events */
87 /*
88 * Flag indicating if dependency graph needs to be recalculated.
89 * Set by babeltrace_iter_add_callback(), and checked (and
90 * cleared) by upon entry into babeltrace_iter_read_event().
91 * babeltrace_iter_read_event() is responsible for calling dep
92 * graph calculation if it sees this flag set.
93 */
94 int recalculate_dep_graph;
95 /*
96 * Array of pointers to struct bt_dependencies, for garbage
97 * collection. We're not using a linked list here because each
98 * struct bt_dependencies can belong to more than one
99 * babeltrace_iter.
100 */
101 GPtrArray *dep_gc;
102 };
103
104 struct bt_dependencies {
105 GArray *deps; /* Array of GQuarks */
106 int refcount; /* free when decremented to 0 */
107 };
108
109 static
110 struct bt_dependencies *_babeltrace_dependencies_create(const char *first,
111 va_list ap)
112 {
113 const char *iter;
114 struct bt_dependencies *dep;
115
116 dep = g_new0(struct bt_dependencies, 1);
117 dep->refcount = 1;
118 dep->deps = g_array_new(FALSE, TRUE, sizeof(GQuark));
119 iter = first;
120 while (iter) {
121 GQuark q = g_quark_from_string(iter);
122 g_array_append_val(dep->deps, q);
123 iter = va_arg(ap, const char *);
124 }
125 return dep;
126 }
127
128 struct bt_dependencies *babeltrace_dependencies_create(const char *first, ...)
129 {
130 va_list ap;
131 struct bt_dependencies *deps;
132
133 va_start(ap, first);
134 deps = _babeltrace_dependencies_create(first, ap);
135 va_end(ap);
136 return deps;
137 }
138
139 /*
140 * babeltrace_iter_add_callback: Add a callback to iterator.
141 */
142 int babeltrace_iter_add_callback(struct babeltrace_iter *iter,
143 bt_event_name event, void *private_data, int flags,
144 enum bt_cb_ret (*callback)(struct bt_ctf_data *ctf_data,
145 void *private_data),
146 struct bt_dependencies *depends,
147 struct bt_dependencies *weak_depends,
148 struct bt_dependencies *provides)
149 {
150 int i, stream_id;
151 gpointer *event_id_ptr;
152 unsigned long event_id;
153 struct trace_collection *tc = iter->tc;
154
155 for (i = 0; i < tc->array->len; i++) {
156 struct ctf_trace *tin;
157 struct trace_descriptor *td_read;
158
159 td_read = g_ptr_array_index(tc->array, i);
160 tin = container_of(td_read, struct ctf_trace, parent);
161
162 for (stream_id = 0; stream_id < tin->streams->len; stream_id++) {
163 struct ctf_stream_class *stream;
164 struct bt_stream_callbacks *bt_stream_cb = NULL;
165 struct bt_callback_chain *bt_chain = NULL;
166 struct bt_callback new_callback;
167
168 stream = g_ptr_array_index(tin->streams, stream_id);
169
170 if (stream_id >= iter->callbacks->len) {
171 g_array_set_size(iter->callbacks, stream->stream_id + 1);
172 }
173 bt_stream_cb = &g_array_index(iter->callbacks,
174 struct bt_stream_callbacks, stream->stream_id);
175 if (!bt_stream_cb->per_id_callbacks) {
176 bt_stream_cb->per_id_callbacks = g_array_new(FALSE, TRUE,
177 sizeof(struct bt_callback_chain));
178 }
179
180 if (event) {
181 /* find the event id */
182 event_id_ptr = g_hash_table_lookup(stream->event_quark_to_id,
183 (gconstpointer) (unsigned long) event);
184 /* event not found in this stream class */
185 if (!event_id_ptr) {
186 printf("event not found\n");
187 continue;
188 }
189 event_id = (uint64_t)(unsigned long) *event_id_ptr;
190
191 /* find or create the bt_callback_chain for this event */
192 if (event_id >= bt_stream_cb->per_id_callbacks->len) {
193 g_array_set_size(bt_stream_cb->per_id_callbacks, event_id + 1);
194 }
195 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
196 struct bt_callback_chain, event_id);
197 if (!bt_chain->callback) {
198 bt_chain->callback = g_array_new(FALSE, TRUE,
199 sizeof(struct bt_callback));
200 }
201 } else {
202 /* callback for all events */
203 if (!iter->main_callbacks.callback) {
204 iter->main_callbacks.callback = g_array_new(FALSE, TRUE,
205 sizeof(struct bt_callback));
206 }
207 bt_chain = &iter->main_callbacks;
208 }
209
210 new_callback.private_data = private_data;
211 new_callback.flags = flags;
212 new_callback.callback = callback;
213 new_callback.depends = depends;
214 new_callback.weak_depends = weak_depends;
215 new_callback.provides = provides;
216
217 /* TODO : take care of priority, for now just FIFO */
218 g_array_append_val(bt_chain->callback, new_callback);
219 }
220 }
221
222 return 0;
223 }
224
225 static int stream_read_event(struct ctf_file_stream *sin)
226 {
227 int ret;
228
229 ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent);
230 if (ret == EOF)
231 return EOF;
232 else if (ret) {
233 fprintf(stdout, "[error] Reading event failed.\n");
234 return ret;
235 }
236 return 0;
237 }
238
239 /*
240 * returns true if a < b, false otherwise.
241 */
242 int stream_compare(void *a, void *b)
243 {
244 struct ctf_file_stream *s_a = a, *s_b = b;
245
246 if (s_a->parent.timestamp < s_b->parent.timestamp)
247 return 1;
248 else
249 return 0;
250 }
251
252 /*
253 * babeltrace_filestream_seek: seek a filestream to given position.
254 *
255 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
256 */
257 static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
258 const struct trace_collection_pos *begin_pos,
259 unsigned long stream_id)
260 {
261 int ret = 0;
262
263 switch (begin_pos->type) {
264 case BT_SEEK_CUR:
265 /*
266 * just insert into the heap we should already know
267 * the timestamps
268 */
269 break;
270 case BT_SEEK_BEGIN:
271 file_stream->pos.move_pos_slow(&file_stream->pos, 0, SEEK_SET);
272 ret = stream_read_event(file_stream);
273 break;
274 case BT_SEEK_TIME:
275 case BT_SEEK_RESTORE:
276 case BT_SEEK_END:
277 default:
278 assert(0); /* Not yet defined */
279 }
280
281 return ret;
282 }
283
284 /*
285 * babeltrace_iter_seek: seek iterator to given position.
286 */
287 int babeltrace_iter_seek(struct babeltrace_iter *iter,
288 const struct trace_collection_pos *begin_pos)
289 {
290 int i, stream_id;
291 int ret = 0;
292 struct trace_collection *tc = iter->tc;
293
294 for (i = 0; i < tc->array->len; i++) {
295 struct ctf_trace *tin;
296 struct trace_descriptor *td_read;
297
298 td_read = g_ptr_array_index(tc->array, i);
299 tin = container_of(td_read, struct ctf_trace, parent);
300
301 /* Populate heap with each stream */
302 for (stream_id = 0; stream_id < tin->streams->len;
303 stream_id++) {
304 struct ctf_stream_class *stream;
305 int filenr;
306
307 stream = g_ptr_array_index(tin->streams, stream_id);
308 for (filenr = 0; filenr < stream->streams->len;
309 filenr++) {
310 struct ctf_file_stream *file_stream;
311
312 file_stream = g_ptr_array_index(stream->streams,
313 filenr);
314 ret = babeltrace_filestream_seek(file_stream, begin_pos,
315 stream_id);
316 if (ret < 0)
317 goto end;
318 }
319 }
320 }
321 end:
322 return ret;
323 }
324
325 struct babeltrace_iter *babeltrace_iter_create(struct trace_collection *tc,
326 struct trace_collection_pos *begin_pos,
327 struct trace_collection_pos *end_pos)
328 {
329 int i, stream_id;
330 int ret = 0;
331 struct babeltrace_iter *iter;
332
333 iter = malloc(sizeof(struct babeltrace_iter));
334 if (!iter)
335 goto error_malloc;
336 iter->stream_heap = g_new(struct ptr_heap, 1);
337 iter->tc = tc;
338 iter->end_pos = end_pos;
339 iter->callbacks = g_array_new(0, 1, sizeof(struct bt_stream_callbacks));
340 iter->recalculate_dep_graph = 0;
341 iter->main_callbacks.callback = NULL;
342 iter->dep_gc = g_ptr_array_new();
343
344 ret = heap_init(iter->stream_heap, 0, stream_compare);
345 if (ret < 0)
346 goto error_heap_init;
347
348 for (i = 0; i < tc->array->len; i++) {
349 struct ctf_trace *tin;
350 struct trace_descriptor *td_read;
351
352 td_read = g_ptr_array_index(tc->array, i);
353 tin = container_of(td_read, struct ctf_trace, parent);
354
355 /* Populate heap with each stream */
356 for (stream_id = 0; stream_id < tin->streams->len;
357 stream_id++) {
358 struct ctf_stream_class *stream;
359 int filenr;
360
361 stream = g_ptr_array_index(tin->streams, stream_id);
362 if (!stream)
363 continue;
364 for (filenr = 0; filenr < stream->streams->len;
365 filenr++) {
366 struct ctf_file_stream *file_stream;
367
368 file_stream = g_ptr_array_index(stream->streams,
369 filenr);
370
371 if (begin_pos) {
372 ret = babeltrace_filestream_seek(file_stream, begin_pos,
373 stream_id);
374 if (ret == EOF) {
375 ret = 0;
376 continue;
377 } else if (ret) {
378 goto error;
379 }
380 }
381 /* Add to heap */
382 ret = heap_insert(iter->stream_heap, file_stream);
383 if (ret)
384 goto error;
385 }
386 }
387 }
388
389 return iter;
390
391 error:
392 heap_free(iter->stream_heap);
393 error_heap_init:
394 g_free(iter->stream_heap);
395 free(iter);
396 error_malloc:
397 return NULL;
398 }
399
400 void babeltrace_iter_destroy(struct babeltrace_iter *iter)
401 {
402 struct bt_stream_callbacks *bt_stream_cb;
403 struct bt_callback_chain *bt_chain;
404 int i, j;
405
406 heap_free(iter->stream_heap);
407 g_free(iter->stream_heap);
408
409 /* free all events callbacks */
410 if (iter->main_callbacks.callback)
411 g_array_free(iter->main_callbacks.callback, TRUE);
412
413 /* free per-event callbacks */
414 for (i = 0; i < iter->callbacks->len; i++) {
415 bt_stream_cb = &g_array_index(iter->callbacks,
416 struct bt_stream_callbacks, i);
417 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks)
418 continue;
419 for (j = 0; j < bt_stream_cb->per_id_callbacks->len; j++) {
420 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
421 struct bt_callback_chain, j);
422 if (bt_chain->callback) {
423 g_array_free(bt_chain->callback, TRUE);
424 }
425 }
426 g_array_free(bt_stream_cb->per_id_callbacks, TRUE);
427 }
428
429 free(iter);
430 }
431
432 int babeltrace_iter_next(struct babeltrace_iter *iter)
433 {
434 struct ctf_file_stream *file_stream, *removed;
435 int ret;
436
437 file_stream = heap_maximum(iter->stream_heap);
438 if (!file_stream) {
439 /* end of file for all streams */
440 ret = 0;
441 goto end;
442 }
443
444 ret = stream_read_event(file_stream);
445 if (ret == EOF) {
446 removed = heap_remove(iter->stream_heap);
447 assert(removed == file_stream);
448 ret = 0;
449 goto end;
450 } else if (ret) {
451 goto end;
452 }
453 /* Reinsert the file stream into the heap, and rebalance. */
454 removed = heap_replace_max(iter->stream_heap, file_stream);
455 assert(removed == file_stream);
456
457 end:
458 return ret;
459 }
460
461 static
462 struct ctf_stream_event *extract_ctf_stream_event(struct ctf_stream *stream)
463 {
464 struct ctf_stream_class *stream_class = stream->stream_class;
465 struct ctf_event *event_class;
466 struct ctf_stream_event *event;
467 uint64_t id = stream->event_id;
468
469 if (id >= stream_class->events_by_id->len) {
470 fprintf(stdout, "[error] Event id %" PRIu64 " is outside range.\n", id);
471 return NULL;
472 }
473 event = g_ptr_array_index(stream->events_by_id, id);
474 if (!event) {
475 fprintf(stdout, "[error] Event id %" PRIu64 " is unknown.\n", id);
476 return NULL;
477 }
478 event_class = g_ptr_array_index(stream_class->events_by_id, id);
479 if (!event_class) {
480 fprintf(stdout, "[error] Event id %" PRIu64 " is unknown.\n", id);
481 return NULL;
482 }
483
484 return event;
485 }
486
487 static
488 void process_callbacks(struct babeltrace_iter *iter,
489 struct ctf_stream *stream)
490 {
491 struct bt_stream_callbacks *bt_stream_cb;
492 struct bt_callback_chain *bt_chain;
493 struct bt_callback *cb;
494 int i;
495 enum bt_cb_ret ret;
496 struct bt_ctf_data ctf_data;
497
498 ctf_data.event = extract_ctf_stream_event(stream);
499 ctf_data.stream = stream;
500
501 /* process all events callback first */
502 if (iter->main_callbacks.callback) {
503 for (i = 0; i < iter->main_callbacks.callback->len; i++) {
504 cb = &g_array_index(iter->main_callbacks.callback, struct bt_callback, i);
505 if (!cb)
506 goto end;
507 ret = cb->callback(&ctf_data, cb->private_data);
508 switch (ret) {
509 case BT_CB_OK_STOP:
510 case BT_CB_ERROR_STOP:
511 goto end;
512 default:
513 break;
514 }
515 }
516 }
517
518 /* process per event callbacks */
519 bt_stream_cb = &g_array_index(iter->callbacks,
520 struct bt_stream_callbacks, stream->stream_id);
521 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks)
522 goto end;
523
524 if (stream->event_id > bt_stream_cb->per_id_callbacks->len)
525 goto end;
526 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
527 struct bt_callback_chain, stream->event_id);
528 if (!bt_chain || !bt_chain->callback)
529 goto end;
530
531 for (i = 0; i < bt_chain->callback->len; i++) {
532 cb = &g_array_index(bt_chain->callback, struct bt_callback, i);
533 if (!cb)
534 goto end;
535 ret = cb->callback(&ctf_data, cb->private_data);
536 switch (ret) {
537 case BT_CB_OK_STOP:
538 case BT_CB_ERROR_STOP:
539 goto end;
540 default:
541 break;
542 }
543 }
544
545 end:
546 return;
547 }
548
549 int babeltrace_iter_read_event(struct babeltrace_iter *iter,
550 struct ctf_stream **stream,
551 struct ctf_stream_event **event)
552 {
553 struct ctf_file_stream *file_stream;
554 int ret = 0;
555
556 file_stream = heap_maximum(iter->stream_heap);
557 if (!file_stream) {
558 /* end of file for all streams */
559 ret = EOF;
560 goto end;
561 }
562 *stream = &file_stream->parent;
563 *event = g_ptr_array_index((*stream)->events_by_id, (*stream)->event_id);
564
565 if ((*stream)->stream_id > iter->callbacks->len)
566 goto end;
567
568 process_callbacks(iter, *stream);
569
570 end:
571 return ret;
572 }
573
574 int convert_trace(struct trace_descriptor *td_write,
575 struct trace_collection *trace_collection_read)
576 {
577 struct babeltrace_iter *iter;
578 struct ctf_stream *stream;
579 struct ctf_stream_event *event;
580 struct ctf_text_stream_pos *sout;
581 struct trace_collection_pos begin_pos;
582 int ret = 0;
583
584 sout = container_of(td_write, struct ctf_text_stream_pos,
585 trace_descriptor);
586
587 begin_pos.type = BT_SEEK_BEGIN;
588 iter = babeltrace_iter_create(trace_collection_read, &begin_pos, NULL);
589 while (babeltrace_iter_read_event(iter, &stream, &event) == 0) {
590 ret = sout->parent.event_cb(&sout->parent, stream);
591 if (ret) {
592 fprintf(stdout, "[error] Writing event failed.\n");
593 goto end;
594 }
595 ret = babeltrace_iter_next(iter);
596 if (ret < 0)
597 goto end;
598 }
599 end:
600 babeltrace_iter_destroy(iter);
601 return ret;
602 }
603
604 static
605 void __attribute__((constructor)) init_babeltrace_lib(void)
606 {
607 if (getenv("BABELTRACE_VERBOSE"))
608 babeltrace_verbose = 1;
609 if (getenv("BABELTRACE_DEBUG"))
610 babeltrace_debug = 1;
611 }
This page took 0.041239 seconds and 4 git commands to generate.