From f3168545b6b7501165eb94181b205b0d2afb6b19 Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Mon, 29 May 2017 18:30:27 -0400 Subject: [PATCH] fs-sink: trace static handling + cleanup teardown MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit When a trace is static and we have seen the stream_end notification for all the active streams, we can free the resources for handling the trace. This also highlighted that a few bits were missing for a clean teardown of the plugin. Signed-off-by: Julien Desfossez Signed-off-by: Jérémie Galarneau --- plugins/ctf/fs-sink/write.c | 144 ++++++++++++++++++++++++++++++----- plugins/ctf/fs-sink/writer.c | 39 ++++++++-- plugins/ctf/fs-sink/writer.h | 11 +++ 3 files changed, 169 insertions(+), 25 deletions(-) diff --git a/plugins/ctf/fs-sink/write.c b/plugins/ctf/fs-sink/write.c index 75360565..2285e92a 100644 --- a/plugins/ctf/fs-sink/write.c +++ b/plugins/ctf/fs-sink/write.c @@ -35,11 +35,18 @@ #include #include #include +#include #include #include "writer.h" +static +void trace_is_static_listener(struct bt_ctf_trace *trace, void *data) +{ + *((int *) data) = 1; +} + static struct bt_ctf_stream_class *insert_new_stream_class( struct writer_component *writer_component, @@ -159,7 +166,7 @@ struct bt_ctf_event_class *get_event_class(struct writer_component *writer_compo bt_ctf_event_class_get_id(event_class)); } -struct bt_ctf_writer *insert_new_writer( +struct fs_writer *insert_new_writer( struct writer_component *writer_component, struct bt_ctf_trace *trace) { @@ -167,6 +174,7 @@ struct bt_ctf_writer *insert_new_writer( struct bt_ctf_trace *writer_trace = NULL; char trace_name[PATH_MAX]; enum bt_component_status ret; + struct fs_writer *fs_writer; /* FIXME: replace with trace name when it will work. */ snprintf(trace_name, PATH_MAX, "%s/%s_%03d", @@ -196,12 +204,38 @@ struct bt_ctf_writer *insert_new_writer( fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__, __FILE__, __LINE__); BT_PUT(ctf_writer); - goto end; + goto error; + } + + fs_writer = g_new0(struct fs_writer, 1); + if (!fs_writer) { + fprintf(writer_component->err, + "[error] %s in %s:%d\n", __func__, __FILE__, + __LINE__); + goto error; } + fs_writer->writer = ctf_writer; + fs_writer->writer_trace = writer_trace; BT_PUT(writer_trace); + if (bt_ctf_trace_is_static(trace)) { + fs_writer->trace_static = 1; + fs_writer->static_listener_id = -1; + } else { + ret = bt_ctf_trace_add_is_static_listener(trace, + trace_is_static_listener, &fs_writer->trace_static); + if (ret < 0) { + fprintf(writer_component->err, + "[error] %s in %s:%d\n", __func__, __FILE__, + __LINE__); + g_free(fs_writer); + fs_writer = NULL; + goto error; + } + fs_writer->static_listener_id = ret; + } g_hash_table_insert(writer_component->trace_map, (gpointer) trace, - ctf_writer); + fs_writer); goto end; @@ -209,37 +243,35 @@ error: bt_put(writer_trace); BT_PUT(ctf_writer); end: - return ctf_writer; + return fs_writer; } static -struct bt_ctf_writer *get_writer(struct writer_component *writer_component, +struct fs_writer *get_fs_writer(struct writer_component *writer_component, struct bt_ctf_stream_class *stream_class) { struct bt_ctf_trace *trace = NULL; - struct bt_ctf_writer *ctf_writer = NULL; + struct fs_writer *fs_writer; trace = bt_ctf_stream_class_get_trace(stream_class); if (!trace) { - ctf_writer = NULL; fprintf(writer_component->err, "[error] %s in %s:%d\n", __func__, __FILE__, __LINE__); goto error; } - ctf_writer = g_hash_table_lookup(writer_component->trace_map, + fs_writer = g_hash_table_lookup(writer_component->trace_map, (gpointer) trace); - if (!ctf_writer) { - ctf_writer = insert_new_writer(writer_component, trace); + if (!fs_writer) { + fs_writer = insert_new_writer(writer_component, trace); } - bt_get(ctf_writer); BT_PUT(trace); goto end; error: - BT_PUT(ctf_writer); + fs_writer = NULL; end: - return ctf_writer; + return fs_writer; } static @@ -258,17 +290,25 @@ struct bt_ctf_stream *get_writer_stream( goto error; } - ctf_writer = get_writer(writer_component, stream_class); - if (!ctf_writer) { - fprintf(writer_component->err, "[error] %s in %s:%d\n", - __func__, __FILE__, __LINE__); - goto error; - } - writer_stream = lookup_stream(writer_component, stream); if (!writer_stream) { + struct fs_writer *fs_writer; + + fs_writer = get_fs_writer(writer_component, stream_class); + if (!fs_writer) { + fprintf(writer_component->err, "[error] %s in %s:%d\n", + __func__, __FILE__, __LINE__); + goto error; + } + ctf_writer = bt_get(fs_writer->writer); writer_stream = insert_new_stream(writer_component, ctf_writer, stream_class, stream); + if (!writer_stream) { + fprintf(writer_component->err, "[error] %s in %s:%d\n", + __func__, __FILE__, __LINE__); + goto error; + } + fs_writer->active_streams++; } bt_get(writer_stream); @@ -282,6 +322,70 @@ end: return writer_stream; } +BT_HIDDEN +enum bt_component_status writer_close( + struct writer_component *writer_component, + struct fs_writer *fs_writer, + struct bt_ctf_trace *trace) +{ + enum bt_component_status ret = BT_COMPONENT_STATUS_OK; + + if (fs_writer->static_listener_id > 0) { + bt_ctf_trace_remove_is_static_listener(trace, + fs_writer->static_listener_id); + } + g_hash_table_remove(writer_component->trace_map, trace); + return ret; +} + +BT_HIDDEN +enum bt_component_status writer_stream_end( + struct writer_component *writer_component, + struct bt_ctf_stream *stream) +{ + struct bt_ctf_stream_class *stream_class = NULL; + struct fs_writer *fs_writer; + struct bt_ctf_trace *trace = NULL; + enum bt_component_status ret = BT_COMPONENT_STATUS_OK; + + g_hash_table_remove(writer_component->stream_map, stream); + + stream_class = bt_ctf_stream_get_class(stream); + if (!stream_class) { + fprintf(writer_component->err, "[error] %s in %s:%d\n", + __func__, __FILE__, __LINE__); + goto error; + } + + fs_writer = get_fs_writer(writer_component, stream_class); + if (!fs_writer) { + fprintf(writer_component->err, "[error] %s in %s:%d\n", + __func__, __FILE__, __LINE__); + goto error; + } + + assert(fs_writer->active_streams > 0); + fs_writer->active_streams--; + if (fs_writer->active_streams == 0 && fs_writer->trace_static) { + trace = bt_ctf_stream_class_get_trace(stream_class); + if (!trace) { + fprintf(writer_component->err, "[error] %s in %s:%d\n", + __func__, __FILE__, __LINE__); + goto error; + } + ret = writer_close(writer_component, fs_writer, trace); + } + + goto end; + +error: + ret = BT_COMPONENT_STATUS_ERROR; +end: + BT_PUT(trace); + BT_PUT(stream_class); + return ret; +} + BT_HIDDEN enum bt_component_status writer_new_packet( struct writer_component *writer_component, diff --git a/plugins/ctf/fs-sink/writer.c b/plugins/ctf/fs-sink/writer.c index 18abc9e3..56bab6e8 100644 --- a/plugins/ctf/fs-sink/writer.c +++ b/plugins/ctf/fs-sink/writer.c @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -45,13 +46,28 @@ #include "writer.h" #include +gboolean empty_ht(gpointer key, gpointer value, gpointer user_data) +{ + return TRUE; +} + static void destroy_writer_component_data(struct writer_component *writer_component) { bt_put(writer_component->input_iterator); - g_hash_table_destroy(writer_component->stream_map); + + g_hash_table_foreach_remove(writer_component->stream_class_map, + empty_ht, NULL); g_hash_table_destroy(writer_component->stream_class_map); + + g_hash_table_foreach_remove(writer_component->stream_map, + empty_ht, NULL); + g_hash_table_destroy(writer_component->stream_map); + + g_hash_table_foreach_remove(writer_component->trace_map, + empty_ht, NULL); g_hash_table_destroy(writer_component->trace_map); + g_string_free(writer_component->base_path, true); g_string_free(writer_component->trace_name_base, true); } @@ -79,9 +95,10 @@ void unref_stream(struct bt_ctf_stream_class *writer_stream) } static -void unref_trace(struct bt_ctf_writer *writer) +void free_fs_writer(struct fs_writer *fs_writer) { - bt_put(writer); + bt_put(fs_writer->writer); + g_free(fs_writer); } static @@ -108,7 +125,7 @@ struct writer_component *create_writer_component(void) * Reader to writer corresponding structures. */ writer_component->trace_map = g_hash_table_new_full(g_direct_hash, - g_direct_equal, NULL, (GDestroyNotify) unref_trace); + g_direct_equal, NULL, (GDestroyNotify) free_fs_writer); writer_component->stream_class_map = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) unref_stream_class); writer_component->stream_map = g_hash_table_new_full(g_direct_hash, @@ -167,7 +184,6 @@ enum bt_component_status handle_notification( ret = BT_COMPONENT_STATUS_ERROR; goto end; } - ret = BT_COMPONENT_STATUS_OK; ret = writer_output_event(writer_component, event); bt_put(event); if (ret != BT_COMPONENT_STATUS_OK) { @@ -176,7 +192,18 @@ enum bt_component_status handle_notification( break; } case BT_NOTIFICATION_TYPE_STREAM_END: + { + struct bt_ctf_stream *stream = + bt_notification_stream_end_get_stream(notification); + + if (!stream) { + ret = BT_COMPONENT_STATUS_ERROR; + goto end; + } + ret = writer_stream_end(writer_component, stream); + bt_put(stream); break; + } default: puts("Unhandled notification type"); } @@ -193,8 +220,10 @@ void writer_component_port_connected( struct bt_private_connection *connection; struct writer_component *writer; static const enum bt_notification_type notif_types[] = { + BT_NOTIFICATION_TYPE_EVENT, BT_NOTIFICATION_TYPE_PACKET_BEGIN, BT_NOTIFICATION_TYPE_PACKET_END, + BT_NOTIFICATION_TYPE_STREAM_END, BT_NOTIFICATION_TYPE_SENTINEL, }; diff --git a/plugins/ctf/fs-sink/writer.h b/plugins/ctf/fs-sink/writer.h index 1765a961..d34b77f9 100644 --- a/plugins/ctf/fs-sink/writer.h +++ b/plugins/ctf/fs-sink/writer.h @@ -49,6 +49,14 @@ struct writer_component { bool error; }; +struct fs_writer { + struct bt_ctf_writer *writer; + struct bt_ctf_trace *writer_trace; + int static_listener_id; + unsigned int active_streams; + int trace_static; +}; + BT_HIDDEN enum bt_component_status writer_output_event(struct writer_component *writer, struct bt_ctf_event *event); @@ -58,6 +66,9 @@ enum bt_component_status writer_new_packet(struct writer_component *writer, BT_HIDDEN enum bt_component_status writer_close_packet(struct writer_component *writer, struct bt_ctf_packet *packet); +BT_HIDDEN +enum bt_component_status writer_stream_end(struct writer_component *writer, + struct bt_ctf_stream *stream); BT_HIDDEN enum bt_component_status writer_component_init( -- 2.34.1