From cdcf612f76937cfd654a3564fb77676768a44299 Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Thu, 1 Jun 2017 14:16:59 -0400 Subject: [PATCH] fs-sink: fix handling static notifications MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Instead of relying on a stream count, keep a state for each stream. This allows to check that we only see each stream once and works even if each stream is started/completed sequentially. Signed-off-by: Julien Desfossez Signed-off-by: Jérémie Galarneau --- plugins/ctf/fs-sink/write.c | 102 ++++++++++++++++++++++++++++++----- plugins/ctf/fs-sink/writer.h | 15 +++++- 2 files changed, 104 insertions(+), 13 deletions(-) diff --git a/plugins/ctf/fs-sink/write.c b/plugins/ctf/fs-sink/write.c index 45ba3bcb..d05a22c4 100644 --- a/plugins/ctf/fs-sink/write.c +++ b/plugins/ctf/fs-sink/write.c @@ -58,6 +58,11 @@ gboolean empty_ht(gpointer key, gpointer value, gpointer user_data) return TRUE; } +void destroy_stream_state_key(gpointer key) +{ + g_free((enum fs_writer_stream_state *) key); +} + static void trace_is_static_listener(struct bt_ctf_trace *trace, void *data) { @@ -131,7 +136,9 @@ struct fs_writer *insert_new_writer( struct bt_ctf_trace *writer_trace = NULL; char trace_name[PATH_MAX]; enum bt_component_status ret; - struct fs_writer *fs_writer; + struct bt_ctf_stream *stream = NULL; + struct fs_writer *fs_writer = NULL; + int nr_stream, i; /* FIXME: replace with trace name when it will work. */ snprintf(trace_name, PATH_MAX, "%s/%s_%03d", @@ -172,13 +179,43 @@ struct fs_writer *insert_new_writer( goto error; } fs_writer->writer = ctf_writer; + fs_writer->trace = trace; + fs_writer->writer_trace = writer_trace; + BT_PUT(writer_trace); fs_writer->stream_class_map = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) unref_stream_class); fs_writer->stream_map = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) unref_stream); - fs_writer->trace = trace; - fs_writer->writer_trace = writer_trace; - BT_PUT(writer_trace); + fs_writer->stream_states = g_hash_table_new_full(g_direct_hash, + g_direct_equal, NULL, destroy_stream_state_key); + + /* Set all the existing streams in the unknown state. */ + nr_stream = bt_ctf_trace_get_stream_count(trace); + for (i = 0; i < nr_stream; i++) { + enum fs_writer_stream_state *v; + + stream = bt_ctf_trace_get_stream_by_index(trace, i); + if (!stream) { + fprintf(writer_component->err, + "[error] %s in %s:%d\n", __func__, + __FILE__, __LINE__); + goto error; + } + + v = g_new0(enum fs_writer_stream_state, 1); + if (!v) { + fprintf(writer_component->err, + "[error] %s in %s:%d\n", __func__, + __FILE__, __LINE__); + goto error; + } + *v = FS_WRITER_UNKNOWN_STREAM; + + g_hash_table_insert(fs_writer->stream_states, stream, v); + BT_PUT(stream); + } + + /* Check if the trace is already static or register a listener. */ if (bt_ctf_trace_is_static(trace)) { fs_writer->trace_static = 1; fs_writer->static_listener_id = -1; @@ -189,8 +226,6 @@ struct fs_writer *insert_new_writer( fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__, __FILE__, __LINE__); - g_free(fs_writer); - fs_writer = NULL; goto error; } fs_writer->static_listener_id = ret; @@ -202,7 +237,10 @@ struct fs_writer *insert_new_writer( goto end; error: + g_free(fs_writer); + fs_writer = NULL; bt_put(writer_trace); + bt_put(stream); BT_PUT(ctf_writer); end: return fs_writer; @@ -380,6 +418,11 @@ void writer_close(struct writer_component *writer_component, g_hash_table_foreach_remove(fs_writer->stream_map, empty_ht, NULL); g_hash_table_destroy(fs_writer->stream_map); + + /* Empty the stream state HT. */ + g_hash_table_foreach_remove(fs_writer->stream_states, + empty_ht, NULL); + g_hash_table_destroy(fs_writer->stream_states); } BT_HIDDEN @@ -391,6 +434,7 @@ enum bt_component_status writer_stream_begin( struct fs_writer *fs_writer; struct bt_ctf_stream *writer_stream = NULL; enum bt_component_status ret = BT_COMPONENT_STATUS_OK; + enum fs_writer_stream_state *state; stream_class = bt_ctf_stream_get_class(stream); if (!stream_class) { @@ -405,6 +449,16 @@ enum bt_component_status writer_stream_begin( __func__, __FILE__, __LINE__); goto error; } + + /* Set the stream as active */ + state = g_hash_table_lookup(fs_writer->stream_states, stream); + if (*state != FS_WRITER_UNKNOWN_STREAM) { + fprintf(writer_component->err, "[error] Unexpected stream " + "state %d\n", *state); + goto error; + } + *state = FS_WRITER_ACTIVE_STREAM; + writer_stream = insert_new_stream(writer_component, fs_writer, stream_class, stream); if (!writer_stream) { @@ -423,6 +477,16 @@ end: return ret; } +void check_completed_trace(gpointer key, gpointer value, gpointer user_data) +{ + enum fs_writer_stream_state *state = value; + int *trace_completed = user_data; + + if (*state != FS_WRITER_COMPLETED_STREAM) { + *trace_completed = 0; + } +} + BT_HIDDEN enum bt_component_status writer_stream_end( struct writer_component *writer_component, @@ -432,6 +496,7 @@ enum bt_component_status writer_stream_end( struct fs_writer *fs_writer; struct bt_ctf_trace *trace = NULL; enum bt_component_status ret = BT_COMPONENT_STATUS_OK; + enum fs_writer_stream_state *state; stream_class = bt_ctf_stream_get_class(stream); if (!stream_class) { @@ -446,14 +511,27 @@ enum bt_component_status writer_stream_end( __func__, __FILE__, __LINE__); goto error; } + + state = g_hash_table_lookup(fs_writer->stream_states, stream); + if (*state != FS_WRITER_ACTIVE_STREAM) { + fprintf(writer_component->err, "[error] Unexpected stream " + "state %d\n", *state); + goto error; + } + *state = FS_WRITER_COMPLETED_STREAM; + g_hash_table_remove(fs_writer->stream_map, stream); - assert(fs_writer->active_streams > 0); - fs_writer->active_streams--; - if (fs_writer->active_streams == 0 && fs_writer->trace_static) { - writer_close(writer_component, fs_writer); - g_hash_table_remove(writer_component->trace_map, - fs_writer->trace); + if (fs_writer->trace_static) { + int trace_completed = 1; + + g_hash_table_foreach(fs_writer->stream_states, + check_completed_trace, &trace_completed); + if (trace_completed) { + writer_close(writer_component, fs_writer); + g_hash_table_remove(writer_component->trace_map, + fs_writer->trace); + } } goto end; diff --git a/plugins/ctf/fs-sink/writer.h b/plugins/ctf/fs-sink/writer.h index 8025d22f..566b42ec 100644 --- a/plugins/ctf/fs-sink/writer.h +++ b/plugins/ctf/fs-sink/writer.h @@ -37,13 +37,25 @@ struct writer_component { GString *trace_name_base; /* For the directory name suffix. */ int trace_id; - /* Map between struct bt_ctf_trace and struct bt_ctf_writer. */ + /* Map between struct bt_ctf_trace and struct fs_writer. */ GHashTable *trace_map; FILE *err; struct bt_notification_iterator *input_iterator; bool error; }; +enum fs_writer_stream_state { + /* + * We know the stream exists but we have never received a + * stream_begin notification for it. + */ + FS_WRITER_UNKNOWN_STREAM, + /* We know this stream is active (between stream_begin and _end). */ + FS_WRITER_ACTIVE_STREAM, + /* We have received a stream_end for this stream. */ + FS_WRITER_COMPLETED_STREAM, +}; + struct fs_writer { struct bt_ctf_writer *writer; struct bt_ctf_trace *trace; @@ -55,6 +67,7 @@ struct fs_writer { GHashTable *stream_map; /* Map between reader and writer stream class. */ GHashTable *stream_class_map; + GHashTable *stream_states; }; BT_HIDDEN -- 2.34.1