From 0d0f514904a3ebb85775abceed30da0675f55cc6 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Mon, 23 May 2011 16:38:37 -0400 Subject: [PATCH] Use priority heap to order by timestamp Signed-off-by: Mathieu Desnoyers --- Makefile.am | 2 +- converter/Makefile.am | 3 +- converter/babeltrace-lib.c | 96 +++++++++++++++++++++---------- include/babeltrace/ctf/metadata.h | 3 + 4 files changed, 73 insertions(+), 31 deletions(-) diff --git a/Makefile.am b/Makefile.am index 67485eda..531a4a82 100644 --- a/Makefile.am +++ b/Makefile.am @@ -2,4 +2,4 @@ AM_CFLAGS = $(PACKAGE_CFLAGS) -I$(top_srcdir)/include ACLOCAL_AMFLAGS = -I m4 -SUBDIRS = types formats converter lib tests +SUBDIRS = types formats lib converter tests diff --git a/converter/Makefile.am b/converter/Makefile.am index e7bcc690..794740d6 100644 --- a/converter/Makefile.am +++ b/converter/Makefile.am @@ -10,7 +10,8 @@ libbabeltrace_la_SOURCES = \ libbabeltrace_la_LIBADD = \ $(top_builddir)/types/libbabeltrace_types.la \ $(top_builddir)/formats/ctf/libctf.la \ - $(top_builddir)/formats/ctf-text/libctf-text.la + $(top_builddir)/formats/ctf-text/libctf-text.la \ + $(top_builddir)/lib/libprio_heap.la babeltrace_SOURCES = \ babeltrace.c diff --git a/converter/babeltrace-lib.c b/converter/babeltrace-lib.c index aba42199..534e4edd 100644 --- a/converter/babeltrace-lib.c +++ b/converter/babeltrace-lib.c @@ -25,34 +25,33 @@ #include #include #include +#include -static -int convert_stream(struct ctf_text_stream_pos *sout, - struct ctf_file_stream *sin) +static int read_event(struct ctf_file_stream *sin) { int ret; - /* For each event, print header, context, payload */ - /* TODO: order events by timestamps across streams */ - for (;;) { - ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->stream); - if (ret == EOF) - break; - else if (ret) { - fprintf(stdout, "[error] Reading event failed.\n"); - goto error; - } - ret = sout->parent.event_cb(&sout->parent, &sin->stream); - if (ret) { - fprintf(stdout, "[error] Writing event failed.\n"); - goto error; - } + ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->stream); + if (ret == EOF) + return EOF; + else if (ret) { + fprintf(stdout, "[error] Reading event failed.\n"); + return ret; } - return 0; +} -error: - return ret; +/* + * returns true if a < b, false otherwise. + */ +int stream_compare(void *a, void *b) +{ + struct ctf_file_stream *s_a = a, *s_b = b; + + if (s_a->stream.timestamp < s_b->stream.timestamp) + return 1; + else + return 0; } int convert_trace(struct trace_descriptor *td_write, @@ -61,27 +60,66 @@ int convert_trace(struct trace_descriptor *td_write, struct ctf_trace *tin = container_of(td_read, struct ctf_trace, parent); struct ctf_text_stream_pos *sout = container_of(td_write, struct ctf_text_stream_pos, trace_descriptor); - int stream_id, filenr; - int ret; + int stream_id; + int ret = 0; + + tin->stream_heap = g_new(struct ptr_heap, 1); + heap_init(tin->stream_heap, 0, stream_compare); - /* For each stream (TODO: order events by timestamp) */ + /* Populate heap with each stream */ for (stream_id = 0; stream_id < tin->streams->len; stream_id++) { struct ctf_stream_class *stream = g_ptr_array_index(tin->streams, stream_id); + int filenr; if (!stream) continue; for (filenr = 0; filenr < stream->files->len; filenr++) { struct ctf_file_stream *file_stream = g_ptr_array_index(stream->files, filenr); - ret = convert_stream(sout, file_stream); + ret = read_event(file_stream); + if (ret == EOF) { + ret = 0; + continue; + } else if (ret) + goto end; + /* Add to heap */ + ret = heap_insert(tin->stream_heap, file_stream); if (ret) { - fprintf(stdout, "[error] Printing stream %d failed.\n", stream_id); - goto error; + fprintf(stdout, "[error] Out of memory.\n"); + goto end; } } } - return 0; + /* Replace heap entries until EOF for each stream (heap empty) */ + for (;;) { + struct ctf_file_stream *file_stream, *removed; + + file_stream = heap_maximum(tin->stream_heap); + if (!file_stream) { + /* end of file for all streams */ + ret = 0; + break; + } + ret = sout->parent.event_cb(&sout->parent, &file_stream->stream); + if (ret) { + fprintf(stdout, "[error] Writing event failed.\n"); + goto end; + } + ret = read_event(file_stream); + if (ret == EOF) { + removed = heap_remove(tin->stream_heap); + assert(removed == file_stream); + ret = 0; + continue; + } else if (ret) + goto end; + /* Reinsert the file stream into the heap, and rebalance. */ + removed = heap_replace_max(tin->stream_heap, file_stream); + assert(removed == file_stream); + } -error: +end: + heap_free(tin->stream_heap); + g_free(tin->stream_heap); return ret; } diff --git a/include/babeltrace/ctf/metadata.h b/include/babeltrace/ctf/metadata.h index 25baa3a9..453c9f70 100644 --- a/include/babeltrace/ctf/metadata.h +++ b/include/babeltrace/ctf/metadata.h @@ -96,6 +96,9 @@ struct ctf_trace { DIR *dir; int dirfd; int flags; /* open flags */ + + /* Heap of streams, ordered to always get the lowest timestam */ + struct ptr_heap *stream_heap; }; #define CTF_STREAM_SET_FIELD(ctf_stream, field) \ -- 2.34.1