Add callback support
[babeltrace.git] / converter / babeltrace-lib.c
CommitLineData
46322b33
MD
1/*
2 * babeltrace-lib.c
3 *
4 * Babeltrace Trace Converter Library
5 *
64fa3fec
MD
6 * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
7 *
8 * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
46322b33
MD
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 */
20
21#include <stdlib.h>
22#include <errno.h>
23#include <stdio.h>
847bf71a 24#include <inttypes.h>
70bd0a12 25#include <babeltrace/babeltrace-internal.h>
46322b33
MD
26#include <babeltrace/format.h>
27#include <babeltrace/ctf/types.h>
28#include <babeltrace/ctf/metadata.h>
29#include <babeltrace/ctf-text/types.h>
0d0f5149 30#include <babeltrace/prio_heap.h>
70bd0a12
JD
31#include <babeltrace/babeltrace.h>
32#include <babeltrace/types.h>
33#include <babeltrace/ctf/types.h>
34#include <babeltrace/ctf-ir/metadata.h>
c34ccddd 35#include <stdarg.h>
70bd0a12 36
063f7048
MD
37struct stream_saved_pos {
38 /*
39 * Use file_stream pointer to check if the trace collection we
40 * restore to match the one we saved from, for each stream.
41 */
42 struct ctf_file_stream *file_stream;
43 size_t cur_index; /* current index in packet index */
44 ssize_t offset; /* offset from base, in bits. EOF for end of file. */
45};
46
47struct babeltrace_saved_pos {
48 struct trace_collection *tc;
49 GArray *stream_saved_pos; /* Contains struct stream_saved_pos */
50};
51
c34ccddd
MD
52struct bt_callback {
53 int prio; /* Callback order priority. Lower first. Dynamically assigned from dependency graph. */
54 void *private_data;
8ee2fac4
JD
55 int flags;
56 struct bt_dependencies *depends;
57 struct bt_dependencies *weak_depends;
58 struct bt_dependencies *provides;
c34ccddd
MD
59 enum bt_cb_ret (*callback)(void *private_data, void *caller_data);
60};
61
62struct bt_callback_chain {
63 GArray *callback; /* Array of struct bt_callback, ordered by priority */
64};
65
66/*
67 * per id callbacks need to be per stream class because event ID vs
68 * event name mapping can vary from stream to stream.
69 */
70struct bt_stream_callbacks {
71 GArray *per_id_callbacks; /* Array of struct bt_callback_chain */
72};
73
70bd0a12
JD
74/*
75 * struct babeltrace_iter: data structure representing an iterator on a trace
76 * collection.
77 */
78struct babeltrace_iter {
79 struct ptr_heap *stream_heap;
80 struct trace_collection *tc;
063f7048 81 struct trace_collection_pos *end_pos;
8ee2fac4 82 GArray *callbacks; /* Array of struct bt_stream_callbacks */
c34ccddd
MD
83 struct bt_callback_chain main_callbacks; /* For all events */
84 /*
85 * Flag indicating if dependency graph needs to be recalculated.
86 * Set by babeltrace_iter_add_callback(), and checked (and
87 * cleared) by upon entry into babeltrace_iter_read_event().
88 * babeltrace_iter_read_event() is responsible for calling dep
89 * graph calculation if it sees this flag set.
90 */
91 int recalculate_dep_graph;
92 /*
93 * Array of pointers to struct bt_dependencies, for garbage
94 * collection. We're not using a linked list here because each
95 * struct bt_dependencies can belong to more than one
96 * babeltrace_iter.
97 */
98 GPtrArray *dep_gc;
70bd0a12
JD
99};
100
c34ccddd
MD
101struct bt_dependencies {
102 GArray *deps; /* Array of GQuarks */
103 int refcount; /* free when decremented to 0 */
104};
105
106static
107struct bt_dependencies *_babeltrace_dependencies_create(const char *first,
108 va_list ap)
109{
110 const char *iter;
111 struct bt_dependencies *dep;
112
113 dep = g_new0(struct bt_dependencies, 1);
114 dep->refcount = 1;
115 dep->deps = g_array_new(FALSE, TRUE, sizeof(GQuark));
116 iter = first;
117 while (iter) {
118 GQuark q = g_quark_from_string(iter);
119 g_array_append_val(dep->deps, q);
120 iter = va_arg(ap, const char *);
121 }
122 return dep;
123}
124
125struct bt_dependencies *babeltrace_dependencies_create(const char *first, ...)
126{
127 va_list ap;
128 struct bt_dependencies *deps;
129
130 va_start(ap, first);
131 deps = _babeltrace_dependencies_create(first, ap);
132 va_end(ap);
133 return deps;
134}
135
8ee2fac4
JD
136/*
137 * babeltrace_iter_add_callback: Add a callback to iterator.
138 */
139int babeltrace_iter_add_callback(struct babeltrace_iter *iter,
140 bt_event_name event, void *private_data, int flags,
141 enum bt_cb_ret (*callback)(void *private_data, void *caller_data),
142 struct bt_dependencies *depends,
143 struct bt_dependencies *weak_depends,
144 struct bt_dependencies *provides)
145{
146 int i, stream_id;
147 gpointer *event_id_ptr;
148 unsigned long event_id;
149 struct trace_collection *tc = iter->tc;
150
151 for (i = 0; i < tc->array->len; i++) {
152 struct ctf_trace *tin;
153 struct trace_descriptor *td_read;
154
155 td_read = g_ptr_array_index(tc->array, i);
156 tin = container_of(td_read, struct ctf_trace, parent);
157
158 for (stream_id = 0; stream_id < tin->streams->len; stream_id++) {
159 struct ctf_stream_class *stream;
160 struct bt_stream_callbacks *bt_stream_cb = NULL;
161 struct bt_callback_chain *bt_chain = NULL;
162 struct bt_callback new_callback;
163
164 stream = g_ptr_array_index(tin->streams, stream_id);
165
166 /* find or create the bt_stream_callbacks for this stream */
167 if (iter->callbacks->len >= stream_id) {
168 bt_stream_cb = &g_array_index(iter->callbacks,
169 struct bt_stream_callbacks, stream->stream_id);
170 } else {
171 g_array_set_size(iter->callbacks, stream->stream_id);
172 }
173 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks) {
174 struct bt_stream_callbacks new_stream_cb;
175 new_stream_cb.per_id_callbacks = g_array_new(1, 1,
176 sizeof(struct bt_callback_chain));
177 g_array_insert_val(iter->callbacks, stream->stream_id, new_stream_cb);
178 bt_stream_cb = &g_array_index(iter->callbacks,
179 struct bt_stream_callbacks, stream->stream_id);
180 }
181
182 if (event) {
183 /* find the event id */
184 event_id_ptr = g_hash_table_lookup(stream->event_quark_to_id,
185 (gconstpointer) (unsigned long) event);
186 /* event not found in this stream class */
187 if (!event_id_ptr) {
188 printf("event not found\n");
189 continue;
190 }
191 event_id = (uint64_t)*event_id_ptr;
192
193 /* find or create the bt_callback_chain for this event */
194 if (bt_stream_cb->per_id_callbacks->len >= event_id) {
195 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
196 struct bt_callback_chain, event_id);
197 } else {
198 g_array_set_size(bt_stream_cb->per_id_callbacks, event_id);
199 }
200 if (!bt_chain || !bt_chain->callback) {
201 struct bt_callback_chain new_chain;
202 new_chain.callback = g_array_new(1, 1, sizeof(struct bt_callback));
203 g_array_insert_val(bt_stream_cb->per_id_callbacks, event_id,
204 new_chain);
205 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
206 struct bt_callback_chain, event_id);
207 }
208 } else {
209 /* callback for all events */
210 if (!iter->main_callbacks.callback) {
211 iter->main_callbacks.callback = g_array_new(1, 1,
212 sizeof(struct bt_callback));
213 }
214 bt_chain = &iter->main_callbacks;
215 }
216
217 new_callback.private_data = private_data;
218 new_callback.flags = flags;
219 new_callback.callback = callback;
220 new_callback.depends = depends;
221 new_callback.weak_depends = weak_depends;
222 new_callback.provides = provides;
223
224 /* TODO : take care of priority, for now just FIFO */
225 g_array_append_val(bt_chain->callback, new_callback);
226 }
227 }
228
229 return 0;
230}
231
9e501292 232static int stream_read_event(struct ctf_file_stream *sin)
46322b33
MD
233{
234 int ret;
235
2d0bea29 236 ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent);
0d0f5149
MD
237 if (ret == EOF)
238 return EOF;
239 else if (ret) {
240 fprintf(stdout, "[error] Reading event failed.\n");
241 return ret;
46322b33 242 }
46322b33 243 return 0;
0d0f5149 244}
46322b33 245
0d0f5149
MD
246/*
247 * returns true if a < b, false otherwise.
248 */
249int stream_compare(void *a, void *b)
250{
251 struct ctf_file_stream *s_a = a, *s_b = b;
252
2d0bea29 253 if (s_a->parent.timestamp < s_b->parent.timestamp)
0d0f5149
MD
254 return 1;
255 else
256 return 0;
46322b33
MD
257}
258
877fb419
JD
259/*
260 * babeltrace_filestream_seek: seek a filestream to given position.
261 *
262 * The stream_id parameter is only useful for BT_SEEK_RESTORE.
263 */
264static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream,
265 const struct trace_collection_pos *begin_pos,
266 unsigned long stream_id)
267{
268 int ret = 0;
269
270 switch (begin_pos->type) {
271 case BT_SEEK_CUR:
272 /*
273 * just insert into the heap we should already know
274 * the timestamps
275 */
276 break;
277 case BT_SEEK_BEGIN:
278 ctf_move_pos_slow(&file_stream->pos, 0, SEEK_SET);
279 ret = stream_read_event(file_stream);
280 break;
281 case BT_SEEK_TIME:
282 case BT_SEEK_RESTORE:
283 case BT_SEEK_END:
284 default:
285 assert(0); /* Not yet defined */
286 }
287
288 return ret;
289}
290
291/*
292 * babeltrace_iter_seek: seek iterator to given position.
293 */
294int babeltrace_iter_seek(struct babeltrace_iter *iter,
295 const struct trace_collection_pos *begin_pos)
296{
297 int i, stream_id;
298 int ret = 0;
299 struct trace_collection *tc = iter->tc;
300
301 for (i = 0; i < tc->array->len; i++) {
302 struct ctf_trace *tin;
303 struct trace_descriptor *td_read;
304
305 td_read = g_ptr_array_index(tc->array, i);
306 tin = container_of(td_read, struct ctf_trace, parent);
307
308 /* Populate heap with each stream */
309 for (stream_id = 0; stream_id < tin->streams->len;
310 stream_id++) {
311 struct ctf_stream_class *stream;
312 int filenr;
313
314 stream = g_ptr_array_index(tin->streams, stream_id);
315 for (filenr = 0; filenr < stream->streams->len;
316 filenr++) {
317 struct ctf_file_stream *file_stream;
318
319 file_stream = g_ptr_array_index(stream->streams,
320 filenr);
321 ret = babeltrace_filestream_seek(file_stream, begin_pos,
322 stream_id);
323 if (ret < 0)
324 goto end;
325 }
326 }
327 }
328end:
329 return ret;
330}
331
063f7048
MD
332struct babeltrace_iter *babeltrace_iter_create(struct trace_collection *tc,
333 struct trace_collection_pos *begin_pos,
334 struct trace_collection_pos *end_pos)
46322b33 335{
afb48eae 336 int i, stream_id;
0d0f5149 337 int ret = 0;
9e501292 338 struct babeltrace_iter *iter;
0d0f5149 339
9e501292
JD
340 iter = malloc(sizeof(struct babeltrace_iter));
341 if (!iter)
342 goto error_malloc;
343 iter->stream_heap = g_new(struct ptr_heap, 1);
344 iter->tc = tc;
877fb419 345 iter->end_pos = end_pos;
8ee2fac4
JD
346 iter->callbacks = g_array_new(0, 1, sizeof(struct bt_stream_callbacks));
347 iter->recalculate_dep_graph = 0;
348 iter->main_callbacks.callback = NULL;
349 iter->dep_gc = g_ptr_array_new();
9e501292
JD
350
351 ret = heap_init(iter->stream_heap, 0, stream_compare);
352 if (ret < 0)
353 goto error_heap_init;
46322b33 354
9e501292 355 for (i = 0; i < tc->array->len; i++) {
afb48eae
AA
356 struct ctf_trace *tin;
357 struct trace_descriptor *td_read;
46322b33 358
9e501292 359 td_read = g_ptr_array_index(tc->array, i);
afb48eae
AA
360 tin = container_of(td_read, struct ctf_trace, parent);
361
362 /* Populate heap with each stream */
363 for (stream_id = 0; stream_id < tin->streams->len;
364 stream_id++) {
365 struct ctf_stream_class *stream;
366 int filenr;
367
368 stream = g_ptr_array_index(tin->streams, stream_id);
369 if (!stream)
0d0f5149 370 continue;
afb48eae
AA
371 for (filenr = 0; filenr < stream->streams->len;
372 filenr++) {
373 struct ctf_file_stream *file_stream;
374
375 file_stream = g_ptr_array_index(stream->streams,
376 filenr);
377
877fb419
JD
378 if (begin_pos) {
379 ret = babeltrace_filestream_seek(file_stream, begin_pos,
380 stream_id);
381 if (ret == EOF) {
382 ret = 0;
383 continue;
384 } else if (ret) {
385 goto error;
386 }
afb48eae
AA
387 }
388 /* Add to heap */
9e501292
JD
389 ret = heap_insert(iter->stream_heap, file_stream);
390 if (ret)
391 goto error;
46322b33
MD
392 }
393 }
394 }
395
9e501292 396 return iter;
0d0f5149 397
9e501292
JD
398error:
399 heap_free(iter->stream_heap);
400error_heap_init:
401 g_free(iter->stream_heap);
402 free(iter);
403error_malloc:
404 return NULL;
405}
406
407void babeltrace_iter_destroy(struct babeltrace_iter *iter)
408{
409 heap_free(iter->stream_heap);
410 g_free(iter->stream_heap);
411 free(iter);
412}
413
414int babeltrace_iter_next(struct babeltrace_iter *iter)
415{
416 struct ctf_file_stream *file_stream, *removed;
417 int ret;
418
419 file_stream = heap_maximum(iter->stream_heap);
420 if (!file_stream) {
421 /* end of file for all streams */
422 ret = 0;
423 goto end;
424 }
425
426 ret = stream_read_event(file_stream);
427 if (ret == EOF) {
428 removed = heap_remove(iter->stream_heap);
429 assert(removed == file_stream);
430 ret = 0;
431 goto end;
432 } else if (ret) {
433 goto end;
434 }
435 /* Reinsert the file stream into the heap, and rebalance. */
436 removed = heap_replace_max(iter->stream_heap, file_stream);
437 assert(removed == file_stream);
438
439end:
440 return ret;
441}
442
8ee2fac4
JD
443static
444void process_callbacks(struct babeltrace_iter *iter,
445 struct ctf_stream *stream)
446{
447 struct bt_stream_callbacks *bt_stream_cb;
448 struct bt_callback_chain *bt_chain;
449 struct bt_callback *cb;
450 int i;
451 enum bt_cb_ret ret;
452
453 /* process all events callback first */
454 if (iter->main_callbacks.callback) {
455 for (i = 0; i < iter->main_callbacks.callback->len; i++) {
456 cb = &g_array_index(iter->main_callbacks.callback, struct bt_callback, i);
457 if (!cb)
458 goto end;
459 ret = cb->callback(NULL, NULL);
460 switch (ret) {
461 case BT_CB_OK_STOP:
462 case BT_CB_ERROR_STOP:
463 goto end;
464 default:
465 break;
466 }
467 }
468 }
469
470 /* process per event callbacks */
471 bt_stream_cb = &g_array_index(iter->callbacks,
472 struct bt_stream_callbacks, stream->stream_id);
473 if (!bt_stream_cb || !bt_stream_cb->per_id_callbacks)
474 goto end;
475
476 if (stream->event_id > bt_stream_cb->per_id_callbacks->len)
477 goto end;
478 bt_chain = &g_array_index(bt_stream_cb->per_id_callbacks,
479 struct bt_callback_chain, stream->event_id);
480 if (!bt_chain || !bt_chain->callback)
481 goto end;
482
483 for (i = 0; i < bt_chain->callback->len; i++) {
484 cb = &g_array_index(bt_chain->callback, struct bt_callback, i);
485 if (!cb)
486 goto end;
487 ret = cb->callback(NULL, NULL);
488 switch (ret) {
489 case BT_CB_OK_STOP:
490 case BT_CB_ERROR_STOP:
491 goto end;
492 default:
493 break;
494 }
495 }
496
497end:
498 return;
499}
500
9e501292
JD
501int babeltrace_iter_read_event(struct babeltrace_iter *iter,
502 struct ctf_stream **stream,
503 struct ctf_stream_event **event)
504{
505 struct ctf_file_stream *file_stream;
506 int ret = 0;
507
508 file_stream = heap_maximum(iter->stream_heap);
509 if (!file_stream) {
510 /* end of file for all streams */
511 ret = EOF;
512 goto end;
513 }
514 *stream = &file_stream->parent;
515 *event = g_ptr_array_index((*stream)->events_by_id, (*stream)->event_id);
8ee2fac4
JD
516
517 if ((*stream)->stream_id > iter->callbacks->len)
518 goto end;
519
520 process_callbacks(iter, *stream);
521
9e501292
JD
522end:
523 return ret;
524}
525
526int convert_trace(struct trace_descriptor *td_write,
527 struct trace_collection *trace_collection_read)
528{
529 struct babeltrace_iter *iter;
530 struct ctf_stream *stream;
531 struct ctf_stream_event *event;
532 struct ctf_text_stream_pos *sout;
877fb419 533 struct trace_collection_pos begin_pos;
9e501292
JD
534 int ret = 0;
535
536 sout = container_of(td_write, struct ctf_text_stream_pos,
537 trace_descriptor);
538
877fb419
JD
539 begin_pos.type = BT_SEEK_BEGIN;
540 iter = babeltrace_iter_create(trace_collection_read, &begin_pos, NULL);
9e501292
JD
541 while (babeltrace_iter_read_event(iter, &stream, &event) == 0) {
542 ret = sout->parent.event_cb(&sout->parent, stream);
0d0f5149
MD
543 if (ret) {
544 fprintf(stdout, "[error] Writing event failed.\n");
545 goto end;
546 }
9e501292
JD
547 ret = babeltrace_iter_next(iter);
548 if (ret < 0)
0d0f5149 549 goto end;
0d0f5149 550 }
0d0f5149 551end:
9e501292 552 babeltrace_iter_destroy(iter);
46322b33
MD
553 return ret;
554}
This page took 0.071043 seconds and 4 git commands to generate.