X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Fctf%2Ffs-src%2Ffs.c;h=965f5963e6f8f1ed15a0619be9bc8d508400a9dc;hb=94cf822ec0adbb6d6d33d6475c5b58e1dd7e2b79;hp=57468fbf1a7af44d319d6f8ae545ed022e52e449;hpb=87187cbfcf0ff2101a05e58d9043ffa86108f431;p=babeltrace.git diff --git a/plugins/ctf/fs-src/fs.c b/plugins/ctf/fs-src/fs.c index 57468fbf..965f5963 100644 --- a/plugins/ctf/fs-src/fs.c +++ b/plugins/ctf/fs-src/fs.c @@ -27,6 +27,8 @@ #include #include +#include +#include #include #include #include @@ -37,11 +39,12 @@ #include #include #include +#include #include #include #include "fs.h" #include "metadata.h" -#include "data-stream.h" +#include "data-stream-file.h" #include "file.h" #include "../common/metadata/decoder.h" @@ -54,31 +57,109 @@ BT_HIDDEN bool ctf_fs_debug; +static +int notif_iter_data_set_current_ds_file(struct ctf_fs_notif_iter_data *notif_iter_data) +{ + struct ctf_fs_ds_file_info *ds_file_info; + int ret = 0; + + assert(notif_iter_data->ds_file_info_index < + notif_iter_data->ds_file_group->ds_file_infos->len); + ds_file_info = g_ptr_array_index( + notif_iter_data->ds_file_group->ds_file_infos, + notif_iter_data->ds_file_info_index); + + ctf_fs_ds_file_destroy(notif_iter_data->ds_file); + notif_iter_data->ds_file = ctf_fs_ds_file_create( + notif_iter_data->ds_file_group->ctf_fs_trace, + notif_iter_data->ds_file_group->stream, + ds_file_info->path->str, true); + if (!notif_iter_data->ds_file) { + ret = -1; + } + + return ret; +} + +static +void ctf_fs_notif_iter_data_destroy( + struct ctf_fs_notif_iter_data *notif_iter_data) +{ + if (!notif_iter_data) { + return; + } + + ctf_fs_ds_file_destroy(notif_iter_data->ds_file); + g_free(notif_iter_data); +} + struct bt_notification_iterator_next_return ctf_fs_iterator_next( struct bt_private_notification_iterator *iterator) { - struct ctf_fs_stream *fs_stream = + struct bt_notification_iterator_next_return next_ret; + struct ctf_fs_notif_iter_data *notif_iter_data = bt_private_notification_iterator_get_user_data(iterator); + int ret; + + assert(notif_iter_data->ds_file); + next_ret = ctf_fs_ds_file_next(notif_iter_data->ds_file); + if (next_ret.status == BT_NOTIFICATION_ITERATOR_STATUS_END) { + assert(!next_ret.notification); + notif_iter_data->ds_file_info_index++; + + if (notif_iter_data->ds_file_info_index == + notif_iter_data->ds_file_group->ds_file_infos->len) { + /* + * No more stream files to read: we reached the + * real end. + */ + goto end; + } + + /* + * Open and start reading the next stream file within + * our stream file group. + */ + ret = notif_iter_data_set_current_ds_file(notif_iter_data); + if (ret) { + next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + goto end; + } + + next_ret = ctf_fs_ds_file_next(notif_iter_data->ds_file); + + /* + * We should not get BT_NOTIFICATION_ITERATOR_STATUS_END + * with a brand new stream file because empty stream + * files are not even part of stream file groups, which + * means we're sure to get at least one pair of "packet + * begin" and "packet end" notifications in the case of + * a single, empty packet. + */ + assert(next_ret.status != BT_NOTIFICATION_ITERATOR_STATUS_END); + } - return ctf_fs_stream_next(fs_stream); +end: + return next_ret; } void ctf_fs_iterator_finalize(struct bt_private_notification_iterator *it) { - void *ctf_fs_stream = + void *notif_iter_data = bt_private_notification_iterator_get_user_data(it); - ctf_fs_stream_destroy(ctf_fs_stream); + ctf_fs_notif_iter_data_destroy(notif_iter_data); } enum bt_notification_iterator_status ctf_fs_iterator_init( struct bt_private_notification_iterator *it, struct bt_private_port *port) { - struct ctf_fs_stream *ctf_fs_stream = NULL; struct ctf_fs_port_data *port_data; + struct ctf_fs_notif_iter_data *notif_iter_data = NULL; enum bt_notification_iterator_status ret = BT_NOTIFICATION_ITERATOR_STATUS_OK; + int iret; port_data = bt_private_port_get_user_data(port); if (!port_data) { @@ -86,18 +167,25 @@ enum bt_notification_iterator_status ctf_fs_iterator_init( goto error; } - ctf_fs_stream = ctf_fs_stream_create(port_data->ctf_fs_trace, - port_data->path->str); - if (!ctf_fs_stream) { + notif_iter_data = g_new0(struct ctf_fs_notif_iter_data, 1); + if (!notif_iter_data) { + ret = BT_NOTIFICATION_ITERATOR_STATUS_NOMEM; + goto error; + } + + notif_iter_data->ds_file_group = port_data->ds_file_group; + iret = notif_iter_data_set_current_ds_file(notif_iter_data); + if (iret) { + ret = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; goto error; } - ret = bt_private_notification_iterator_set_user_data(it, ctf_fs_stream); + ret = bt_private_notification_iterator_set_user_data(it, notif_iter_data); if (ret) { goto error; } - ctf_fs_stream = NULL; + notif_iter_data = NULL; goto end; error: @@ -108,7 +196,7 @@ error: } end: - ctf_fs_stream_destroy(ctf_fs_stream); + ctf_fs_notif_iter_data_destroy(notif_iter_data); return ret; } @@ -139,6 +227,10 @@ void ctf_fs_trace_destroy(void *data) return; } + if (ctf_fs_trace->ds_file_groups) { + g_ptr_array_free(ctf_fs_trace->ds_file_groups, TRUE); + } + if (ctf_fs_trace->path) { g_string_free(ctf_fs_trace->path, TRUE); } @@ -171,30 +263,34 @@ void port_data_destroy(void *data) { return; } - if (port_data->path) { - g_string_free(port_data->path, TRUE); - } - g_free(port_data); } static int create_one_port_for_trace(struct ctf_fs_trace *ctf_fs_trace, - const char *stream_path) + struct ctf_fs_ds_file_group *ds_file_group) { int ret = 0; struct bt_private_port *port = NULL; struct ctf_fs_port_data *port_data = NULL; GString *port_name = NULL; struct ctf_fs_component *ctf_fs = ctf_fs_trace->ctf_fs; + struct ctf_fs_ds_file_info *ds_file_info = + g_ptr_array_index(ds_file_group->ds_file_infos, 0); port_name = g_string_new(NULL); if (!port_name) { goto error; } - /* Assign the name for the new output port */ - g_string_printf(port_name, "%s", stream_path); + /* + * Assign the name for the new output port. If there's more than + * one stream file in the stream file group, the first + * (earliest) stream file's path is used. + */ + assert(ds_file_group->ds_file_infos->len > 0); + ds_file_info = g_ptr_array_index(ds_file_group->ds_file_infos, 0); + g_string_assign(port_name, ds_file_info->path->str); PDBG("Creating one port named `%s`\n", port_name->str); /* Create output port for this file */ @@ -203,12 +299,7 @@ int create_one_port_for_trace(struct ctf_fs_trace *ctf_fs_trace, goto error; } - port_data->ctf_fs_trace = ctf_fs_trace; - port_data->path = g_string_new(stream_path); - if (!port_data->path) { - goto error; - } - + port_data->ds_file_group = ds_file_group; port = bt_private_component_source_add_output_private_port( ctf_fs->priv_comp, port_name->str, port_data); if (!port) { @@ -234,15 +325,434 @@ end: static int create_ports_for_trace(struct ctf_fs_trace *ctf_fs_trace) +{ + int ret = 0; + struct ctf_fs_component *ctf_fs = ctf_fs_trace->ctf_fs; + size_t i; + + /* Create one output port for each stream file group */ + for (i = 0; i < ctf_fs_trace->ds_file_groups->len; i++) { + struct ctf_fs_ds_file_group *ds_file_group = + g_ptr_array_index(ctf_fs_trace->ds_file_groups, i); + + ret = create_one_port_for_trace(ctf_fs_trace, ds_file_group); + if (ret) { + PERR("Cannot create output port.\n"); + goto end; + } + } + +end: + return ret; +} + +uint64_t get_packet_header_stream_instance_id(struct ctf_fs_trace *ctf_fs_trace, + struct bt_ctf_field *packet_header_field) +{ + struct bt_ctf_field *stream_instance_id_field = NULL; + uint64_t stream_instance_id = -1ULL; + int ret; + + if (!packet_header_field) { + goto end; + } + + stream_instance_id_field = bt_ctf_field_structure_get_field_by_name( + packet_header_field, "stream_instance_id"); + if (!stream_instance_id_field) { + goto end; + } + + ret = bt_ctf_field_unsigned_integer_get_value(stream_instance_id_field, + &stream_instance_id); + if (ret) { + stream_instance_id = -1ULL; + goto end; + } + +end: + bt_put(stream_instance_id_field); + return stream_instance_id; +} + +struct bt_ctf_stream_class *stream_class_from_packet_header( + struct ctf_fs_trace *ctf_fs_trace, + struct bt_ctf_field *packet_header_field) +{ + struct bt_ctf_field *stream_id_field = NULL; + struct bt_ctf_stream_class *stream_class = NULL; + uint64_t stream_id = -1ULL; + int ret; + + if (!packet_header_field) { + goto single_stream_class; + } + + stream_id_field = bt_ctf_field_structure_get_field_by_name( + packet_header_field, "stream_id"); + if (!stream_id_field) { + goto end; + } + + ret = bt_ctf_field_unsigned_integer_get_value(stream_id_field, + &stream_id); + if (ret) { + stream_id = -1ULL; + } + + if (stream_id == -1ULL) { +single_stream_class: + /* Single stream class */ + if (bt_ctf_trace_get_stream_class_count( + ctf_fs_trace->metadata->trace) == 0) { + goto end; + } + + stream_class = bt_ctf_trace_get_stream_class_by_index( + ctf_fs_trace->metadata->trace, 0); + } else { + stream_class = bt_ctf_trace_get_stream_class_by_id( + ctf_fs_trace->metadata->trace, stream_id); + } + +end: + bt_put(stream_id_field); + return stream_class; +} + +uint64_t get_packet_context_timestamp_begin_ns( + struct ctf_fs_trace *ctf_fs_trace, + struct bt_ctf_field *packet_context_field) +{ + int ret; + struct bt_ctf_field *timestamp_begin_field = NULL; + struct bt_ctf_field_type *timestamp_begin_ft = NULL; + uint64_t timestamp_begin_raw_value = -1ULL; + uint64_t timestamp_begin_ns = -1ULL; + int64_t timestamp_begin_ns_signed; + struct bt_ctf_clock_class *timestamp_begin_clock_class = NULL; + struct bt_ctf_clock_value *clock_value = NULL; + + if (!packet_context_field) { + goto end; + } + + timestamp_begin_field = bt_ctf_field_structure_get_field_by_name( + packet_context_field, "timestamp_begin"); + if (!timestamp_begin_field) { + goto end; + } + + timestamp_begin_ft = bt_ctf_field_get_type(timestamp_begin_field); + assert(timestamp_begin_ft); + timestamp_begin_clock_class = + bt_ctf_field_type_integer_get_mapped_clock_class(timestamp_begin_ft); + if (!timestamp_begin_clock_class) { + goto end; + } + + ret = bt_ctf_field_unsigned_integer_get_value(timestamp_begin_field, + ×tamp_begin_raw_value); + if (ret) { + goto end; + } + + clock_value = bt_ctf_clock_value_create(timestamp_begin_clock_class, + timestamp_begin_raw_value); + if (!clock_value) { + goto end; + } + + ret = bt_ctf_clock_value_get_value_ns_from_epoch(clock_value, + ×tamp_begin_ns_signed); + if (ret) { + goto end; + } + + timestamp_begin_ns = (uint64_t) timestamp_begin_ns_signed; + +end: + bt_put(timestamp_begin_field); + bt_put(timestamp_begin_ft); + bt_put(timestamp_begin_clock_class); + bt_put(clock_value); + return timestamp_begin_ns; +} + +static +void ctf_fs_ds_file_info_destroy(struct ctf_fs_ds_file_info *ds_file_info) +{ + if (!ds_file_info) { + return; + } + + if (ds_file_info->path) { + g_string_free(ds_file_info->path, TRUE); + } + + g_free(ds_file_info); +} + +static +struct ctf_fs_ds_file_info *ctf_fs_ds_file_info_create(const char *path, + uint64_t begin_ns) +{ + struct ctf_fs_ds_file_info *ds_file_info; + + ds_file_info = g_new0(struct ctf_fs_ds_file_info, 1); + if (!ds_file_info) { + goto end; + } + + ds_file_info->path = g_string_new(path); + if (!ds_file_info->path) { + ctf_fs_ds_file_info_destroy(ds_file_info); + ds_file_info = NULL; + goto end; + } + + ds_file_info->begin_ns = begin_ns; + +end: + return ds_file_info; +} + +static +void ctf_fs_ds_file_group_destroy(struct ctf_fs_ds_file_group *ds_file_group) +{ + if (!ds_file_group) { + return; + } + + if (ds_file_group->ds_file_infos) { + g_ptr_array_free(ds_file_group->ds_file_infos, TRUE); + } + + bt_put(ds_file_group->stream); + g_free(ds_file_group); +} + +static +struct ctf_fs_ds_file_group *ctf_fs_ds_file_group_create( + struct ctf_fs_trace *ctf_fs_trace, + struct bt_ctf_stream_class *stream_class, + uint64_t stream_instance_id) +{ + struct ctf_fs_ds_file_group *ds_file_group; + + assert(stream_class); + ds_file_group = g_new0(struct ctf_fs_ds_file_group, 1); + if (!ds_file_group) { + goto error; + } + + ds_file_group->ds_file_infos = g_ptr_array_new_with_free_func( + (GDestroyNotify) ctf_fs_ds_file_info_destroy); + if (!ds_file_group->ds_file_infos) { + goto error; + } + + if (stream_instance_id == -1ULL) { + ds_file_group->stream = bt_ctf_stream_create( + stream_class, NULL); + } else { + ds_file_group->stream = bt_ctf_stream_create_with_id( + stream_class, NULL, stream_instance_id); + } + + if (!ds_file_group->stream) { + goto error; + } + + ds_file_group->ctf_fs_trace = ctf_fs_trace; + + goto end; + +error: + ctf_fs_ds_file_group_destroy(ds_file_group); + ds_file_group = NULL; + +end: + return ds_file_group; +} + +static +int ctf_fs_ds_file_group_add_ds_file_info( + struct ctf_fs_ds_file_group *ds_file_group, + const char *path, uint64_t begin_ns) +{ + struct ctf_fs_ds_file_info *ds_file_info; + gint i = 0; + int ret = 0; + + ds_file_info = ctf_fs_ds_file_info_create(path, begin_ns); + if (!ds_file_info) { + goto error; + } + + /* Find a spot to insert this one */ + for (i = 0; i < ds_file_group->ds_file_infos->len; i++) { + struct ctf_fs_ds_file_info *other_ds_file_info = + g_ptr_array_index(ds_file_group->ds_file_infos, i); + + if (begin_ns < other_ds_file_info->begin_ns) { + break; + } + } + + if (i == ds_file_group->ds_file_infos->len) { + /* Append instead */ + i = -1; + } + + g_ptr_array_insert(ds_file_group->ds_file_infos, i, ds_file_info); + ds_file_info = NULL; + goto end; + +error: + ctf_fs_ds_file_info_destroy(ds_file_info); + ret = -1; + +end: + return ret; +} + +static +int add_ds_file_to_ds_file_group(struct ctf_fs_trace *ctf_fs_trace, + const char *path) +{ + struct bt_ctf_field *packet_header_field = NULL; + struct bt_ctf_field *packet_context_field = NULL; + struct bt_ctf_stream_class *stream_class = NULL; + struct ctf_fs_component *ctf_fs = ctf_fs_trace->ctf_fs; + uint64_t stream_instance_id = -1ULL; + uint64_t begin_ns = -1ULL; + struct ctf_fs_ds_file_group *ds_file_group = NULL; + bool add_group = false; + int ret; + size_t i; + + ret = ctf_fs_ds_file_get_packet_header_context_fields( + ctf_fs_trace, path, &packet_header_field, + &packet_context_field); + if (ret) { + PERR("Cannot get stream file's first packet's header and context fields (`%s`).\n", + path); + goto error; + } + + stream_instance_id = get_packet_header_stream_instance_id(ctf_fs_trace, + packet_header_field); + begin_ns = get_packet_context_timestamp_begin_ns(ctf_fs_trace, + packet_context_field); + stream_class = stream_class_from_packet_header(ctf_fs_trace, + packet_header_field); + if (!stream_class) { + goto error; + } + + if (begin_ns == -1ULL) { + /* + * No beggining timestamp to sort the stream files + * within a stream file group, so consider that this + * file must be the only one within its group. + */ + stream_instance_id = -1ULL; + } + + if (stream_instance_id == -1ULL) { + /* + * No stream instance ID or no beginning timestamp: + * create a unique stream file group for this stream + * file because, even if there's a stream instance ID, + * there's no timestamp to order the file within its + * group. + */ + + ds_file_group = ctf_fs_ds_file_group_create(ctf_fs_trace, + stream_class, stream_instance_id); + if (!ds_file_group) { + goto error; + } + + ret = ctf_fs_ds_file_group_add_ds_file_info(ds_file_group, + path, begin_ns); + if (ret) { + goto error; + } + + add_group = true; + goto end; + } + + assert(stream_instance_id != -1ULL); + assert(begin_ns != -1ULL); + + /* Find an existing stream file group with this ID */ + for (i = 0; i < ctf_fs_trace->ds_file_groups->len; i++) { + int64_t id; + struct bt_ctf_stream_class *cand_stream_class; + + ds_file_group = g_ptr_array_index( + ctf_fs_trace->ds_file_groups, i); + id = bt_ctf_stream_get_id(ds_file_group->stream); + cand_stream_class = bt_ctf_stream_get_class( + ds_file_group->stream); + + assert(cand_stream_class); + bt_put(cand_stream_class); + + if (cand_stream_class == stream_class && + (uint64_t) id == stream_instance_id) { + break; + } + + ds_file_group = NULL; + } + + if (!ds_file_group) { + ds_file_group = ctf_fs_ds_file_group_create(ctf_fs_trace, + stream_class, stream_instance_id); + if (!ds_file_group) { + goto error; + } + + add_group = true; + } + + ret = ctf_fs_ds_file_group_add_ds_file_info(ds_file_group, + path, begin_ns); + if (ret) { + goto error; + } + + goto end; + +error: + ctf_fs_ds_file_group_destroy(ds_file_group); + ret = -1; + +end: + if (add_group && ds_file_group) { + g_ptr_array_add(ctf_fs_trace->ds_file_groups, ds_file_group); + } + + bt_put(packet_header_field); + bt_put(packet_context_field); + bt_put(stream_class); + return ret; +} + +static +int create_ds_file_groups(struct ctf_fs_trace *ctf_fs_trace) { int ret = 0; const char *basename; GError *error = NULL; GDir *dir = NULL; - struct ctf_fs_file *file = NULL; struct ctf_fs_component *ctf_fs = ctf_fs_trace->ctf_fs; - /* Create one output port for each stream file */ + /* Check each file in the path directory, except specific ones */ dir = g_dir_open(ctf_fs_trace->path->str, 0, &error); if (!dir) { PERR("Cannot open directory `%s`: %s (code %d)\n", @@ -252,6 +762,8 @@ int create_ports_for_trace(struct ctf_fs_trace *ctf_fs_trace) } while ((basename = g_dir_read_name(dir))) { + struct ctf_fs_file *file; + if (!strcmp(basename, CTF_FS_METADATA_FILENAME)) { /* Ignore the metadata stream. */ PDBG("Ignoring metadata file `%s/%s`\n", @@ -294,19 +806,19 @@ int create_ports_for_trace(struct ctf_fs_trace *ctf_fs_trace) /* Skip empty stream. */ PDBG("Ignoring empty file `%s`\n", file->path->str); ctf_fs_file_destroy(file); - file = NULL; continue; } - ret = create_one_port_for_trace(ctf_fs_trace, file->path->str); + ret = add_ds_file_to_ds_file_group(ctf_fs_trace, + file->path->str); if (ret) { - PERR("Cannot create output port for file `%s`\n", + PDBG("Cannot add stream file `%s` to stream file group\n", file->path->str); + ctf_fs_file_destroy(file); goto error; } ctf_fs_file_destroy(file); - file = NULL; } goto end; @@ -324,7 +836,6 @@ end: g_error_free(error); } - ctf_fs_file_destroy(file); return ret; } @@ -393,11 +904,22 @@ struct ctf_fs_trace *ctf_fs_trace_create(struct ctf_fs_component *ctf_fs, goto error; } + ctf_fs_trace->ds_file_groups = g_ptr_array_new_with_free_func( + (GDestroyNotify) ctf_fs_ds_file_group_destroy); + if (!ctf_fs_trace->ds_file_groups) { + goto error; + } + ret = ctf_fs_metadata_set_trace(ctf_fs_trace); if (ret) { goto error; } + ret = create_ds_file_groups(ctf_fs_trace); + if (ret) { + goto error; + } + ret = create_cc_prio_map(ctf_fs_trace); if (ret) { goto error;