*
* Babeltrace Trace Converter Library
*
- * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation
+ *
+ * Author: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
#include <babeltrace/ctf/types.h>
#include <babeltrace/ctf/metadata.h>
#include <babeltrace/ctf-text/types.h>
+#include <babeltrace/prio_heap.h>
-static
-int convert_event(struct ctf_text_stream_pos *sout,
- struct ctf_file_stream *sin)
+static int read_event(struct ctf_file_stream *sin)
{
- struct ctf_stream *stream_class = sin->stream;
- struct ctf_event *event_class;
- uint64_t id = 0;
- int len_index;
-
- if (sin->pos.offset == -EOF)
- return -EOF;
-
- /* Hide event payload struct brackets */
- sout->depth = -1;
-
- /* Read and print event header */
- if (stream_class->event_header) {
- generic_rw(&sin->pos.parent, &stream_class->event_header->p);
-
- /* lookup event id */
- len_index = struct_declaration_lookup_field_index(stream_class->event_header_decl,
- g_quark_from_static_string("id"));
- if (len_index >= 0) {
- struct definition_integer *defint;
- struct field *field;
-
- field = struct_definition_get_field_from_index(stream_class->event_header, len_index);
- assert(field->definition->declaration->id == CTF_TYPE_INTEGER);
- defint = container_of(field->definition, struct definition_integer, p);
- assert(defint->declaration->signedness == FALSE);
- id = defint->value._unsigned; /* set id */
- }
-
- generic_rw(&sout->parent, &stream_class->event_header->p);
- }
-
- /* Read and print stream-declared event context */
- if (stream_class->event_context) {
- generic_rw(&sin->pos.parent, &stream_class->event_context->p);
- generic_rw(&sout->parent, &stream_class->event_context->p);
- }
-
- if (id >= stream_class->events_by_id->len) {
- fprintf(stdout, "[error] Event id %" PRIu64 " is outside range.\n", id);
- return -EINVAL;
- }
- event_class = g_ptr_array_index(stream_class->events_by_id, id);
- if (!event_class) {
- fprintf(stdout, "[error] Event id %" PRIu64 " is unknown.\n", id);
- return -EINVAL;
- }
-
- /* Read and print event-declared event context */
- if (event_class->context) {
- generic_rw(&sin->pos.parent, &event_class->context->p);
- generic_rw(&sout->parent, &event_class->context->p);
- }
+ int ret;
- /* Read and print event payload */
- if (event_class->fields) {
- generic_rw(&sin->pos.parent, &event_class->fields->p);
- generic_rw(&sout->parent, &event_class->fields->p);
+ ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent);
+ if (ret == EOF)
+ return EOF;
+ else if (ret) {
+ fprintf(stdout, "[error] Reading event failed.\n");
+ return ret;
}
-
return 0;
}
-static
-int convert_stream(struct ctf_text_stream_pos *sout,
- struct ctf_file_stream *sin)
+/*
+ * returns true if a < b, false otherwise.
+ */
+int stream_compare(void *a, void *b)
{
- int ret;
-
- /* For each event, print header, context, payload */
- /* TODO: order events by timestamps across streams */
- for (;;) {
- ret = convert_event(sout, sin);
- if (ret == -EOF)
- break;
- else if (ret) {
- fprintf(stdout, "[error] Printing event failed.\n");
- goto error;
- }
- }
-
- return 0;
+ struct ctf_file_stream *s_a = a, *s_b = b;
-error:
- return ret;
+ if (s_a->parent.timestamp < s_b->parent.timestamp)
+ return 1;
+ else
+ return 0;
}
int convert_trace(struct trace_descriptor *td_write,
struct ctf_trace *tin = container_of(td_read, struct ctf_trace, parent);
struct ctf_text_stream_pos *sout =
container_of(td_write, struct ctf_text_stream_pos, trace_descriptor);
- int stream_id, filenr;
- int ret;
+ int stream_id;
+ int ret = 0;
- /* For each stream (TODO: order events by timestamp) */
+ tin->stream_heap = g_new(struct ptr_heap, 1);
+ heap_init(tin->stream_heap, 0, stream_compare);
+
+ /* Populate heap with each stream */
for (stream_id = 0; stream_id < tin->streams->len; stream_id++) {
- struct ctf_stream *stream = g_ptr_array_index(tin->streams, stream_id);
+ struct ctf_stream_class *stream = g_ptr_array_index(tin->streams, stream_id);
+ int filenr;
if (!stream)
continue;
- for (filenr = 0; filenr < stream->files->len; filenr++) {
- struct ctf_file_stream *file_stream = g_ptr_array_index(stream->files, filenr);
- ret = convert_stream(sout, file_stream);
+ for (filenr = 0; filenr < stream->streams->len; filenr++) {
+ struct ctf_file_stream *file_stream = g_ptr_array_index(stream->streams, filenr);
+ ret = read_event(file_stream);
+ if (ret == EOF) {
+ ret = 0;
+ continue;
+ } else if (ret)
+ goto end;
+ /* Add to heap */
+ ret = heap_insert(tin->stream_heap, file_stream);
if (ret) {
- fprintf(stdout, "[error] Printing stream %d failed.\n", stream_id);
- goto error;
+ fprintf(stdout, "[error] Out of memory.\n");
+ goto end;
}
}
}
- return 0;
+ /* Replace heap entries until EOF for each stream (heap empty) */
+ for (;;) {
+ struct ctf_file_stream *file_stream, *removed;
+
+ file_stream = heap_maximum(tin->stream_heap);
+ if (!file_stream) {
+ /* end of file for all streams */
+ ret = 0;
+ break;
+ }
+ ret = sout->parent.event_cb(&sout->parent, &file_stream->parent);
+ if (ret) {
+ fprintf(stdout, "[error] Writing event failed.\n");
+ goto end;
+ }
+ ret = read_event(file_stream);
+ if (ret == EOF) {
+ removed = heap_remove(tin->stream_heap);
+ assert(removed == file_stream);
+ ret = 0;
+ continue;
+ } else if (ret)
+ goto end;
+ /* Reinsert the file stream into the heap, and rebalance. */
+ removed = heap_replace_max(tin->stream_heap, file_stream);
+ assert(removed == file_stream);
+ }
-error:
+end:
+ heap_free(tin->stream_heap);
+ g_free(tin->stream_heap);
return ret;
}