From 4f1f88a6bc222c4f93aa8972d488eeb3fc195f33 Mon Sep 17 00:00:00 2001 From: Philippe Proulx Date: Wed, 29 Mar 2017 17:44:06 -0400 Subject: [PATCH] ctf.fs: split streams, one per port MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The ports are named `traceN-stream-STREAM`, where `N` is the index of the trace (starts at 0) and `STREAM` is the base name of the stream file. The heap which used to be in this component class is expected to be moved to the utils.muxer filter component class. Signed-off-by: Philippe Proulx Signed-off-by: Jérémie Galarneau --- plugins/ctf/fs/data-stream.c | 84 +++- plugins/ctf/fs/data-stream.h | 6 +- plugins/ctf/fs/file.c | 4 +- plugins/ctf/fs/fs.c | 716 ++++++++++------------------------- plugins/ctf/fs/fs.h | 23 +- plugins/ctf/fs/metadata.c | 6 +- plugins/ctf/fs/metadata.h | 2 +- 7 files changed, 300 insertions(+), 541 deletions(-) diff --git a/plugins/ctf/fs/data-stream.c b/plugins/ctf/fs/data-stream.c index ceeefdf5..b4bb087a 100644 --- a/plugins/ctf/fs/data-stream.c +++ b/plugins/ctf/fs/data-stream.c @@ -31,6 +31,9 @@ #include #include #include +#include +#include +#include #include "file.h" #include "metadata.h" #include "../common/notif-iter/notif-iter.h" @@ -41,7 +44,7 @@ #define PRINT_PREFIX "ctf-fs-data-stream" #include "print.h" -static +static inline size_t remaining_mmap_bytes(struct ctf_fs_stream *stream) { return stream->mmap_valid_len - stream->request_offset; @@ -326,9 +329,7 @@ invalid_index: static int build_index_from_stream(struct ctf_fs_stream *stream) { - int ret = 0; -end: - return ret; + return 0; } static @@ -348,7 +349,7 @@ end: BT_HIDDEN struct ctf_fs_stream *ctf_fs_stream_create( - struct ctf_fs_component *ctf_fs, struct ctf_fs_file *file) + struct ctf_fs_component *ctf_fs, const char *path) { int ret; struct ctf_fs_stream *stream = g_new0(struct ctf_fs_stream, 1); @@ -357,7 +358,17 @@ struct ctf_fs_stream *ctf_fs_stream_create( goto error; } - stream->file = file; + stream->file = ctf_fs_file_create(ctf_fs); + if (!stream->file) { + goto error; + } + + g_string_assign(stream->file->path, path); + ret = ctf_fs_file_open(ctf_fs, stream->file, "rb"); + if (ret) { + goto error; + } + stream->notif_iter = bt_ctf_notif_iter_create(ctf_fs->metadata->trace, ctf_fs->page_size, medops, stream, ctf_fs->error_fp); if (!stream->notif_iter) { @@ -372,7 +383,6 @@ struct ctf_fs_stream *ctf_fs_stream_create( goto end; error: /* Do not touch "borrowed" file. */ - stream->file = NULL; ctf_fs_stream_destroy(stream); stream = NULL; end: @@ -404,3 +414,63 @@ void ctf_fs_stream_destroy(struct ctf_fs_stream *stream) g_free(stream); } + +BT_HIDDEN +struct bt_notification_iterator_next_return ctf_fs_stream_next( + struct ctf_fs_stream *stream) +{ + enum bt_ctf_notif_iter_status notif_iter_status; + struct bt_notification_iterator_next_return ret = { + .status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR, + .notification = NULL, + }; + + if (stream->end_reached) { + notif_iter_status = BT_CTF_NOTIF_ITER_STATUS_EOF; + goto translate_status; + } + + notif_iter_status = bt_ctf_notif_iter_get_next_notification(stream->notif_iter, + &ret.notification); + if (notif_iter_status != BT_CTF_NOTIF_ITER_STATUS_OK && + notif_iter_status != BT_CTF_NOTIF_ITER_STATUS_EOF) { + goto translate_status; + } + + /* Should be handled in bt_ctf_notif_iter_get_next_notification. */ + if (notif_iter_status == BT_CTF_NOTIF_ITER_STATUS_EOF) { + ret.notification = bt_notification_stream_end_create( + stream->stream); + if (!ret.notification) { + notif_iter_status = BT_CTF_NOTIF_ITER_STATUS_ERROR; + goto translate_status; + } + + notif_iter_status = BT_CTF_NOTIF_ITER_STATUS_OK; + stream->end_reached = true; + } + +translate_status: + switch (notif_iter_status) { + case BT_CTF_NOTIF_ITER_STATUS_EOF: + ret.status = BT_NOTIFICATION_ITERATOR_STATUS_END; + break; + case BT_CTF_NOTIF_ITER_STATUS_OK: + ret.status = BT_NOTIFICATION_ITERATOR_STATUS_OK; + break; + case BT_CTF_NOTIF_ITER_STATUS_AGAIN: + /* + * Should not make it this far as this is + * medium-specific; there is nothing for the user to do + * and it should have been handled upstream. + */ + assert(false); + case BT_CTF_NOTIF_ITER_STATUS_INVAL: + case BT_CTF_NOTIF_ITER_STATUS_ERROR: + default: + ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + break; + } + + return ret; +} diff --git a/plugins/ctf/fs/data-stream.h b/plugins/ctf/fs/data-stream.h index 3b686b7a..3d484425 100644 --- a/plugins/ctf/fs/data-stream.h +++ b/plugins/ctf/fs/data-stream.h @@ -48,7 +48,7 @@ struct index { BT_HIDDEN struct ctf_fs_stream *ctf_fs_stream_create( - struct ctf_fs_component *ctf_fs, struct ctf_fs_file *file); + struct ctf_fs_component *ctf_fs, const char *path); BT_HIDDEN void ctf_fs_stream_destroy(struct ctf_fs_stream *stream); @@ -56,4 +56,8 @@ void ctf_fs_stream_destroy(struct ctf_fs_stream *stream); BT_HIDDEN int ctf_fs_data_stream_open_streams(struct ctf_fs_component *ctf_fs); +BT_HIDDEN +struct bt_notification_iterator_next_return ctf_fs_stream_next( + struct ctf_fs_stream *stream); + #endif /* CTF_FS_DATA_STREAM_H */ diff --git a/plugins/ctf/fs/file.c b/plugins/ctf/fs/file.c index c0ca3b8d..8b756496 100644 --- a/plugins/ctf/fs/file.c +++ b/plugins/ctf/fs/file.c @@ -35,12 +35,14 @@ BT_HIDDEN void ctf_fs_file_destroy(struct ctf_fs_file *file) { - struct ctf_fs_component *ctf_fs = file->ctf_fs; + struct ctf_fs_component *ctf_fs;; if (!file) { return; } + ctf_fs = file->ctf_fs; + if (file->fp) { PDBG("Closing file \"%s\" (%p)\n", file->path->str, file->fp); diff --git a/plugins/ctf/fs/fs.c b/plugins/ctf/fs/fs.c index 9a38834a..ea45fbca 100644 --- a/plugins/ctf/fs/fs.c +++ b/plugins/ctf/fs/fs.c @@ -28,14 +28,12 @@ #include #include +#include #include +#include +#include #include #include -#include -#include -#include -#include -#include #include #include #include @@ -54,625 +52,300 @@ BT_HIDDEN bool ctf_fs_debug; struct bt_notification_iterator_next_return ctf_fs_iterator_next( - struct bt_private_notification_iterator *iterator); - -static -enum bt_notification_iterator_status ctf_fs_iterator_get_next_notification( - struct ctf_fs_iterator *it, - struct ctf_fs_stream *stream, - struct bt_notification **notification) + struct bt_private_notification_iterator *iterator) { - enum bt_ctf_notif_iter_status status; - enum bt_notification_iterator_status ret; - - if (stream->end_reached) { - status = BT_CTF_NOTIF_ITER_STATUS_EOF; - goto end; - } - - status = bt_ctf_notif_iter_get_next_notification(stream->notif_iter, - notification); - if (status != BT_CTF_NOTIF_ITER_STATUS_OK && - status != BT_CTF_NOTIF_ITER_STATUS_EOF) { - goto end; - } + struct ctf_fs_stream *fs_stream = + bt_private_notification_iterator_get_user_data(iterator); - /* Should be handled in bt_ctf_notif_iter_get_next_notification. */ - if (status == BT_CTF_NOTIF_ITER_STATUS_EOF) { - *notification = bt_notification_stream_end_create( - stream->stream); - if (!*notification) { - status = BT_CTF_NOTIF_ITER_STATUS_ERROR; - } - status = BT_CTF_NOTIF_ITER_STATUS_OK; - stream->end_reached = true; - } -end: - switch (status) { - case BT_CTF_NOTIF_ITER_STATUS_EOF: - ret = BT_NOTIFICATION_ITERATOR_STATUS_END; - break; - case BT_CTF_NOTIF_ITER_STATUS_OK: - ret = BT_NOTIFICATION_ITERATOR_STATUS_OK; - break; - case BT_CTF_NOTIF_ITER_STATUS_AGAIN: - /* - * Should not make it this far as this is medium-specific; - * there is nothing for the user to do and it should have been - * handled upstream. - */ - assert(0); - case BT_CTF_NOTIF_ITER_STATUS_INVAL: - /* No argument provided by the user, so don't return INVAL. */ - case BT_CTF_NOTIF_ITER_STATUS_ERROR: - default: - ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; - break; - } - return ret; + return ctf_fs_stream_next(fs_stream); } -/* - * Remove me. This is a temporary work-around due to our inhability to use - * libbabeltrace-ctf from libbabeltrace-plugin. - */ -static -struct bt_ctf_stream *internal_bt_notification_get_stream( - struct bt_notification *notification) +void ctf_fs_iterator_finalize(struct bt_private_notification_iterator *it) { - struct bt_ctf_stream *stream = NULL; - - assert(notification); - switch (bt_notification_get_type(notification)) { - case BT_NOTIFICATION_TYPE_EVENT: - { - struct bt_ctf_event *event; - - event = bt_notification_event_get_event(notification); - stream = bt_ctf_event_get_stream(event); - bt_put(event); - break; - } - case BT_NOTIFICATION_TYPE_PACKET_BEGIN: - { - struct bt_ctf_packet *packet; - - packet = bt_notification_packet_begin_get_packet(notification); - stream = bt_ctf_packet_get_stream(packet); - bt_put(packet); - break; - } - case BT_NOTIFICATION_TYPE_PACKET_END: - { - struct bt_ctf_packet *packet; - - packet = bt_notification_packet_end_get_packet(notification); - stream = bt_ctf_packet_get_stream(packet); - bt_put(packet); - break; - } - case BT_NOTIFICATION_TYPE_STREAM_END: - stream = bt_notification_stream_end_get_stream(notification); - break; - default: - goto end; - } -end: - return stream; + void *ctf_fs_stream = + bt_private_notification_iterator_get_user_data(it); + + ctf_fs_stream_destroy(ctf_fs_stream); } -static -enum bt_notification_iterator_status populate_heap(struct ctf_fs_iterator *it) +enum bt_notification_iterator_status ctf_fs_iterator_init( + struct bt_private_notification_iterator *it, + struct bt_private_port *port) { - size_t i, pending_streams_count = it->pending_streams->len; + struct ctf_fs_stream *stream = NULL; + struct ctf_fs_component *ctf_fs; + struct ctf_fs_port_data *port_data; + struct bt_private_component *priv_comp = + bt_private_notification_iterator_get_private_component(it); enum bt_notification_iterator_status ret = - BT_NOTIFICATION_ITERATOR_STATUS_OK; - - /* Insert one stream-associated notification for each stream. */ - for (i = 0; i < pending_streams_count; i++) { - struct bt_notification *notification; - struct ctf_fs_stream *fs_stream; - struct bt_ctf_stream *stream; - size_t pending_stream_index = pending_streams_count - 1 - i; - - fs_stream = g_ptr_array_index(it->pending_streams, - pending_stream_index); - - do { - int heap_ret; - - ret = ctf_fs_iterator_get_next_notification( - it, fs_stream, ¬ification); - if (ret && ret != BT_NOTIFICATION_ITERATOR_STATUS_END) { - printf_debug("Failed to populate heap at stream %zu\n", - pending_stream_index); - goto end; - } + BT_NOTIFICATION_ITERATOR_STATUS_OK; - stream = internal_bt_notification_get_stream( - notification); - if (stream) { - gboolean inserted; - - /* - * Associate pending ctf_fs_stream to - * bt_ctf_stream. Ownership of stream - * is passed to the stream ht. - */ - inserted = g_hash_table_insert(it->stream_ht, - stream, fs_stream); - if (!inserted) { - ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; - printf_debug("Failed to associate fs stream to ctf stream\n"); - goto end; - } - } + assert(priv_comp); - heap_ret = bt_notification_heap_insert( - it->pending_notifications, - notification); - bt_put(notification); - if (heap_ret) { - ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; - printf_debug("Failed to insert notification in heap\n"); - goto end; - } - } while (!stream && ret != BT_NOTIFICATION_ITERATOR_STATUS_END); - /* - * Set NULL so the destruction callback registered with the - * array is not invoked on the stream (its ownership was - * transferred to the streams hashtable). - */ - g_ptr_array_index(it->pending_streams, - pending_stream_index) = NULL; - g_ptr_array_remove_index(it->pending_streams, - pending_stream_index); - } - - g_ptr_array_free(it->pending_streams, TRUE); - it->pending_streams = NULL; -end: - return ret; -} - -struct bt_notification_iterator_next_return ctf_fs_iterator_next( - struct bt_private_notification_iterator *iterator) -{ - int heap_ret; - struct bt_ctf_stream *stream = NULL; - struct ctf_fs_stream *fs_stream; - struct bt_notification *next_stream_notification; - struct ctf_fs_iterator *ctf_it = - bt_private_notification_iterator_get_user_data( - iterator); - struct bt_notification_iterator_next_return ret = { - .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, - .notification = NULL, - }; - - ret.notification = - bt_notification_heap_pop(ctf_it->pending_notifications); - if (!ret.notification && !ctf_it->pending_streams) { - ret.status = BT_NOTIFICATION_ITERATOR_STATUS_END; - goto end; + ctf_fs = bt_private_component_get_user_data(priv_comp); + if (!ctf_fs) { + ret = BT_NOTIFICATION_ITERATOR_STATUS_INVAL; + goto error; } - if (!ret.notification && ctf_it->pending_streams) { - /* - * Insert at one notification per stream in the heap and pop - * one. - */ - ret.status = populate_heap(ctf_it); - if (ret.status) { - goto end; - } - - ret.notification = bt_notification_heap_pop( - ctf_it->pending_notifications); - if (!ret.notification) { - ret.status = BT_NOTIFICATION_ITERATOR_STATUS_END; - goto end; - } + port_data = bt_private_port_get_user_data(port); + if (!port_data) { + ret = BT_NOTIFICATION_ITERATOR_STATUS_INVAL; + goto error; } - /* notification is set from here. */ - - stream = internal_bt_notification_get_stream(ret.notification); + stream = ctf_fs_stream_create(ctf_fs, port_data->path->str); if (!stream) { - /* - * The current notification is not associated to a particular - * stream, there is no need to insert a new notification from - * a stream in the heap. - */ - goto end; + goto error; } - fs_stream = g_hash_table_lookup(ctf_it->stream_ht, stream); - if (!fs_stream) { - /* We have reached this stream's end. */ - goto end; + ret = bt_private_notification_iterator_set_user_data(it, stream); + if (ret) { + goto error; } - ret.status = ctf_fs_iterator_get_next_notification(ctf_it, fs_stream, - &next_stream_notification); - if ((ret.status && ret.status != BT_NOTIFICATION_ITERATOR_STATUS_END)) { - heap_ret = bt_notification_heap_insert( - ctf_it->pending_notifications, - ret.notification); - - assert(!next_stream_notification); - if (heap_ret) { - /* - * We're dropping the most recent notification, but at - * this point, something is seriously wrong... - */ - ret.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; - } - BT_PUT(ret.notification); - goto end; - } + stream = NULL; + goto end; - if (ret.status == BT_NOTIFICATION_ITERATOR_STATUS_END) { - gboolean success; +error: + (void) bt_private_notification_iterator_set_user_data(it, NULL); - /* Remove stream. */ - success = g_hash_table_remove(ctf_it->stream_ht, stream); - assert(success); - ret.status = BT_NOTIFICATION_ITERATOR_STATUS_OK; - } else { - heap_ret = bt_notification_heap_insert(ctf_it->pending_notifications, - next_stream_notification); - BT_PUT(next_stream_notification); - if (heap_ret) { - /* - * We're dropping the most recent notification... - */ - ret.status = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; - } + if (ret == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; } - /* - * Ensure that the stream is removed from both pending_streams and - * the streams hashtable on reception of the "end of stream" - * notification. - */ end: - bt_put(stream); + ctf_fs_stream_destroy(stream); + bt_put(priv_comp); return ret; } static -void ctf_fs_iterator_destroy_data(struct ctf_fs_iterator *ctf_it) +void ctf_fs_destroy_data(struct ctf_fs_component *ctf_fs) { - if (!ctf_it) { + if (!ctf_fs) { return; } - bt_put(ctf_it->pending_notifications); - if (ctf_it->pending_streams) { - g_ptr_array_free(ctf_it->pending_streams, TRUE); + + if (ctf_fs->trace_path) { + g_string_free(ctf_fs->trace_path, TRUE); } - if (ctf_it->stream_ht) { - g_hash_table_destroy(ctf_it->stream_ht); + + if (ctf_fs->port_data) { + g_ptr_array_free(ctf_fs->port_data, TRUE); } - g_free(ctf_it); -} -void ctf_fs_iterator_finalize(struct bt_private_notification_iterator *it) -{ - void *data = bt_private_notification_iterator_get_user_data(it); + if (ctf_fs->metadata) { + ctf_fs_metadata_fini(ctf_fs->metadata); + g_free(ctf_fs->metadata); + } - ctf_fs_iterator_destroy_data(data); + g_free(ctf_fs); } -static -bool compare_event_notifications(struct bt_notification *a, - struct bt_notification *b) +void ctf_fs_finalize(struct bt_private_component *component) { - int ret; - struct bt_ctf_clock_class *clock_class; - struct bt_ctf_clock_value *a_clock_value, *b_clock_value; - struct bt_ctf_stream_class *a_stream_class; - struct bt_ctf_stream *a_stream; - struct bt_ctf_event *a_event, *b_event; - struct bt_ctf_trace *trace; - int64_t a_ts, b_ts; - - // FIXME - assumes only one clock - a_event = bt_notification_event_get_event(a); - b_event = bt_notification_event_get_event(b); - assert(a_event); - assert(b_event); - - a_stream = bt_ctf_event_get_stream(a_event); - assert(a_stream); - a_stream_class = bt_ctf_stream_get_class(a_stream); - assert(a_stream_class); - trace = bt_ctf_stream_class_get_trace(a_stream_class); - assert(trace); - - clock_class = bt_ctf_trace_get_clock_class(trace, 0); - a_clock_value = bt_ctf_event_get_clock_value(a_event, clock_class); - b_clock_value = bt_ctf_event_get_clock_value(b_event, clock_class); - assert(a_clock_value); - assert(b_clock_value); - - ret = bt_ctf_clock_value_get_value_ns_from_epoch(a_clock_value, &a_ts); - assert(!ret); - ret = bt_ctf_clock_value_get_value_ns_from_epoch(b_clock_value, &b_ts); - assert(!ret); - - bt_put(a_event); - bt_put(b_event); - bt_put(a_clock_value); - bt_put(b_clock_value); - bt_put(a_stream); - bt_put(a_stream_class); - bt_put(clock_class); - bt_put(trace); - return a_ts < b_ts; + void *data = bt_private_component_get_user_data(component); + + ctf_fs_destroy_data(data); } static -bool compare_notifications(struct bt_notification *a, struct bt_notification *b, - void *unused) -{ - static int notification_priorities[] = { - [BT_NOTIFICATION_TYPE_NEW_TRACE] = 0, - [BT_NOTIFICATION_TYPE_NEW_STREAM_CLASS] = 1, - [BT_NOTIFICATION_TYPE_NEW_EVENT_CLASS] = 2, - [BT_NOTIFICATION_TYPE_PACKET_BEGIN] = 3, - [BT_NOTIFICATION_TYPE_PACKET_END] = 4, - [BT_NOTIFICATION_TYPE_EVENT] = 5, - [BT_NOTIFICATION_TYPE_END_OF_TRACE] = 6, - }; - int a_prio, b_prio; - enum bt_notification_type a_type, b_type; - - assert(a && b); - a_type = bt_notification_get_type(a); - b_type = bt_notification_get_type(b); - assert(a_type > BT_NOTIFICATION_TYPE_ALL); - assert(a_type < BT_NOTIFICATION_TYPE_NR); - assert(b_type > BT_NOTIFICATION_TYPE_ALL); - assert(b_type < BT_NOTIFICATION_TYPE_NR); - - a_prio = notification_priorities[a_type]; - b_prio = notification_priorities[b_type]; - - if (likely((a_type == b_type) && a_type == BT_NOTIFICATION_TYPE_EVENT)) { - return compare_event_notifications(a, b); - } - - if (unlikely(a_prio != b_prio)) { - return a_prio < b_prio; - } - - /* Notification types are equal, but not of type "event". */ - switch (a_type) { - case BT_NOTIFICATION_TYPE_PACKET_BEGIN: - case BT_NOTIFICATION_TYPE_PACKET_END: - case BT_NOTIFICATION_TYPE_STREAM_END: - { - int64_t a_sc_id, b_sc_id; - struct bt_ctf_stream *a_stream, *b_stream; - struct bt_ctf_stream_class *a_sc, *b_sc; - - a_stream = internal_bt_notification_get_stream(a); - b_stream = internal_bt_notification_get_stream(b); - assert(a_stream && b_stream); - - a_sc = bt_ctf_stream_get_class(a_stream); - b_sc = bt_ctf_stream_get_class(b_stream); - assert(a_sc && b_sc); - - a_sc_id = bt_ctf_stream_class_get_id(a_sc); - b_sc_id = bt_ctf_stream_class_get_id(b_sc); - assert(a_sc_id >= 0 && b_sc_id >= 0); - bt_put(a_sc); - bt_put(a_stream); - bt_put(b_sc); - bt_put(b_stream); - return a_sc_id < b_sc_id; - } - case BT_NOTIFICATION_TYPE_NEW_TRACE: - case BT_NOTIFICATION_TYPE_END_OF_TRACE: - /* Impossible to have two separate traces. */ - default: - assert(0); - } - - assert(0); - return a < b; +void port_data_destroy(void *data) { + struct ctf_fs_port_data *port_data = data; + + if (!port_data) { + return; + } + + if (port_data->path) { + g_string_free(port_data->path, TRUE); + } + + g_free(port_data); } static -void stream_destroy(void *stream) +int create_one_port(struct ctf_fs_component *ctf_fs, + const char *stream_basename, const char *stream_path) { - ctf_fs_stream_destroy((struct ctf_fs_stream *) stream); + int ret = 0; + struct bt_private_port *port = NULL; + struct ctf_fs_port_data *port_data = NULL; + GString *port_name = NULL; + + port_name = g_string_new(NULL); + if (!port_name) { + goto error; + } + + /* Assign the name for the new output port */ + g_string_assign(port_name, ""); + g_string_printf(port_name, "trace0-stream-%s", stream_basename); + PDBG("Creating one port named `%s` associated with path `%s`\n", + port_name->str, stream_path); + + /* Create output port for this file */ + port = bt_private_component_source_add_output_private_port( + ctf_fs->priv_comp, port_name->str); + if (!port) { + goto error; + } + + port_data = g_new0(struct ctf_fs_port_data, 1); + if (!port_data) { + goto error; + } + + port_data->path = g_string_new(stream_path); + if (!port_data->path) { + goto error; + } + + ret = bt_private_port_set_user_data(port, port_data); + if (ret) { + goto error; + } + + g_ptr_array_add(ctf_fs->port_data, port_data); + port_data = NULL; + goto end; + +error: + ret = -1; + +end: + if (port_name) { + g_string_free(port_name, TRUE); + } + + bt_put(port); + port_data_destroy(port_data); + return ret; } static -int open_trace_streams(struct ctf_fs_component *ctf_fs, - struct ctf_fs_iterator *ctf_it) +int create_ports(struct ctf_fs_component *ctf_fs) { int ret = 0; - const char *name; + const char *basename; GError *error = NULL; - GDir *dir = g_dir_open(ctf_fs->trace_path->str, 0, &error); + GDir *dir = NULL; + struct bt_private_port *def_port; + struct ctf_fs_file *file = NULL; + + /* Remove default port if needed */ + def_port = bt_private_component_source_get_default_output_private_port( + ctf_fs->priv_comp); + if (def_port) { + bt_private_port_remove_from_component(def_port); + bt_put(def_port); + } + /* Create one output port for each stream file */ + dir = g_dir_open(ctf_fs->trace_path->str, 0, &error); if (!dir) { - PERR("Cannot open directory \"%s\": %s (code %d)\n", - ctf_fs->trace_path->str, error->message, - error->code); + PERR("Cannot open directory `%s`: %s (code %d)\n", + ctf_fs->trace_path->str, error->message, + error->code); goto error; } - while ((name = g_dir_read_name(dir))) { - struct ctf_fs_file *file = NULL; - struct ctf_fs_stream *stream = NULL; - - if (!strcmp(name, CTF_FS_METADATA_FILENAME)) { + while ((basename = g_dir_read_name(dir))) { + if (!strcmp(basename, CTF_FS_METADATA_FILENAME)) { /* Ignore the metadata stream. */ - PDBG("Ignoring metadata file \"%s\"\n", - name); + PDBG("Ignoring metadata file `%s`\n", basename); continue; } - if (name[0] == '.') { - PDBG("Ignoring hidden file \"%s\"\n", - name); + if (basename[0] == '.') { + PDBG("Ignoring hidden file `%s`\n", basename); continue; } /* Create the file. */ file = ctf_fs_file_create(ctf_fs); if (!file) { - PERR("Cannot create stream file object\n"); + PERR("Cannot create stream file object for file `%s`\n", + basename); goto error; } /* Create full path string. */ g_string_append_printf(file->path, "%s/%s", - ctf_fs->trace_path->str, name); + ctf_fs->trace_path->str, basename); if (!g_file_test(file->path->str, G_FILE_TEST_IS_REGULAR)) { - PDBG("Ignoring non-regular file \"%s\"\n", name); + PDBG("Ignoring non-regular file `%s`\n", basename); ctf_fs_file_destroy(file); + file = NULL; continue; } - /* Open the file. */ - if (ctf_fs_file_open(ctf_fs, file, "rb")) { - ctf_fs_file_destroy(file); + ret = ctf_fs_file_open(ctf_fs, file, "rb"); + if (ret) { + PERR("Cannot open stream file `%s`\n", basename); goto error; } if (file->size == 0) { /* Skip empty stream. */ + PDBG("Ignoring empty file `%s`\n", basename); ctf_fs_file_destroy(file); + file = NULL; continue; } - /* Create a private stream; file ownership is passed to it. */ - stream = ctf_fs_stream_create(ctf_fs, file); - if (!stream) { - ctf_fs_file_destroy(file); + ret = create_one_port(ctf_fs, basename, file->path->str); + if (ret) { + PERR("Cannot create output port for file `%s`\n", + basename); goto error; } - g_ptr_array_add(ctf_it->pending_streams, stream); + ctf_fs_file_destroy(file); + file = NULL; } goto end; + error: ret = -1; + end: if (dir) { g_dir_close(dir); dir = NULL; } + if (error) { g_error_free(error); } - return ret; -} - -enum bt_notification_iterator_status ctf_fs_iterator_init( - struct bt_private_notification_iterator *it, - struct bt_private_port *port) -{ - struct ctf_fs_iterator *ctf_it; - struct ctf_fs_component *ctf_fs; - struct bt_private_component *source = - bt_private_notification_iterator_get_private_component(it); - enum bt_notification_iterator_status ret = BT_NOTIFICATION_ITERATOR_STATUS_OK; - - assert(source && it); - - ctf_fs = bt_private_component_get_user_data(source); - if (!ctf_fs) { - ret = BT_NOTIFICATION_ITERATOR_STATUS_INVAL; - goto end; - } - - ctf_it = g_new0(struct ctf_fs_iterator, 1); - if (!ctf_it) { - ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; - goto end; - } - ctf_it->stream_ht = g_hash_table_new_full(g_direct_hash, - g_direct_equal, bt_put, stream_destroy); - if (!ctf_it->stream_ht) { - goto error; - } - ctf_it->pending_streams = g_ptr_array_new_with_free_func( - stream_destroy); - if (!ctf_it->pending_streams) { - goto error; - } - ctf_it->pending_notifications = bt_notification_heap_create( - compare_notifications, NULL); - if (!ctf_it->pending_notifications) { - goto error; - } - - ret = open_trace_streams(ctf_fs, ctf_it); - if (ret) { - goto error; - } - - ret = bt_private_notification_iterator_set_user_data(it, ctf_it); - if (ret) { - goto error; - } - - goto end; - -error: - (void) bt_private_notification_iterator_set_user_data(it, NULL); - ctf_fs_iterator_destroy_data(ctf_it); - -end: - bt_put(source); + ctf_fs_file_destroy(file); return ret; } static -void ctf_fs_destroy_data(struct ctf_fs_component *ctf_fs) -{ - if (!ctf_fs) { - return; - } - if (ctf_fs->trace_path) { - g_string_free(ctf_fs->trace_path, TRUE); - } - if (ctf_fs->metadata) { - ctf_fs_metadata_fini(ctf_fs->metadata); - g_free(ctf_fs->metadata); - } - g_free(ctf_fs); -} - -void ctf_fs_finalize(struct bt_private_component *component) -{ - void *data = bt_private_component_get_user_data(component); - - ctf_fs_destroy_data(data); -} - -static -struct ctf_fs_component *ctf_fs_create(struct bt_value *params) +struct ctf_fs_component *ctf_fs_create(struct bt_private_component *priv_comp, + struct bt_value *params) { struct ctf_fs_component *ctf_fs; struct bt_value *value = NULL; const char *path; - enum bt_value_status ret; + int ret; ctf_fs = g_new0(struct ctf_fs_component, 1); if (!ctf_fs) { goto end; } + /* + * We don't need to get a new reference here because as long as + * our private ctf_fs_component object exists, the containing + * private component should also exist. + */ + ctf_fs->priv_comp = priv_comp; + /* FIXME: should probably look for a source URI */ value = bt_value_map_get(params, "path"); if (!value || bt_value_is_null(value) || !bt_value_is_string(value)) { @@ -680,7 +353,12 @@ struct ctf_fs_component *ctf_fs_create(struct bt_value *params) } ret = bt_value_string_get(value, &path); - if (ret != BT_VALUE_STATUS_OK) { + if (ret) { + goto error; + } + + ctf_fs->port_data = g_ptr_array_new_with_free_func(port_data_destroy); + if (!ctf_fs->port_data) { goto error; } @@ -696,7 +374,17 @@ struct ctf_fs_component *ctf_fs_create(struct bt_value *params) if (!ctf_fs->metadata) { goto error; } - ctf_fs_metadata_set_trace(ctf_fs); + + ret = ctf_fs_metadata_set_trace(ctf_fs); + if (ret) { + goto error; + } + + ret = create_ports(ctf_fs); + if (ret) { + goto error; + } + goto end; error: @@ -708,28 +396,28 @@ end: } BT_HIDDEN -enum bt_component_status ctf_fs_init(struct bt_private_component *source, +enum bt_component_status ctf_fs_init(struct bt_private_component *priv_comp, struct bt_value *params, UNUSED_VAR void *init_method_data) { struct ctf_fs_component *ctf_fs; enum bt_component_status ret = BT_COMPONENT_STATUS_OK; - assert(source); + assert(priv_comp); ctf_fs_debug = g_strcmp0(getenv("CTF_FS_DEBUG"), "1") == 0; - ctf_fs = ctf_fs_create(params); + ctf_fs = ctf_fs_create(priv_comp, params); if (!ctf_fs) { ret = BT_COMPONENT_STATUS_NOMEM; goto end; } - ret = bt_private_component_set_user_data(source, ctf_fs); + ret = bt_private_component_set_user_data(priv_comp, ctf_fs); if (ret != BT_COMPONENT_STATUS_OK) { goto error; } end: return ret; error: - (void) bt_private_component_set_user_data(source, NULL); + (void) bt_private_component_set_user_data(priv_comp, NULL); ctf_fs_destroy_data(ctf_fs); return ret; } diff --git a/plugins/ctf/fs/fs.h b/plugins/ctf/fs/fs.h index 43977d28..0a34856a 100644 --- a/plugins/ctf/fs/fs.h +++ b/plugins/ctf/fs/fs.h @@ -38,8 +38,6 @@ BT_HIDDEN extern bool ctf_fs_debug; -struct bt_notification_heap; - struct ctf_fs_file { struct ctf_fs_component *ctf_fs; GString *path; @@ -58,7 +56,6 @@ struct ctf_fs_metadata { struct ctf_fs_stream { struct ctf_fs_file *file; struct bt_ctf_stream *stream; - /* FIXME There should be many and ctf_fs_stream should not own them. */ struct bt_ctf_notif_iter *notif_iter; /* A stream is assumed to be indexed. */ struct index index; @@ -79,29 +76,23 @@ struct ctf_fs_stream { bool end_reached; }; -struct ctf_fs_iterator { - struct bt_notification_heap *pending_notifications; - /* - * struct ctf_fs_data_stream* which have not yet been associated to a - * bt_ctf_stream. The association is performed on the first packet - * read by the stream (since, at that point, we have read a packet - * header). - */ - GPtrArray *pending_streams; - /* bt_ctf_stream -> ctf_fs_stream */ - GHashTable *stream_ht; +struct ctf_fs_component_options { }; -struct ctf_fs_component_options { - bool opt_dummy : 1; +struct ctf_fs_port_data { + GString *path; }; struct ctf_fs_component { + struct bt_private_component *priv_comp; GString *trace_path; FILE *error_fp; size_t page_size; struct ctf_fs_component_options options; struct ctf_fs_metadata *metadata; + + /* Array of struct ctf_fs_port_data *, owned by this */ + GPtrArray *port_data; }; BT_HIDDEN diff --git a/plugins/ctf/fs/metadata.c b/plugins/ctf/fs/metadata.c index e531056c..063ebe56 100644 --- a/plugins/ctf/fs/metadata.c +++ b/plugins/ctf/fs/metadata.c @@ -313,7 +313,7 @@ end: return ret; } -void ctf_fs_metadata_set_trace(struct ctf_fs_component *ctf_fs) +int ctf_fs_metadata_set_trace(struct ctf_fs_component *ctf_fs) { int ret = 0; struct ctf_fs_file *file = get_file(ctf_fs, ctf_fs->trace_path->str); @@ -412,6 +412,8 @@ error: ctf_fs->metadata->text = NULL; } + ret = -1; + end: if (file) { ctf_fs_file_destroy(file); @@ -420,6 +422,8 @@ end: if (scanner) { ctf_scanner_free(scanner); } + + return ret; } int ctf_fs_metadata_init(struct ctf_fs_metadata *metadata) diff --git a/plugins/ctf/fs/metadata.h b/plugins/ctf/fs/metadata.h index a8ca942d..85a77268 100644 --- a/plugins/ctf/fs/metadata.h +++ b/plugins/ctf/fs/metadata.h @@ -38,7 +38,7 @@ BT_HIDDEN void ctf_fs_metadata_fini(struct ctf_fs_metadata *metadata); BT_HIDDEN -void ctf_fs_metadata_set_trace(struct ctf_fs_component *ctf_fs); +int ctf_fs_metadata_set_trace(struct ctf_fs_component *ctf_fs); BT_HIDDEN FILE *ctf_fs_metadata_open_file(const char *trace_path); -- 2.34.1