ctf.fs: split streams, one per port
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Wed, 29 Mar 2017 21:44:06 +0000 (17:44 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Sun, 28 May 2017 16:57:39 +0000 (12:57 -0400)
The ports are named `traceN-stream-STREAM`, where `N` is the index of
the trace (starts at 0) and `STREAM` is the base name of the stream
file.

The heap which used to be in this component class is expected to be
moved to the utils.muxer filter component class.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
plugins/ctf/fs/data-stream.c
plugins/ctf/fs/data-stream.h
plugins/ctf/fs/file.c
plugins/ctf/fs/fs.c
plugins/ctf/fs/fs.h
plugins/ctf/fs/metadata.c
plugins/ctf/fs/metadata.h

index ceeefdf5cf2b23768fddea02f79583889fe21eab..b4bb087a3ae867f4ac85bf19fe154ce8bcab1c7f 100644 (file)
@@ -31,6 +31,9 @@
 #include <sys/mman.h>
 #include <babeltrace/ctf-ir/stream.h>
 #include <babeltrace/graph/notification-iterator.h>
+#include <babeltrace/graph/notification-stream.h>
+#include <babeltrace/graph/notification-event.h>
+#include <babeltrace/graph/notification-packet.h>
 #include "file.h"
 #include "metadata.h"
 #include "../common/notif-iter/notif-iter.h"
@@ -41,7 +44,7 @@
 #define PRINT_PREFIX           "ctf-fs-data-stream"
 #include "print.h"
 
-static
+static inline
 size_t remaining_mmap_bytes(struct ctf_fs_stream *stream)
 {
        return stream->mmap_valid_len - stream->request_offset;
@@ -326,9 +329,7 @@ invalid_index:
 static
 int build_index_from_stream(struct ctf_fs_stream *stream)
 {
-       int ret = 0;
-end:
-       return ret;
+       return 0;
 }
 
 static
@@ -348,7 +349,7 @@ end:
 
 BT_HIDDEN
 struct ctf_fs_stream *ctf_fs_stream_create(
-               struct ctf_fs_component *ctf_fs, struct ctf_fs_file *file)
+               struct ctf_fs_component *ctf_fs, const char *path)
 {
        int ret;
        struct ctf_fs_stream *stream = g_new0(struct ctf_fs_stream, 1);
@@ -357,7 +358,17 @@ struct ctf_fs_stream *ctf_fs_stream_create(
                goto error;
        }
 
-       stream->file = file;
+       stream->file = ctf_fs_file_create(ctf_fs);
+       if (!stream->file) {
+               goto error;
+       }
+
+       g_string_assign(stream->file->path, path);
+       ret = ctf_fs_file_open(ctf_fs, stream->file, "rb");
+       if (ret) {
+               goto error;
+       }
+
        stream->notif_iter = bt_ctf_notif_iter_create(ctf_fs->metadata->trace,
                        ctf_fs->page_size, medops, stream, ctf_fs->error_fp);
        if (!stream->notif_iter) {
@@ -372,7 +383,6 @@ struct ctf_fs_stream *ctf_fs_stream_create(
        goto end;
 error:
        /* Do not touch "borrowed" file. */
-       stream->file = NULL;
        ctf_fs_stream_destroy(stream);
        stream = NULL;
 end:
@@ -404,3 +414,63 @@ void ctf_fs_stream_destroy(struct ctf_fs_stream *stream)
 
        g_free(stream);
 }
+
+BT_HIDDEN
+struct bt_notification_iterator_next_return ctf_fs_stream_next(
+               struct ctf_fs_stream *stream)
+{
+       enum bt_ctf_notif_iter_status notif_iter_status;
+       struct bt_notification_iterator_next_return ret = {
+               .status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR,
+               .notification = NULL,
+       };
+
+       if (stream->end_reached) {
+               notif_iter_status = BT_CTF_NOTIF_ITER_STATUS_EOF;
+               goto translate_status;
+       }
+
+       notif_iter_status = bt_ctf_notif_iter_get_next_notification(stream->notif_iter,
+                       &ret.notification);
+       if (notif_iter_status != BT_CTF_NOTIF_ITER_STATUS_OK &&
+                       notif_iter_status != BT_CTF_NOTIF_ITER_STATUS_EOF) {
+               goto translate_status;
+       }
+
+       /* Should be handled in bt_ctf_notif_iter_get_next_notification. */
+       if (notif_iter_status == BT_CTF_NOTIF_ITER_STATUS_EOF) {
+               ret.notification = bt_notification_stream_end_create(
+                       stream->stream);
+               if (!ret.notification) {
+                       notif_iter_status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
+                       goto translate_status;
+               }
+
+               notif_iter_status = BT_CTF_NOTIF_ITER_STATUS_OK;
+               stream->end_reached = true;
+       }
+
+translate_status:
+       switch (notif_iter_status) {
+       case BT_CTF_NOTIF_ITER_STATUS_EOF:
+               ret.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
+               break;
+       case BT_CTF_NOTIF_ITER_STATUS_OK:
+               ret.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
+               break;
+       case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
+               /*
+                * Should not make it this far as this is
+                * medium-specific; there is nothing for the user to do
+                * and it should have been handled upstream.
+                */
+               assert(false);
+       case BT_CTF_NOTIF_ITER_STATUS_INVAL:
+       case BT_CTF_NOTIF_ITER_STATUS_ERROR:
+       default:
+               ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
+               break;
+       }
+
+       return ret;
+}
index 3b686b7a6d7bbd0bc426fdf31161a50c7dfb03ab..3d484425d1b72209c86ea60ee3e1586af288f207 100644 (file)
@@ -48,7 +48,7 @@ struct index {
 
 BT_HIDDEN
 struct ctf_fs_stream *ctf_fs_stream_create(
-               struct ctf_fs_component *ctf_fs, struct ctf_fs_file *file);
+               struct ctf_fs_component *ctf_fs, const char *path);
 
 BT_HIDDEN
 void ctf_fs_stream_destroy(struct ctf_fs_stream *stream);
@@ -56,4 +56,8 @@ void ctf_fs_stream_destroy(struct ctf_fs_stream *stream);
 BT_HIDDEN
 int ctf_fs_data_stream_open_streams(struct ctf_fs_component *ctf_fs);
 
+BT_HIDDEN
+struct bt_notification_iterator_next_return ctf_fs_stream_next(
+               struct ctf_fs_stream *stream);
+
 #endif /* CTF_FS_DATA_STREAM_H */
index c0ca3b8d05cf9a222598c750e69e1a039bd4d3b2..8b756496a9d82064981ce1351db6efd34b3ede1e 100644 (file)
 BT_HIDDEN
 void ctf_fs_file_destroy(struct ctf_fs_file *file)
 {
-       struct ctf_fs_component *ctf_fs = file->ctf_fs;
+       struct ctf_fs_component *ctf_fs;;
 
        if (!file) {
                return;
        }
 
+       ctf_fs = file->ctf_fs;
+
        if (file->fp) {
                PDBG("Closing file \"%s\" (%p)\n", file->path->str, file->fp);
 
index 9a38834a7ca56a0499d0978b8d7d06de5e78a53f..ea45fbca792bbd9061844281a3054ff2a77ab558 100644 (file)
 
 #include <babeltrace/ctf-ir/packet.h>
 #include <babeltrace/ctf-ir/clock-class.h>
+#include <babeltrace/graph/private-port.h>
 #include <babeltrace/graph/private-component.h>
+#include <babeltrace/graph/private-component-source.h>
+#include <babeltrace/graph/private-notification-iterator.h>
 #include <babeltrace/graph/component.h>
 #include <babeltrace/graph/notification-iterator.h>
-#include <babeltrace/graph/private-notification-iterator.h>
-#include <babeltrace/graph/notification-stream.h>
-#include <babeltrace/graph/notification-event.h>
-#include <babeltrace/graph/notification-packet.h>
-#include <babeltrace/graph/notification-heap.h>
 #include <plugins-common.h>
 #include <glib.h>
 #include <assert.h>
@@ -54,625 +52,300 @@ BT_HIDDEN
 bool ctf_fs_debug;
 
 struct bt_notification_iterator_next_return ctf_fs_iterator_next(
-               struct bt_private_notification_iterator *iterator);
-
-static
-enum bt_notification_iterator_status ctf_fs_iterator_get_next_notification(
-               struct ctf_fs_iterator *it,
-               struct ctf_fs_stream *stream,
-               struct bt_notification **notification)
+               struct bt_private_notification_iterator *iterator)
 {
-       enum bt_ctf_notif_iter_status status;
-       enum bt_notification_iterator_status ret;
-
-       if (stream->end_reached) {
-               status = BT_CTF_NOTIF_ITER_STATUS_EOF;
-               goto end;
-       }
-
-       status = bt_ctf_notif_iter_get_next_notification(stream->notif_iter,
-                       notification);
-       if (status != BT_CTF_NOTIF_ITER_STATUS_OK &&
-                       status != BT_CTF_NOTIF_ITER_STATUS_EOF) {
-               goto end;
-       }
+       struct ctf_fs_stream *fs_stream =
+               bt_private_notification_iterator_get_user_data(iterator);
 
-       /* Should be handled in bt_ctf_notif_iter_get_next_notification. */
-       if (status == BT_CTF_NOTIF_ITER_STATUS_EOF) {
-               *notification = bt_notification_stream_end_create(
-                               stream->stream);
-               if (!*notification) {
-                       status = BT_CTF_NOTIF_ITER_STATUS_ERROR;
-               }
-               status = BT_CTF_NOTIF_ITER_STATUS_OK;
-               stream->end_reached = true;
-       }
-end:
-       switch (status) {
-       case BT_CTF_NOTIF_ITER_STATUS_EOF:
-               ret = BT_NOTIFICATION_ITERATOR_STATUS_END;
-               break;
-       case BT_CTF_NOTIF_ITER_STATUS_OK:
-               ret = BT_NOTIFICATION_ITERATOR_STATUS_OK;
-               break;
-       case BT_CTF_NOTIF_ITER_STATUS_AGAIN:
-               /*
-                * Should not make it this far as this is medium-specific;
-                * there is nothing for the user to do and it should have been
-                * handled upstream.
-                */
-               assert(0);
-       case BT_CTF_NOTIF_ITER_STATUS_INVAL:
-               /* No argument provided by the user, so don't return INVAL. */
-       case BT_CTF_NOTIF_ITER_STATUS_ERROR:
-       default:
-               ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
-               break;
-       }
-       return ret;
+       return ctf_fs_stream_next(fs_stream);
 }
 
-/*
- * Remove me. This is a temporary work-around due to our inhability to use
- * libbabeltrace-ctf from libbabeltrace-plugin.
- */
-static
-struct bt_ctf_stream *internal_bt_notification_get_stream(
-               struct bt_notification *notification)
+void ctf_fs_iterator_finalize(struct bt_private_notification_iterator *it)
 {
-       struct bt_ctf_stream *stream = NULL;
-
-       assert(notification);
-       switch (bt_notification_get_type(notification)) {
-       case BT_NOTIFICATION_TYPE_EVENT:
-       {
-               struct bt_ctf_event *event;
-
-               event = bt_notification_event_get_event(notification);
-               stream = bt_ctf_event_get_stream(event);
-               bt_put(event);
-               break;
-       }
-       case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
-       {
-               struct bt_ctf_packet *packet;
-
-               packet = bt_notification_packet_begin_get_packet(notification);
-               stream = bt_ctf_packet_get_stream(packet);
-               bt_put(packet);
-               break;
-       }
-       case BT_NOTIFICATION_TYPE_PACKET_END:
-       {
-               struct bt_ctf_packet *packet;
-
-               packet = bt_notification_packet_end_get_packet(notification);
-               stream = bt_ctf_packet_get_stream(packet);
-               bt_put(packet);
-               break;
-       }
-       case BT_NOTIFICATION_TYPE_STREAM_END:
-               stream = bt_notification_stream_end_get_stream(notification);
-               break;
-       default:
-               goto end;
-       }
-end:
-       return stream;
+       void *ctf_fs_stream =
+               bt_private_notification_iterator_get_user_data(it);
+
+       ctf_fs_stream_destroy(ctf_fs_stream);
 }
 
-static
-enum bt_notification_iterator_status populate_heap(struct ctf_fs_iterator *it)
+enum bt_notification_iterator_status ctf_fs_iterator_init(
+               struct bt_private_notification_iterator *it,
+               struct bt_private_port *port)
 {
-       size_t i, pending_streams_count = it->pending_streams->len;
+       struct ctf_fs_stream *stream = NULL;
+       struct ctf_fs_component *ctf_fs;
+       struct ctf_fs_port_data *port_data;
+       struct bt_private_component *priv_comp =
+               bt_private_notification_iterator_get_private_component(it);
        enum bt_notification_iterator_status ret =
-                       BT_NOTIFICATION_ITERATOR_STATUS_OK;
-
-       /* Insert one stream-associated notification for each stream. */
-       for (i = 0; i < pending_streams_count; i++) {
-               struct bt_notification *notification;
-               struct ctf_fs_stream *fs_stream;
-               struct bt_ctf_stream *stream;
-               size_t pending_stream_index = pending_streams_count - 1 - i;
-
-               fs_stream = g_ptr_array_index(it->pending_streams,
-                               pending_stream_index);
-
-               do {
-                       int heap_ret;
-
-                       ret = ctf_fs_iterator_get_next_notification(
-                                       it, fs_stream, &notification);
-                       if (ret && ret != BT_NOTIFICATION_ITERATOR_STATUS_END) {
-                               printf_debug("Failed to populate heap at stream %zu\n",
-                                               pending_stream_index);
-                               goto end;
-                       }
+               BT_NOTIFICATION_ITERATOR_STATUS_OK;
 
-                       stream = internal_bt_notification_get_stream(
-                                       notification);
-                       if (stream) {
-                               gboolean inserted;
-
-                               /*
-                                * Associate pending ctf_fs_stream to
-                                * bt_ctf_stream. Ownership of stream
-                                * is passed to the stream ht.
-                                */
-                               inserted = g_hash_table_insert(it->stream_ht,
-                                               stream, fs_stream);
-                               if (!inserted) {
-                                       ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
-                                       printf_debug("Failed to associate fs stream to ctf stream\n");
-                                       goto end;
-                               }
-                       }
+       assert(priv_comp);
 
-                       heap_ret = bt_notification_heap_insert(
-                                       it->pending_notifications,
-                                       notification);
-                       bt_put(notification);
-                       if (heap_ret) {
-                               ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
-                               printf_debug("Failed to insert notification in heap\n");
-                               goto end;
-                       }
-               } while (!stream && ret != BT_NOTIFICATION_ITERATOR_STATUS_END);
-               /*
-                * Set NULL so the destruction callback registered with the
-                * array is not invoked on the stream (its ownership was
-                * transferred to the streams hashtable).
-                */
-               g_ptr_array_index(it->pending_streams,
-                               pending_stream_index) = NULL;
-               g_ptr_array_remove_index(it->pending_streams,
-                               pending_stream_index);
-       }
-
-       g_ptr_array_free(it->pending_streams, TRUE);
-       it->pending_streams = NULL;
-end:
-       return ret;
-}
-
-struct bt_notification_iterator_next_return ctf_fs_iterator_next(
-               struct bt_private_notification_iterator *iterator)
-{
-       int heap_ret;
-       struct bt_ctf_stream *stream = NULL;
-       struct ctf_fs_stream *fs_stream;
-       struct bt_notification *next_stream_notification;
-       struct ctf_fs_iterator *ctf_it =
-                       bt_private_notification_iterator_get_user_data(
-                               iterator);
-       struct bt_notification_iterator_next_return ret = {
-               .status = BT_NOTIFICATION_ITERATOR_STATUS_OK,
-               .notification = NULL,
-       };
-
-       ret.notification =
-               bt_notification_heap_pop(ctf_it->pending_notifications);
-       if (!ret.notification && !ctf_it->pending_streams) {
-               ret.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
-               goto end;
+       ctf_fs = bt_private_component_get_user_data(priv_comp);
+       if (!ctf_fs) {
+               ret = BT_NOTIFICATION_ITERATOR_STATUS_INVAL;
+               goto error;
        }
 
-       if (!ret.notification && ctf_it->pending_streams) {
-               /*
-                * Insert at one notification per stream in the heap and pop
-                * one.
-                */
-               ret.status = populate_heap(ctf_it);
-               if (ret.status) {
-                       goto end;
-               }
-
-               ret.notification = bt_notification_heap_pop(
-                               ctf_it->pending_notifications);
-               if (!ret.notification) {
-                       ret.status = BT_NOTIFICATION_ITERATOR_STATUS_END;
-                       goto end;
-               }
+       port_data = bt_private_port_get_user_data(port);
+       if (!port_data) {
+               ret = BT_NOTIFICATION_ITERATOR_STATUS_INVAL;
+               goto error;
        }
 
-       /* notification is set from here. */
-
-       stream = internal_bt_notification_get_stream(ret.notification);
+       stream = ctf_fs_stream_create(ctf_fs, port_data->path->str);
        if (!stream) {
-               /*
-                * The current notification is not associated to a particular
-                * stream, there is no need to insert a new notification from
-                * a stream in the heap.
-                */
-               goto end;
+               goto error;
        }
 
-       fs_stream = g_hash_table_lookup(ctf_it->stream_ht, stream);
-       if (!fs_stream) {
-               /* We have reached this stream's end. */
-               goto end;
+       ret = bt_private_notification_iterator_set_user_data(it, stream);
+       if (ret) {
+               goto error;
        }
 
-       ret.status = ctf_fs_iterator_get_next_notification(ctf_it, fs_stream,
-                       &next_stream_notification);
-       if ((ret.status && ret.status != BT_NOTIFICATION_ITERATOR_STATUS_END)) {
-               heap_ret = bt_notification_heap_insert(
-                               ctf_it->pending_notifications,
-                               ret.notification);
-
-               assert(!next_stream_notification);
-               if (heap_ret) {
-                       /*
-                        * We're dropping the most recent notification, but at
-                        * this point, something is seriously wrong...
-                        */
-                       ret.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
-               }
-               BT_PUT(ret.notification);
-               goto end;
-       }
+       stream = NULL;
+       goto end;
 
-       if (ret.status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
-               gboolean success;
+error:
+       (void) bt_private_notification_iterator_set_user_data(it, NULL);
 
-               /* Remove stream. */
-               success = g_hash_table_remove(ctf_it->stream_ht, stream);
-               assert(success);
-               ret.status = BT_NOTIFICATION_ITERATOR_STATUS_OK;
-       } else {
-               heap_ret = bt_notification_heap_insert(ctf_it->pending_notifications,
-                                                      next_stream_notification);
-               BT_PUT(next_stream_notification);
-               if (heap_ret) {
-                       /*
-                        * We're dropping the most recent notification...
-                        */
-                       ret.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
-               }
+       if (ret == BT_NOTIFICATION_ITERATOR_STATUS_OK) {
+               ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR;
        }
 
-       /*
-        * Ensure that the stream is removed from both pending_streams and
-        * the streams hashtable on reception of the "end of stream"
-        * notification.
-        */
 end:
-       bt_put(stream);
+       ctf_fs_stream_destroy(stream);
+       bt_put(priv_comp);
        return ret;
 }
 
 static
-void ctf_fs_iterator_destroy_data(struct ctf_fs_iterator *ctf_it)
+void ctf_fs_destroy_data(struct ctf_fs_component *ctf_fs)
 {
-       if (!ctf_it) {
+       if (!ctf_fs) {
                return;
        }
-       bt_put(ctf_it->pending_notifications);
-       if (ctf_it->pending_streams) {
-               g_ptr_array_free(ctf_it->pending_streams, TRUE);
+
+       if (ctf_fs->trace_path) {
+               g_string_free(ctf_fs->trace_path, TRUE);
        }
-       if (ctf_it->stream_ht) {
-               g_hash_table_destroy(ctf_it->stream_ht);
+
+       if (ctf_fs->port_data) {
+               g_ptr_array_free(ctf_fs->port_data, TRUE);
        }
-       g_free(ctf_it);
-}
 
-void ctf_fs_iterator_finalize(struct bt_private_notification_iterator *it)
-{
-       void *data = bt_private_notification_iterator_get_user_data(it);
+       if (ctf_fs->metadata) {
+               ctf_fs_metadata_fini(ctf_fs->metadata);
+               g_free(ctf_fs->metadata);
+       }
 
-       ctf_fs_iterator_destroy_data(data);
+       g_free(ctf_fs);
 }
 
-static
-bool compare_event_notifications(struct bt_notification *a,
-               struct bt_notification *b)
+void ctf_fs_finalize(struct bt_private_component *component)
 {
-       int ret;
-       struct bt_ctf_clock_class *clock_class;
-       struct bt_ctf_clock_value *a_clock_value, *b_clock_value;
-       struct bt_ctf_stream_class *a_stream_class;
-       struct bt_ctf_stream *a_stream;
-       struct bt_ctf_event *a_event, *b_event;
-       struct bt_ctf_trace *trace;
-       int64_t a_ts, b_ts;
-
-       // FIXME - assumes only one clock
-       a_event = bt_notification_event_get_event(a);
-       b_event = bt_notification_event_get_event(b);
-       assert(a_event);
-       assert(b_event);
-
-       a_stream = bt_ctf_event_get_stream(a_event);
-       assert(a_stream);
-       a_stream_class = bt_ctf_stream_get_class(a_stream);
-       assert(a_stream_class);
-       trace = bt_ctf_stream_class_get_trace(a_stream_class);
-       assert(trace);
-
-       clock_class = bt_ctf_trace_get_clock_class(trace, 0);
-       a_clock_value = bt_ctf_event_get_clock_value(a_event, clock_class);
-       b_clock_value = bt_ctf_event_get_clock_value(b_event, clock_class);
-       assert(a_clock_value);
-       assert(b_clock_value);
-
-       ret = bt_ctf_clock_value_get_value_ns_from_epoch(a_clock_value, &a_ts);
-       assert(!ret);
-       ret = bt_ctf_clock_value_get_value_ns_from_epoch(b_clock_value, &b_ts);
-       assert(!ret);
-
-       bt_put(a_event);
-       bt_put(b_event);
-       bt_put(a_clock_value);
-       bt_put(b_clock_value);
-       bt_put(a_stream);
-       bt_put(a_stream_class);
-       bt_put(clock_class);
-       bt_put(trace);
-       return a_ts < b_ts;
+       void *data = bt_private_component_get_user_data(component);
+
+       ctf_fs_destroy_data(data);
 }
 
 static
-bool compare_notifications(struct bt_notification *a, struct bt_notification *b,
-               void *unused)
-{
-       static int notification_priorities[] = {
-               [BT_NOTIFICATION_TYPE_NEW_TRACE] = 0,
-               [BT_NOTIFICATION_TYPE_NEW_STREAM_CLASS] = 1,
-               [BT_NOTIFICATION_TYPE_NEW_EVENT_CLASS] = 2,
-               [BT_NOTIFICATION_TYPE_PACKET_BEGIN] = 3,
-               [BT_NOTIFICATION_TYPE_PACKET_END] = 4,
-               [BT_NOTIFICATION_TYPE_EVENT] = 5,
-               [BT_NOTIFICATION_TYPE_END_OF_TRACE] = 6,
-       };
-       int a_prio, b_prio;
-       enum bt_notification_type a_type, b_type;
-
-       assert(a && b);
-       a_type = bt_notification_get_type(a);
-       b_type = bt_notification_get_type(b);
-       assert(a_type > BT_NOTIFICATION_TYPE_ALL);
-       assert(a_type < BT_NOTIFICATION_TYPE_NR);
-       assert(b_type > BT_NOTIFICATION_TYPE_ALL);
-       assert(b_type < BT_NOTIFICATION_TYPE_NR);
-
-       a_prio = notification_priorities[a_type];
-       b_prio = notification_priorities[b_type];
-
-       if (likely((a_type == b_type) && a_type == BT_NOTIFICATION_TYPE_EVENT)) {
-               return compare_event_notifications(a, b);
-       }
-
-       if (unlikely(a_prio != b_prio)) {
-               return a_prio < b_prio;
-       }
-
-       /* Notification types are equal, but not of type "event". */
-       switch (a_type) {
-       case BT_NOTIFICATION_TYPE_PACKET_BEGIN:
-       case BT_NOTIFICATION_TYPE_PACKET_END:
-       case BT_NOTIFICATION_TYPE_STREAM_END:
-       {
-               int64_t a_sc_id, b_sc_id;
-               struct bt_ctf_stream *a_stream, *b_stream;
-               struct bt_ctf_stream_class *a_sc, *b_sc;
-
-               a_stream = internal_bt_notification_get_stream(a);
-               b_stream = internal_bt_notification_get_stream(b);
-               assert(a_stream && b_stream);
-
-               a_sc = bt_ctf_stream_get_class(a_stream);
-               b_sc = bt_ctf_stream_get_class(b_stream);
-               assert(a_sc && b_sc);
-
-               a_sc_id = bt_ctf_stream_class_get_id(a_sc);
-               b_sc_id = bt_ctf_stream_class_get_id(b_sc);
-               assert(a_sc_id >= 0 && b_sc_id >= 0);
-               bt_put(a_sc);
-               bt_put(a_stream);
-               bt_put(b_sc);
-               bt_put(b_stream);
-               return a_sc_id < b_sc_id;
-       }
-       case BT_NOTIFICATION_TYPE_NEW_TRACE:
-       case BT_NOTIFICATION_TYPE_END_OF_TRACE:
-               /* Impossible to have two separate traces. */
-       default:
-               assert(0);
-       }
-
-       assert(0);
-       return a < b;
+void port_data_destroy(void *data) {
+       struct ctf_fs_port_data *port_data = data;
+
+       if (!port_data) {
+               return;
+       }
+
+       if (port_data->path) {
+               g_string_free(port_data->path, TRUE);
+       }
+
+       g_free(port_data);
 }
 
 static
-void stream_destroy(void *stream)
+int create_one_port(struct ctf_fs_component *ctf_fs,
+               const char *stream_basename, const char *stream_path)
 {
-       ctf_fs_stream_destroy((struct ctf_fs_stream *) stream);
+       int ret = 0;
+       struct bt_private_port *port = NULL;
+       struct ctf_fs_port_data *port_data = NULL;
+       GString *port_name = NULL;
+
+       port_name = g_string_new(NULL);
+       if (!port_name) {
+               goto error;
+       }
+
+       /* Assign the name for the new output port */
+       g_string_assign(port_name, "");
+       g_string_printf(port_name, "trace0-stream-%s", stream_basename);
+       PDBG("Creating one port named `%s` associated with path `%s`\n",
+               port_name->str, stream_path);
+
+       /* Create output port for this file */
+       port = bt_private_component_source_add_output_private_port(
+               ctf_fs->priv_comp, port_name->str);
+       if (!port) {
+               goto error;
+       }
+
+       port_data = g_new0(struct ctf_fs_port_data, 1);
+       if (!port_data) {
+               goto error;
+       }
+
+       port_data->path = g_string_new(stream_path);
+       if (!port_data->path) {
+               goto error;
+       }
+
+       ret = bt_private_port_set_user_data(port, port_data);
+       if (ret) {
+               goto error;
+       }
+
+       g_ptr_array_add(ctf_fs->port_data, port_data);
+       port_data = NULL;
+       goto end;
+
+error:
+       ret = -1;
+
+end:
+       if (port_name) {
+               g_string_free(port_name, TRUE);
+       }
+
+       bt_put(port);
+       port_data_destroy(port_data);
+       return ret;
 }
 
 static
-int open_trace_streams(struct ctf_fs_component *ctf_fs,
-               struct ctf_fs_iterator *ctf_it)
+int create_ports(struct ctf_fs_component *ctf_fs)
 {
        int ret = 0;
-       const char *name;
+       const char *basename;
        GError *error = NULL;
-       GDir *dir = g_dir_open(ctf_fs->trace_path->str, 0, &error);
+       GDir *dir = NULL;
+       struct bt_private_port *def_port;
+       struct ctf_fs_file *file = NULL;
+
+       /* Remove default port if needed */
+       def_port = bt_private_component_source_get_default_output_private_port(
+               ctf_fs->priv_comp);
+       if (def_port) {
+               bt_private_port_remove_from_component(def_port);
+               bt_put(def_port);
+       }
 
+       /* Create one output port for each stream file */
+       dir = g_dir_open(ctf_fs->trace_path->str, 0, &error);
        if (!dir) {
-               PERR("Cannot open directory \"%s\": %s (code %d)\n",
-                               ctf_fs->trace_path->str, error->message,
-                               error->code);
+               PERR("Cannot open directory `%s`: %s (code %d)\n",
+                       ctf_fs->trace_path->str, error->message,
+                       error->code);
                goto error;
        }
 
-       while ((name = g_dir_read_name(dir))) {
-               struct ctf_fs_file *file = NULL;
-               struct ctf_fs_stream *stream = NULL;
-
-               if (!strcmp(name, CTF_FS_METADATA_FILENAME)) {
+       while ((basename = g_dir_read_name(dir))) {
+               if (!strcmp(basename, CTF_FS_METADATA_FILENAME)) {
                        /* Ignore the metadata stream. */
-                       PDBG("Ignoring metadata file \"%s\"\n",
-                                       name);
+                       PDBG("Ignoring metadata file `%s`\n", basename);
                        continue;
                }
 
-               if (name[0] == '.') {
-                       PDBG("Ignoring hidden file \"%s\"\n",
-                                       name);
+               if (basename[0] == '.') {
+                       PDBG("Ignoring hidden file `%s`\n", basename);
                        continue;
                }
 
                /* Create the file. */
                file = ctf_fs_file_create(ctf_fs);
                if (!file) {
-                       PERR("Cannot create stream file object\n");
+                       PERR("Cannot create stream file object for file `%s`\n",
+                               basename);
                        goto error;
                }
 
                /* Create full path string. */
                g_string_append_printf(file->path, "%s/%s",
-                               ctf_fs->trace_path->str, name);
+                               ctf_fs->trace_path->str, basename);
                if (!g_file_test(file->path->str, G_FILE_TEST_IS_REGULAR)) {
-                       PDBG("Ignoring non-regular file \"%s\"\n", name);
+                       PDBG("Ignoring non-regular file `%s`\n", basename);
                        ctf_fs_file_destroy(file);
+                       file = NULL;
                        continue;
                }
 
-               /* Open the file. */
-               if (ctf_fs_file_open(ctf_fs, file, "rb")) {
-                       ctf_fs_file_destroy(file);
+               ret = ctf_fs_file_open(ctf_fs, file, "rb");
+               if (ret) {
+                       PERR("Cannot open stream file `%s`\n", basename);
                        goto error;
                }
 
                if (file->size == 0) {
                        /* Skip empty stream. */
+                       PDBG("Ignoring empty file `%s`\n", basename);
                        ctf_fs_file_destroy(file);
+                       file = NULL;
                        continue;
                }
 
-               /* Create a private stream; file ownership is passed to it. */
-               stream = ctf_fs_stream_create(ctf_fs, file);
-               if (!stream) {
-                       ctf_fs_file_destroy(file);
+               ret = create_one_port(ctf_fs, basename, file->path->str);
+               if (ret) {
+                       PERR("Cannot create output port for file `%s`\n",
+                               basename);
                        goto error;
                }
 
-               g_ptr_array_add(ctf_it->pending_streams, stream);
+               ctf_fs_file_destroy(file);
+               file = NULL;
        }
 
        goto end;
+
 error:
        ret = -1;
+
 end:
        if (dir) {
                g_dir_close(dir);
                dir = NULL;
        }
+
        if (error) {
                g_error_free(error);
        }
-       return ret;
-}
-
-enum bt_notification_iterator_status ctf_fs_iterator_init(
-               struct bt_private_notification_iterator *it,
-               struct bt_private_port *port)
-{
-       struct ctf_fs_iterator *ctf_it;
-       struct ctf_fs_component *ctf_fs;
-       struct bt_private_component *source =
-               bt_private_notification_iterator_get_private_component(it);
-       enum bt_notification_iterator_status ret = BT_NOTIFICATION_ITERATOR_STATUS_OK;
-
-       assert(source && it);
-
-       ctf_fs = bt_private_component_get_user_data(source);
-       if (!ctf_fs) {
-               ret = BT_NOTIFICATION_ITERATOR_STATUS_INVAL;
-               goto end;
-       }
-
-       ctf_it = g_new0(struct ctf_fs_iterator, 1);
-       if (!ctf_it) {
-               ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM;
-               goto end;
-       }
 
-       ctf_it->stream_ht = g_hash_table_new_full(g_direct_hash,
-                       g_direct_equal, bt_put, stream_destroy);
-       if (!ctf_it->stream_ht) {
-               goto error;
-       }
-       ctf_it->pending_streams = g_ptr_array_new_with_free_func(
-                       stream_destroy);
-       if (!ctf_it->pending_streams) {
-               goto error;
-       }
-       ctf_it->pending_notifications = bt_notification_heap_create(
-                       compare_notifications, NULL);
-       if (!ctf_it->pending_notifications) {
-               goto error;
-       }
-
-       ret = open_trace_streams(ctf_fs, ctf_it);
-       if (ret) {
-               goto error;
-       }
-
-       ret = bt_private_notification_iterator_set_user_data(it, ctf_it);
-       if (ret) {
-               goto error;
-       }
-
-       goto end;
-
-error:
-       (void) bt_private_notification_iterator_set_user_data(it, NULL);
-       ctf_fs_iterator_destroy_data(ctf_it);
-
-end:
-       bt_put(source);
+       ctf_fs_file_destroy(file);
        return ret;
 }
 
 static
-void ctf_fs_destroy_data(struct ctf_fs_component *ctf_fs)
-{
-       if (!ctf_fs) {
-               return;
-       }
-       if (ctf_fs->trace_path) {
-               g_string_free(ctf_fs->trace_path, TRUE);
-       }
-       if (ctf_fs->metadata) {
-               ctf_fs_metadata_fini(ctf_fs->metadata);
-               g_free(ctf_fs->metadata);
-       }
-       g_free(ctf_fs);
-}
-
-void ctf_fs_finalize(struct bt_private_component *component)
-{
-       void *data = bt_private_component_get_user_data(component);
-
-       ctf_fs_destroy_data(data);
-}
-
-static
-struct ctf_fs_component *ctf_fs_create(struct bt_value *params)
+struct ctf_fs_component *ctf_fs_create(struct bt_private_component *priv_comp,
+               struct bt_value *params)
 {
        struct ctf_fs_component *ctf_fs;
        struct bt_value *value = NULL;
        const char *path;
-       enum bt_value_status ret;
+       int ret;
 
        ctf_fs = g_new0(struct ctf_fs_component, 1);
        if (!ctf_fs) {
                goto end;
        }
 
+       /*
+        * We don't need to get a new reference here because as long as
+        * our private ctf_fs_component object exists, the containing
+        * private component should also exist.
+        */
+       ctf_fs->priv_comp = priv_comp;
+
        /* FIXME: should probably look for a source URI */
        value = bt_value_map_get(params, "path");
        if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) {
@@ -680,7 +353,12 @@ struct ctf_fs_component *ctf_fs_create(struct bt_value *params)
        }
 
        ret = bt_value_string_get(value, &path);
-       if (ret != BT_VALUE_STATUS_OK) {
+       if (ret) {
+               goto error;
+       }
+
+       ctf_fs->port_data = g_ptr_array_new_with_free_func(port_data_destroy);
+       if (!ctf_fs->port_data) {
                goto error;
        }
 
@@ -696,7 +374,17 @@ struct ctf_fs_component *ctf_fs_create(struct bt_value *params)
        if (!ctf_fs->metadata) {
                goto error;
        }
-       ctf_fs_metadata_set_trace(ctf_fs);
+
+       ret = ctf_fs_metadata_set_trace(ctf_fs);
+       if (ret) {
+               goto error;
+       }
+
+       ret = create_ports(ctf_fs);
+       if (ret) {
+               goto error;
+       }
+
        goto end;
 
 error:
@@ -708,28 +396,28 @@ end:
 }
 
 BT_HIDDEN
-enum bt_component_status ctf_fs_init(struct bt_private_component *source,
+enum bt_component_status ctf_fs_init(struct bt_private_component *priv_comp,
                struct bt_value *params, UNUSED_VAR void *init_method_data)
 {
        struct ctf_fs_component *ctf_fs;
        enum bt_component_status ret = BT_COMPONENT_STATUS_OK;
 
-       assert(source);
+       assert(priv_comp);
        ctf_fs_debug = g_strcmp0(getenv("CTF_FS_DEBUG"), "1") == 0;
-       ctf_fs = ctf_fs_create(params);
+       ctf_fs = ctf_fs_create(priv_comp, params);
        if (!ctf_fs) {
                ret = BT_COMPONENT_STATUS_NOMEM;
                goto end;
        }
 
-       ret = bt_private_component_set_user_data(source, ctf_fs);
+       ret = bt_private_component_set_user_data(priv_comp, ctf_fs);
        if (ret != BT_COMPONENT_STATUS_OK) {
                goto error;
        }
 end:
        return ret;
 error:
-       (void) bt_private_component_set_user_data(source, NULL);
+       (void) bt_private_component_set_user_data(priv_comp, NULL);
         ctf_fs_destroy_data(ctf_fs);
        return ret;
 }
index 43977d286ba137d1b109a0fada8f45e4ac73cabe..0a34856af8a8000cc08815b36ebec8c6f6f8bbe3 100644 (file)
@@ -38,8 +38,6 @@
 BT_HIDDEN
 extern bool ctf_fs_debug;
 
-struct bt_notification_heap;
-
 struct ctf_fs_file {
        struct ctf_fs_component *ctf_fs;
        GString *path;
@@ -58,7 +56,6 @@ struct ctf_fs_metadata {
 struct ctf_fs_stream {
        struct ctf_fs_file *file;
        struct bt_ctf_stream *stream;
-       /* FIXME There should be many and ctf_fs_stream should not own them. */
        struct bt_ctf_notif_iter *notif_iter;
        /* A stream is assumed to be indexed. */
        struct index index;
@@ -79,29 +76,23 @@ struct ctf_fs_stream {
        bool end_reached;
 };
 
-struct ctf_fs_iterator {
-       struct bt_notification_heap *pending_notifications;
-       /*
-        * struct ctf_fs_data_stream* which have not yet been associated to a
-        * bt_ctf_stream. The association is performed on the first packet
-        * read by the stream (since, at that point, we have read a packet
-        * header).
-        */
-       GPtrArray *pending_streams;
-       /* bt_ctf_stream -> ctf_fs_stream */
-       GHashTable *stream_ht;
+struct ctf_fs_component_options {
 };
 
-struct ctf_fs_component_options {
-       bool opt_dummy : 1;
+struct ctf_fs_port_data {
+       GString *path;
 };
 
 struct ctf_fs_component {
+       struct bt_private_component *priv_comp;
        GString *trace_path;
        FILE *error_fp;
        size_t page_size;
        struct ctf_fs_component_options options;
        struct ctf_fs_metadata *metadata;
+
+       /* Array of struct ctf_fs_port_data *, owned by this */
+       GPtrArray *port_data;
 };
 
 BT_HIDDEN
index e531056c91fadb2769d2ba0483e8cfbb4464f3ba..063ebe56c7d74a77f765b6589d61b5a491573db1 100644 (file)
@@ -313,7 +313,7 @@ end:
        return ret;
 }
 
-void ctf_fs_metadata_set_trace(struct ctf_fs_component *ctf_fs)
+int ctf_fs_metadata_set_trace(struct ctf_fs_component *ctf_fs)
 {
        int ret = 0;
        struct ctf_fs_file *file = get_file(ctf_fs, ctf_fs->trace_path->str);
@@ -412,6 +412,8 @@ error:
                ctf_fs->metadata->text = NULL;
        }
 
+       ret = -1;
+
 end:
        if (file) {
                ctf_fs_file_destroy(file);
@@ -420,6 +422,8 @@ end:
        if (scanner) {
                ctf_scanner_free(scanner);
        }
+
+       return ret;
 }
 
 int ctf_fs_metadata_init(struct ctf_fs_metadata *metadata)
index a8ca942d7a7db7980aee38913997d85fa3215303..85a7726834d5e1943af9d7868e253543e398703c 100644 (file)
@@ -38,7 +38,7 @@ BT_HIDDEN
 void ctf_fs_metadata_fini(struct ctf_fs_metadata *metadata);
 
 BT_HIDDEN
-void ctf_fs_metadata_set_trace(struct ctf_fs_component *ctf_fs);
+int ctf_fs_metadata_set_trace(struct ctf_fs_component *ctf_fs);
 
 BT_HIDDEN
 FILE *ctf_fs_metadata_open_file(const char *trace_path);
This page took 0.040914 seconds and 4 git commands to generate.