fs-sink: trace static handling + cleanup teardown
authorJulien Desfossez <jdesfossez@efficios.com>
Mon, 29 May 2017 22:30:27 +0000 (18:30 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 9 Jun 2017 20:58:16 +0000 (16:58 -0400)
When a trace is static and we have seen the stream_end notification for
all the active streams, we can free the resources for handling the
trace.

This also highlighted that a few bits were missing for a clean teardown
of the plugin.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
plugins/ctf/fs-sink/write.c
plugins/ctf/fs-sink/writer.c
plugins/ctf/fs-sink/writer.h

index 7536056568a59ca9d399a116433492b127bf8dcb..2285e92a2e4dc27e7eb39cead4bed82d49281265 100644 (file)
 #include <babeltrace/ctf-ir/fields.h>
 #include <babeltrace/ctf-writer/stream-class.h>
 #include <babeltrace/ctf-writer/stream.h>
+#include <assert.h>
 
 #include <ctfcopytrace.h>
 
 #include "writer.h"
 
+static
+void trace_is_static_listener(struct bt_ctf_trace *trace, void *data)
+{
+       *((int *) data) = 1;
+}
+
 static
 struct bt_ctf_stream_class *insert_new_stream_class(
                struct writer_component *writer_component,
@@ -159,7 +166,7 @@ struct bt_ctf_event_class *get_event_class(struct writer_component *writer_compo
                        bt_ctf_event_class_get_id(event_class));
 }
 
-struct bt_ctf_writer *insert_new_writer(
+struct fs_writer *insert_new_writer(
                struct writer_component *writer_component,
                struct bt_ctf_trace *trace)
 {
@@ -167,6 +174,7 @@ struct bt_ctf_writer *insert_new_writer(
        struct bt_ctf_trace *writer_trace = NULL;
        char trace_name[PATH_MAX];
        enum bt_component_status ret;
+       struct fs_writer *fs_writer;
 
        /* FIXME: replace with trace name when it will work. */
        snprintf(trace_name, PATH_MAX, "%s/%s_%03d",
@@ -196,12 +204,38 @@ struct bt_ctf_writer *insert_new_writer(
                fprintf(writer_component->err, "[error] %s in %s:%d\n",
                                __func__, __FILE__, __LINE__);
                BT_PUT(ctf_writer);
-               goto end;
+               goto error;
+       }
+
+       fs_writer = g_new0(struct fs_writer, 1);
+       if (!fs_writer) {
+               fprintf(writer_component->err,
+                               "[error] %s in %s:%d\n", __func__, __FILE__,
+                               __LINE__);
+               goto error;
        }
+       fs_writer->writer = ctf_writer;
+       fs_writer->writer_trace = writer_trace;
        BT_PUT(writer_trace);
+       if (bt_ctf_trace_is_static(trace)) {
+               fs_writer->trace_static = 1;
+               fs_writer->static_listener_id = -1;
+       } else {
+               ret = bt_ctf_trace_add_is_static_listener(trace,
+                               trace_is_static_listener, &fs_writer->trace_static);
+               if (ret < 0) {
+                       fprintf(writer_component->err,
+                                       "[error] %s in %s:%d\n", __func__, __FILE__,
+                                       __LINE__);
+                       g_free(fs_writer);
+                       fs_writer = NULL;
+                       goto error;
+               }
+               fs_writer->static_listener_id = ret;
+       }
 
        g_hash_table_insert(writer_component->trace_map, (gpointer) trace,
-                       ctf_writer);
+                       fs_writer);
 
        goto end;
 
@@ -209,37 +243,35 @@ error:
        bt_put(writer_trace);
        BT_PUT(ctf_writer);
 end:
-       return ctf_writer;
+       return fs_writer;
 }
 
 static
-struct bt_ctf_writer *get_writer(struct writer_component *writer_component,
+struct fs_writer *get_fs_writer(struct writer_component *writer_component,
                struct bt_ctf_stream_class *stream_class)
 {
        struct bt_ctf_trace *trace = NULL;
-       struct bt_ctf_writer *ctf_writer = NULL;
+       struct fs_writer *fs_writer;
 
        trace = bt_ctf_stream_class_get_trace(stream_class);
        if (!trace) {
-               ctf_writer = NULL;
                fprintf(writer_component->err, "[error] %s in %s:%d\n",
                                __func__, __FILE__, __LINE__);
                goto error;
        }
 
-       ctf_writer = g_hash_table_lookup(writer_component->trace_map,
+       fs_writer = g_hash_table_lookup(writer_component->trace_map,
                        (gpointer) trace);
-       if (!ctf_writer) {
-               ctf_writer = insert_new_writer(writer_component, trace);
+       if (!fs_writer) {
+               fs_writer = insert_new_writer(writer_component, trace);
        }
-       bt_get(ctf_writer);
        BT_PUT(trace);
        goto end;
 
 error:
-       BT_PUT(ctf_writer);
+       fs_writer = NULL;
 end:
-       return ctf_writer;
+       return fs_writer;
 }
 
 static
@@ -258,17 +290,25 @@ struct bt_ctf_stream *get_writer_stream(
                goto error;
        }
 
-       ctf_writer = get_writer(writer_component, stream_class);
-       if (!ctf_writer) {
-               fprintf(writer_component->err, "[error] %s in %s:%d\n",
-                               __func__, __FILE__, __LINE__);
-               goto error;
-       }
-
        writer_stream = lookup_stream(writer_component, stream);
        if (!writer_stream) {
+               struct fs_writer *fs_writer;
+
+               fs_writer = get_fs_writer(writer_component, stream_class);
+               if (!fs_writer) {
+                       fprintf(writer_component->err, "[error] %s in %s:%d\n",
+                                       __func__, __FILE__, __LINE__);
+                       goto error;
+               }
+               ctf_writer = bt_get(fs_writer->writer);
                writer_stream = insert_new_stream(writer_component, ctf_writer,
                                stream_class, stream);
+               if (!writer_stream) {
+                       fprintf(writer_component->err, "[error] %s in %s:%d\n",
+                                       __func__, __FILE__, __LINE__);
+                       goto error;
+               }
+               fs_writer->active_streams++;
        }
        bt_get(writer_stream);
 
@@ -282,6 +322,70 @@ end:
        return writer_stream;
 }
 
+BT_HIDDEN
+enum bt_component_status writer_close(
+               struct writer_component *writer_component,
+               struct fs_writer *fs_writer,
+               struct bt_ctf_trace *trace)
+{
+       enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+       if (fs_writer->static_listener_id > 0) {
+               bt_ctf_trace_remove_is_static_listener(trace,
+                               fs_writer->static_listener_id);
+       }
+       g_hash_table_remove(writer_component->trace_map, trace);
+       return ret;
+}
+
+BT_HIDDEN
+enum bt_component_status writer_stream_end(
+               struct writer_component *writer_component,
+               struct bt_ctf_stream *stream)
+{
+       struct bt_ctf_stream_class *stream_class = NULL;
+       struct fs_writer *fs_writer;
+       struct bt_ctf_trace *trace = NULL;
+       enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
+
+       g_hash_table_remove(writer_component->stream_map, stream);
+
+       stream_class = bt_ctf_stream_get_class(stream);
+       if (!stream_class) {
+               fprintf(writer_component->err, "[error] %s in %s:%d\n",
+                               __func__, __FILE__, __LINE__);
+               goto error;
+       }
+
+       fs_writer = get_fs_writer(writer_component, stream_class);
+       if (!fs_writer) {
+               fprintf(writer_component->err, "[error] %s in %s:%d\n",
+                               __func__, __FILE__, __LINE__);
+               goto error;
+       }
+
+       assert(fs_writer->active_streams > 0);
+       fs_writer->active_streams--;
+       if (fs_writer->active_streams == 0 && fs_writer->trace_static) {
+               trace = bt_ctf_stream_class_get_trace(stream_class);
+               if (!trace) {
+                       fprintf(writer_component->err, "[error] %s in %s:%d\n",
+                                       __func__, __FILE__, __LINE__);
+                       goto error;
+               }
+               ret = writer_close(writer_component, fs_writer, trace);
+       }
+
+       goto end;
+
+error:
+       ret = BT_COMPONENT_STATUS_ERROR;
+end:
+       BT_PUT(trace);
+       BT_PUT(stream_class);
+       return ret;
+}
+
 BT_HIDDEN
 enum bt_component_status writer_new_packet(
                struct writer_component *writer_component,
index 18abc9e311ca175e088eb7f3bb9e01d064949c6e..56bab6e8f454670e8612127a5433466743bb9f71 100644 (file)
@@ -38,6 +38,7 @@
 #include <babeltrace/graph/notification-iterator.h>
 #include <babeltrace/graph/notification-event.h>
 #include <babeltrace/graph/notification-packet.h>
+#include <babeltrace/graph/notification-stream.h>
 #include <plugins-common.h>
 #include <stdio.h>
 #include <stdbool.h>
 #include "writer.h"
 #include <assert.h>
 
+gboolean empty_ht(gpointer key, gpointer value, gpointer user_data)
+{
+       return TRUE;
+}
+
 static
 void destroy_writer_component_data(struct writer_component *writer_component)
 {
        bt_put(writer_component->input_iterator);
-       g_hash_table_destroy(writer_component->stream_map);
+
+       g_hash_table_foreach_remove(writer_component->stream_class_map,
+                       empty_ht, NULL);
        g_hash_table_destroy(writer_component->stream_class_map);
+
+       g_hash_table_foreach_remove(writer_component->stream_map,
+                       empty_ht, NULL);
+       g_hash_table_destroy(writer_component->stream_map);
+
+       g_hash_table_foreach_remove(writer_component->trace_map,
+                       empty_ht, NULL);
        g_hash_table_destroy(writer_component->trace_map);
+
        g_string_free(writer_component->base_path, true);
        g_string_free(writer_component->trace_name_base, true);
 }
@@ -79,9 +95,10 @@ void unref_stream(struct bt_ctf_stream_class *writer_stream)
 }
 
 static
-void unref_trace(struct bt_ctf_writer *writer)
+void free_fs_writer(struct fs_writer *fs_writer)
 {
-       bt_put(writer);
+       bt_put(fs_writer->writer);
+       g_free(fs_writer);
 }
 
 static
@@ -108,7 +125,7 @@ struct writer_component *create_writer_component(void)
         * Reader to writer corresponding structures.
         */
        writer_component->trace_map = g_hash_table_new_full(g_direct_hash,
-                       g_direct_equal, NULL, (GDestroyNotify) unref_trace);
+                       g_direct_equal, NULL, (GDestroyNotify) free_fs_writer);
        writer_component->stream_class_map = g_hash_table_new_full(g_direct_hash,
                        g_direct_equal, NULL, (GDestroyNotify) unref_stream_class);
        writer_component->stream_map = g_hash_table_new_full(g_direct_hash,
@@ -167,7 +184,6 @@ enum bt_component_status handle_notification(
                        ret = BT_COMPONENT_STATUS_ERROR;
                        goto end;
                }
-               ret = BT_COMPONENT_STATUS_OK;
                ret = writer_output_event(writer_component, event);
                bt_put(event);
                if (ret != BT_COMPONENT_STATUS_OK) {
@@ -176,7 +192,18 @@ enum bt_component_status handle_notification(
                break;
        }
        case BT_NOTIFICATION_TYPE_STREAM_END:
+       {
+               struct bt_ctf_stream *stream =
+                       bt_notification_stream_end_get_stream(notification);
+
+               if (!stream) {
+                       ret = BT_COMPONENT_STATUS_ERROR;
+                       goto end;
+               }
+               ret = writer_stream_end(writer_component, stream);
+               bt_put(stream);
                break;
+       }
        default:
                puts("Unhandled notification type");
        }
@@ -193,8 +220,10 @@ void writer_component_port_connected(
        struct bt_private_connection *connection;
        struct writer_component *writer;
        static const enum bt_notification_type notif_types[] = {
+               BT_NOTIFICATION_TYPE_EVENT,
                BT_NOTIFICATION_TYPE_PACKET_BEGIN,
                BT_NOTIFICATION_TYPE_PACKET_END,
+               BT_NOTIFICATION_TYPE_STREAM_END,
                BT_NOTIFICATION_TYPE_SENTINEL,
        };
 
index 1765a9617762c9647bea4e3afbd388def1735c69..d34b77f9d64c874183c9aa5195740559827fbf91 100644 (file)
@@ -49,6 +49,14 @@ struct writer_component {
        bool error;
 };
 
+struct fs_writer {
+       struct bt_ctf_writer *writer;
+       struct bt_ctf_trace *writer_trace;
+       int static_listener_id;
+       unsigned int active_streams;
+       int trace_static;
+};
+
 BT_HIDDEN
 enum bt_component_status writer_output_event(struct writer_component *writer,
                struct bt_ctf_event *event);
@@ -58,6 +66,9 @@ enum bt_component_status writer_new_packet(struct writer_component *writer,
 BT_HIDDEN
 enum bt_component_status writer_close_packet(struct writer_component *writer,
                struct bt_ctf_packet *packet);
+BT_HIDDEN
+enum bt_component_status writer_stream_end(struct writer_component *writer,
+               struct bt_ctf_stream *stream);
 
 BT_HIDDEN
 enum bt_component_status writer_component_init(
This page took 0.029586 seconds and 4 git commands to generate.