Add time delta to ctf-text
[babeltrace.git] / converter / babeltrace-lib.c
1 /*
2 * babeltrace-lib.c
3 *
4 * Babeltrace Trace Converter Library
5 *
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 *
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 */
20
21 #include <stdlib.h>
22 #include <errno.h>
23 #include <stdio.h>
24 #include <inttypes.h>
25 #include <babeltrace/babeltrace-internal.h>
26 #include <babeltrace/format.h>
27 #include <babeltrace/ctf/types.h>
28 #include <babeltrace/ctf/metadata.h>
29 #include <babeltrace/ctf-text/types.h>
30 #include <babeltrace/prio_heap.h>
31 #include <babeltrace/babeltrace.h>
32 #include <babeltrace/types.h>
33 #include <babeltrace/ctf/types.h>
34 #include <babeltrace/ctf-ir/metadata.h>
35 #include <stdarg.h>
36
37 int babeltrace_verbose, babeltrace_debug;
38
39 struct stream_saved_pos {
40 /*
41 * Use file_stream pointer to check if the trace collection we
42 * restore to match the one we saved from, for each stream.
43 */
44 struct ctf_file_stream *file_stream;
45 size_t cur_index; /* current index in packet index */
46 ssize_t offset; /* offset from base, in bits. EOF for end of file. */
47 };
48
49 struct babeltrace_saved_pos {
50 struct trace_collection *tc;
51 GArray *stream_saved_pos; /* Contains struct stream_saved_pos */
52 };
53
54 struct bt_callback {
55 int prio; /* Callback order priority. Lower first. Dynamically assigned from dependency graph. */
56 void *private_data;
57 int flags;
58 struct bt_dependencies *depends;
59 struct bt_dependencies *weak_depends;
60 struct bt_dependencies *provides;
61 enum bt_cb_ret (*callback)(struct bt_ctf_data *ctf_data,
62 void *private_data);
63 };
64
65 struct bt_callback_chain {
66 GArray *callback; /* Array of struct bt_callback, ordered by priority */
67 };
68
69 /*
70 * per id callbacks need to be per stream class because event ID vs
71 * event name mapping can vary from stream to stream.
72 */
73 struct bt_stream_callbacks {
74 GArray *per_id_callbacks; /* Array of struct bt_callback_chain */
75 };
76
77 /*
78 * struct babeltrace_iter: data structure representing an iterator on a trace
79 * collection.
80 */
81 struct babeltrace_iter {
82 struct ptr_heap *stream_heap;
83 struct trace_collection *tc;
84 struct trace_collection_pos *end_pos;
85 GArray *callbacks; /* Array of struct bt_stream_callbacks */
86 struct bt_callback_chain main_callbacks; /* For all events */
87 /*
88 * Flag indicating if dependency graph needs to be recalculated.
89 * Set by babeltrace_iter_add_callback(), and checked (and
90 * cleared) by upon entry into babeltrace_iter_read_event().
91 * babeltrace_iter_read_event() is responsible for calling dep
92 * graph calculation if it sees this flag set.
93 */
94 int recalculate_dep_graph;
95 /*
96 * Array of pointers to struct bt_dependencies, for garbage
97 * collection. We're not using a linked list here because each
98 * struct bt_dependencies can belong to more than one
99 * babeltrace_iter.
100 */
101 GPtrArray *dep_gc;
102 };
103
104 struct bt_dependencies {
105 GArray *deps; /* Array of GQuarks */
106 int refcount; /* free when decremented to 0 */
107 };
108
109 static
110 struct bt_dependencies *_babeltrace_dependencies_create(const char *first,
111 va_list ap)
112 {
113 const char *iter;
114 struct bt_dependencies *dep;
115
116 dep = g_new0(struct bt_dependencies, 1);
117 dep->refcount = 1;
118 dep->deps = g_array_new(FALSE, TRUE, sizeof(GQuark));
119 iter = first;
120 while (iter) {
121 GQuark q = g_quark_from_string(iter);
122 g_array_append_val(dep->deps, q);
123 iter = va_arg(ap, const char *);
124 }
125 return dep;
126 }
127
128 struct bt_dependencies *babeltrace_dependencies_create(const char *first, ...)
129 {
130 va_list ap;
131 struct bt_dependencies *deps;
132
133 va_start(ap, first);
134 deps = _babeltrace_dependencies_create(first, ap);
135 va_end(ap);
136 return deps;
137 }
138
139 /*
140 * babeltrace_iter_add_callback: Add a callback to iterator.
141 */
142 int babeltrace_iter_add_callback(struct babeltrace_iter *iter,
143 bt_event_name event, void *private_data, int flags,
144 enum bt_cb_ret (*callback)(struct bt_ctf_data *ctf_data,
145 void *private_data),
146 struct bt_dependencies *depends,
147 struct bt_dependencies *weak_depends,
148 struct bt_dependencies *provides)
149 {
150 int i, stream_id;
151 gpointer *event_id_ptr;
152 unsigned long event_id;
153 struct trace_collection *tc = iter->tc;
154
155 for (i = 0; i < tc->array->len; i++) {
156 struct ctf_trace *tin;
157 struct trace_descriptor *td_read;
158
159 td_read = g_ptr_array_index(tc->array, i);
160 tin = container_of(td_read, struct ctf_trace, parent);
161
162 for (stream_id = 0; stream_id < tin->streams->len; stream_id++) {
163 struct ctf_stream_class *stream;
164 struct bt_stream_callbacks *bt_stream_cb = NULL;
165 struct bt_callback_chain *bt_chain = NULL;
166 struct bt_callback new_callback;
167
168 stream = g_ptr_array_index(tin->streams, stream_id);
169
170 /* find or create the bt_stream_callbacks for this stream */
171 if (iter->callbacks->len >= stream_id) {
172 bt_stream_cb = &g_array_index(iter->callbacks,
173 struct bt_stream_callbacks, stream->stream_id);
174 } else {
175 g_array_set_size(iter->callbacks, stream->stream_id);
176 }
177 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks) {
178 struct bt_stream_callbacks new_stream_cb;
179 new_stream_cb.per_id_callbacks = g_array_new(1, 1,
180 sizeof(struct bt_callback_chain));
181 g_array_insert_val(iter->callbacks, stream->stream_id, new_stream_cb);
182 bt_stream_cb = &g_array_index(iter->callbacks,
183 struct bt_stream_callbacks, stream->stream_id);
184 }
185
186 if (event) {
187 /* find the event id */
188 event_id_ptr = g_hash_table_lookup(stream->event_quark_to_id,
189 (gconstpointer) (unsigned long) event);
190 /* event not found in this stream class */
191 if (!event_id_ptr) {
192 printf("event not found\n");
193 continue;
194 }
195 event_id = (uint64_t)(unsigned long)*event_id_ptr;
196
197 /* find or create the bt_callback_chain for this event */
198 if (bt_stream_cb->per_id_callbacks->len >= event_id) {
199 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
200 struct bt_callback_chain, event_id);
201 } else {
202 g_array_set_size(bt_stream_cb->per_id_callbacks, event_id);
203 }
204 if (!bt_chain || !bt_chain->callback) {
205 struct bt_callback_chain new_chain;
206 new_chain.callback = g_array_new(1, 1, sizeof(struct bt_callback));
207 g_array_insert_val(bt_stream_cb->per_id_callbacks, event_id,
208 new_chain);
209 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
210 struct bt_callback_chain, event_id);
211 }
212 } else {
213 /* callback for all events */
214 if (!iter->main_callbacks.callback) {
215 iter->main_callbacks.callback = g_array_new(1, 1,
216 sizeof(struct bt_callback));
217 }
218 bt_chain = &iter->main_callbacks;
219 }
220
221 new_callback.private_data = private_data;
222 new_callback.flags = flags;
223 new_callback.callback = callback;
224 new_callback.depends = depends;
225 new_callback.weak_depends = weak_depends;
226 new_callback.provides = provides;
227
228 /* TODO : take care of priority, for now just FIFO */
229 g_array_append_val(bt_chain->callback, new_callback);
230 }
231 }
232
233 return 0;
234 }
235
236 static int stream_read_event(struct ctf_file_stream *sin)
237 {
238 int ret;
239
240 ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent);
241 if (ret == EOF)
242 return EOF;
243 else if (ret) {
244 fprintf(stdout, "[error] Reading event failed.\n");
245 return ret;
246 }
247 return 0;
248 }
249
250 /*
251 * returns true if a < b, false otherwise.
252 */
253 int stream_compare(void *a, void *b)
254 {
255 struct ctf_file_stream *s_a = a, *s_b = b;
256
257 if (s_a->parent.timestamp < s_b->parent.timestamp)
258 return 1;
259 else
260 return 0;
261 }
262
263 /*
264 * babeltrace_filestream_seek: seek a filestream to given position.
265 *
266 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
267 */
268 static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
269 const struct trace_collection_pos *begin_pos,
270 unsigned long stream_id)
271 {
272 int ret = 0;
273
274 switch (begin_pos->type) {
275 case BT_SEEK_CUR:
276 /*
277 * just insert into the heap we should already know
278 * the timestamps
279 */
280 break;
281 case BT_SEEK_BEGIN:
282 file_stream->pos.move_pos_slow(&file_stream->pos, 0, SEEK_SET);
283 ret = stream_read_event(file_stream);
284 break;
285 case BT_SEEK_TIME:
286 case BT_SEEK_RESTORE:
287 case BT_SEEK_END:
288 default:
289 assert(0); /* Not yet defined */
290 }
291
292 return ret;
293 }
294
295 /*
296 * babeltrace_iter_seek: seek iterator to given position.
297 */
298 int babeltrace_iter_seek(struct babeltrace_iter *iter,
299 const struct trace_collection_pos *begin_pos)
300 {
301 int i, stream_id;
302 int ret = 0;
303 struct trace_collection *tc = iter->tc;
304
305 for (i = 0; i < tc->array->len; i++) {
306 struct ctf_trace *tin;
307 struct trace_descriptor *td_read;
308
309 td_read = g_ptr_array_index(tc->array, i);
310 tin = container_of(td_read, struct ctf_trace, parent);
311
312 /* Populate heap with each stream */
313 for (stream_id = 0; stream_id < tin->streams->len;
314 stream_id++) {
315 struct ctf_stream_class *stream;
316 int filenr;
317
318 stream = g_ptr_array_index(tin->streams, stream_id);
319 for (filenr = 0; filenr < stream->streams->len;
320 filenr++) {
321 struct ctf_file_stream *file_stream;
322
323 file_stream = g_ptr_array_index(stream->streams,
324 filenr);
325 ret = babeltrace_filestream_seek(file_stream, begin_pos,
326 stream_id);
327 if (ret < 0)
328 goto end;
329 }
330 }
331 }
332 end:
333 return ret;
334 }
335
336 struct babeltrace_iter *babeltrace_iter_create(struct trace_collection *tc,
337 struct trace_collection_pos *begin_pos,
338 struct trace_collection_pos *end_pos)
339 {
340 int i, stream_id;
341 int ret = 0;
342 struct babeltrace_iter *iter;
343
344 iter = malloc(sizeof(struct babeltrace_iter));
345 if (!iter)
346 goto error_malloc;
347 iter->stream_heap = g_new(struct ptr_heap, 1);
348 iter->tc = tc;
349 iter->end_pos = end_pos;
350 iter->callbacks = g_array_new(0, 1, sizeof(struct bt_stream_callbacks));
351 iter->recalculate_dep_graph = 0;
352 iter->main_callbacks.callback = NULL;
353 iter->dep_gc = g_ptr_array_new();
354
355 ret = heap_init(iter->stream_heap, 0, stream_compare);
356 if (ret < 0)
357 goto error_heap_init;
358
359 for (i = 0; i < tc->array->len; i++) {
360 struct ctf_trace *tin;
361 struct trace_descriptor *td_read;
362
363 td_read = g_ptr_array_index(tc->array, i);
364 tin = container_of(td_read, struct ctf_trace, parent);
365
366 /* Populate heap with each stream */
367 for (stream_id = 0; stream_id < tin->streams->len;
368 stream_id++) {
369 struct ctf_stream_class *stream;
370 int filenr;
371
372 stream = g_ptr_array_index(tin->streams, stream_id);
373 if (!stream)
374 continue;
375 for (filenr = 0; filenr < stream->streams->len;
376 filenr++) {
377 struct ctf_file_stream *file_stream;
378
379 file_stream = g_ptr_array_index(stream->streams,
380 filenr);
381
382 if (begin_pos) {
383 ret = babeltrace_filestream_seek(file_stream, begin_pos,
384 stream_id);
385 if (ret == EOF) {
386 ret = 0;
387 continue;
388 } else if (ret) {
389 goto error;
390 }
391 }
392 /* Add to heap */
393 ret = heap_insert(iter->stream_heap, file_stream);
394 if (ret)
395 goto error;
396 }
397 }
398 }
399
400 return iter;
401
402 error:
403 heap_free(iter->stream_heap);
404 error_heap_init:
405 g_free(iter->stream_heap);
406 free(iter);
407 error_malloc:
408 return NULL;
409 }
410
411 void babeltrace_iter_destroy(struct babeltrace_iter *iter)
412 {
413 struct bt_stream_callbacks *bt_stream_cb;
414 struct bt_callback_chain *bt_chain;
415 int i, j;
416
417 heap_free(iter->stream_heap);
418 g_free(iter->stream_heap);
419
420 /* free all events callbacks */
421 if (iter->main_callbacks.callback)
422 g_array_free(iter->main_callbacks.callback, TRUE);
423
424 /* free per-event callbacks */
425 for (i = 0; i < iter->callbacks->len; i++) {
426 bt_stream_cb = &g_array_index(iter->callbacks,
427 struct bt_stream_callbacks, i);
428 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks)
429 continue;
430 for (j = 0; j < bt_stream_cb->per_id_callbacks->len; j++) {
431 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
432 struct bt_callback_chain, j);
433 if (bt_chain->callback) {
434 g_array_free(bt_chain->callback, TRUE);
435 }
436 }
437 g_array_free(bt_stream_cb->per_id_callbacks, TRUE);
438 }
439
440 free(iter);
441 }
442
443 int babeltrace_iter_next(struct babeltrace_iter *iter)
444 {
445 struct ctf_file_stream *file_stream, *removed;
446 int ret;
447
448 file_stream = heap_maximum(iter->stream_heap);
449 if (!file_stream) {
450 /* end of file for all streams */
451 ret = 0;
452 goto end;
453 }
454
455 ret = stream_read_event(file_stream);
456 if (ret == EOF) {
457 removed = heap_remove(iter->stream_heap);
458 assert(removed == file_stream);
459 ret = 0;
460 goto end;
461 } else if (ret) {
462 goto end;
463 }
464 /* Reinsert the file stream into the heap, and rebalance. */
465 removed = heap_replace_max(iter->stream_heap, file_stream);
466 assert(removed == file_stream);
467
468 end:
469 return ret;
470 }
471
472 static
473 struct ctf_stream_event *extract_ctf_stream_event(struct ctf_stream *stream)
474 {
475 struct ctf_stream_class *stream_class = stream->stream_class;
476 struct ctf_event *event_class;
477 struct ctf_stream_event *event;
478 uint64_t id = stream->event_id;
479
480 if (id >= stream_class->events_by_id->len) {
481 fprintf(stdout, "[error] Event id %" PRIu64 " is outside range.\n", id);
482 return NULL;
483 }
484 event = g_ptr_array_index(stream->events_by_id, id);
485 if (!event) {
486 fprintf(stdout, "[error] Event id %" PRIu64 " is unknown.\n", id);
487 return NULL;
488 }
489 event_class = g_ptr_array_index(stream_class->events_by_id, id);
490 if (!event_class) {
491 fprintf(stdout, "[error] Event id %" PRIu64 " is unknown.\n", id);
492 return NULL;
493 }
494
495 return event;
496 }
497
498 static
499 void process_callbacks(struct babeltrace_iter *iter,
500 struct ctf_stream *stream)
501 {
502 struct bt_stream_callbacks *bt_stream_cb;
503 struct bt_callback_chain *bt_chain;
504 struct bt_callback *cb;
505 int i;
506 enum bt_cb_ret ret;
507 struct bt_ctf_data ctf_data;
508
509 ctf_data.event = extract_ctf_stream_event(stream);
510 ctf_data.stream = stream;
511
512 /* process all events callback first */
513 if (iter->main_callbacks.callback) {
514 for (i = 0; i < iter->main_callbacks.callback->len; i++) {
515 cb = &g_array_index(iter->main_callbacks.callback, struct bt_callback, i);
516 if (!cb)
517 goto end;
518 ret = cb->callback(&ctf_data, cb->private_data);
519 switch (ret) {
520 case BT_CB_OK_STOP:
521 case BT_CB_ERROR_STOP:
522 goto end;
523 default:
524 break;
525 }
526 }
527 }
528
529 /* process per event callbacks */
530 bt_stream_cb = &g_array_index(iter->callbacks,
531 struct bt_stream_callbacks, stream->stream_id);
532 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks)
533 goto end;
534
535 if (stream->event_id > bt_stream_cb->per_id_callbacks->len)
536 goto end;
537 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
538 struct bt_callback_chain, stream->event_id);
539 if (!bt_chain || !bt_chain->callback)
540 goto end;
541
542 for (i = 0; i < bt_chain->callback->len; i++) {
543 cb = &g_array_index(bt_chain->callback, struct bt_callback, i);
544 if (!cb)
545 goto end;
546 ret = cb->callback(&ctf_data, cb->private_data);
547 switch (ret) {
548 case BT_CB_OK_STOP:
549 case BT_CB_ERROR_STOP:
550 goto end;
551 default:
552 break;
553 }
554 }
555
556 end:
557 return;
558 }
559
560 int babeltrace_iter_read_event(struct babeltrace_iter *iter,
561 struct ctf_stream **stream,
562 struct ctf_stream_event **event)
563 {
564 struct ctf_file_stream *file_stream;
565 int ret = 0;
566
567 file_stream = heap_maximum(iter->stream_heap);
568 if (!file_stream) {
569 /* end of file for all streams */
570 ret = EOF;
571 goto end;
572 }
573 *stream = &file_stream->parent;
574 *event = g_ptr_array_index((*stream)->events_by_id, (*stream)->event_id);
575
576 if ((*stream)->stream_id > iter->callbacks->len)
577 goto end;
578
579 process_callbacks(iter, *stream);
580
581 end:
582 return ret;
583 }
584
585 int convert_trace(struct trace_descriptor *td_write,
586 struct trace_collection *trace_collection_read)
587 {
588 struct babeltrace_iter *iter;
589 struct ctf_stream *stream;
590 struct ctf_stream_event *event;
591 struct ctf_text_stream_pos *sout;
592 struct trace_collection_pos begin_pos;
593 int ret = 0;
594
595 sout = container_of(td_write, struct ctf_text_stream_pos,
596 trace_descriptor);
597
598 begin_pos.type = BT_SEEK_BEGIN;
599 iter = babeltrace_iter_create(trace_collection_read, &begin_pos, NULL);
600 while (babeltrace_iter_read_event(iter, &stream, &event) == 0) {
601 ret = sout->parent.event_cb(&sout->parent, stream);
602 if (ret) {
603 fprintf(stdout, "[error] Writing event failed.\n");
604 goto end;
605 }
606 ret = babeltrace_iter_next(iter);
607 if (ret < 0)
608 goto end;
609 }
610 end:
611 babeltrace_iter_destroy(iter);
612 return ret;
613 }
614
615 static
616 void __attribute__((constructor)) init_babeltrace_lib(void)
617 {
618 if (getenv("BABELTRACE_VERBOSE"))
619 babeltrace_verbose = 1;
620 if (getenv("BABELTRACE_DEBUG"))
621 babeltrace_debug = 1;
622 }
This page took 0.066336 seconds and 4 git commands to generate.