#include <babeltrace/common-internal.h>
#include <babeltrace/babeltrace.h>
+#include <babeltrace/compat/uuid-internal.h>
#include <plugins-common.h>
#include <glib.h>
#include <babeltrace/assert-internal.h>
g_free(msg_iter_data);
}
+static
+void set_msg_iter_emits_stream_beginning_end_messages(
+ struct ctf_fs_msg_iter_data *msg_iter_data)
+{
+ bt_msg_iter_set_emit_stream_beginning_message(
+ msg_iter_data->ds_file->msg_iter,
+ msg_iter_data->ds_file_info_index == 0);
+ bt_msg_iter_set_emit_stream_end_message(
+ msg_iter_data->ds_file->msg_iter,
+ msg_iter_data->ds_file_info_index ==
+ msg_iter_data->ds_file_group->ds_file_infos->len - 1);
+}
+
static
bt_self_message_iterator_status ctf_fs_iterator_next_one(
struct ctf_fs_msg_iter_data *msg_iter_data,
- const bt_message **msg)
+ const bt_message **out_msg)
{
bt_self_message_iterator_status status;
- bt_message *priv_msg;
- int ret;
BT_ASSERT(msg_iter_data->ds_file);
- status = ctf_fs_ds_file_next(msg_iter_data->ds_file, &priv_msg);
- *msg = priv_msg;
- if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK &&
- bt_message_get_type(*msg) ==
- BT_MESSAGE_TYPE_STREAM_BEGINNING) {
- if (msg_iter_data->skip_stream_begin_msgs) {
- /*
- * We already emitted a
- * BT_MESSAGE_TYPE_STREAM_BEGINNING
- * message: skip this one, get a new one.
- */
- BT_MESSAGE_PUT_REF_AND_RESET(*msg);
- status = ctf_fs_ds_file_next(msg_iter_data->ds_file,
- &priv_msg);
- *msg = priv_msg;
- BT_ASSERT(status != BT_SELF_MESSAGE_ITERATOR_STATUS_END);
- goto end;
- } else {
- /*
- * First BT_MESSAGE_TYPE_STREAM_BEGINNING
- * message: skip all following.
- */
- msg_iter_data->skip_stream_begin_msgs = true;
- goto end;
- }
- }
+ while (true) {
+ bt_message *msg;
- if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK &&
- bt_message_get_type(*msg) ==
- BT_MESSAGE_TYPE_STREAM_END) {
- msg_iter_data->ds_file_info_index++;
+ status = ctf_fs_ds_file_next(msg_iter_data->ds_file, &msg);
+ switch (status) {
+ case BT_SELF_MESSAGE_ITERATOR_STATUS_OK:
+ *out_msg = msg;
+ msg = NULL;
+ goto end;
+ case BT_SELF_MESSAGE_ITERATOR_STATUS_END:
+ {
+ int ret;
+
+ if (msg_iter_data->ds_file_info_index ==
+ msg_iter_data->ds_file_group->ds_file_infos->len - 1) {
+ /* End of all group's stream files */
+ goto end;
+ }
+
+ msg_iter_data->ds_file_info_index++;
+ bt_msg_iter_reset_for_next_stream_file(
+ msg_iter_data->msg_iter);
+ set_msg_iter_emits_stream_beginning_end_messages(
+ msg_iter_data);
- if (msg_iter_data->ds_file_info_index ==
- msg_iter_data->ds_file_group->ds_file_infos->len) {
/*
- * No more stream files to read: we reached the
- * real end. Emit this
- * BT_MESSAGE_TYPE_STREAM_END message.
- * The next time ctf_fs_iterator_next() is
- * called for this message iterator,
- * ctf_fs_ds_file_next() will return
- * BT_SELF_MESSAGE_ITERATOR_STATUS_END().
+ * Open and start reading the next stream file
+ * within our stream file group.
*/
- goto end;
- }
+ ret = msg_iter_data_set_current_ds_file(msg_iter_data);
+ if (ret) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
- BT_MESSAGE_PUT_REF_AND_RESET(*msg);
- bt_msg_iter_reset(msg_iter_data->msg_iter);
-
- /*
- * Open and start reading the next stream file within
- * our stream file group.
- */
- ret = msg_iter_data_set_current_ds_file(msg_iter_data);
- if (ret) {
- status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
- goto end;
+ /* Continue the loop to get the next message */
+ break;
}
-
- status = ctf_fs_ds_file_next(msg_iter_data->ds_file, &priv_msg);
- *msg = priv_msg;
-
- /*
- * If we get a message, we expect to get a
- * BT_MESSAGE_TYPE_STREAM_BEGINNING message
- * because the iterator's state machine emits one before
- * even requesting the first block of data from the
- * medium. Skip this message because we're not
- * really starting a new stream here, and try getting a
- * new message (which, if it works, is a
- * BT_MESSAGE_TYPE_PACKET_BEGINNING one). We're sure to
- * get at least one pair of
- * BT_MESSAGE_TYPE_PACKET_BEGINNING and
- * BT_MESSAGE_TYPE_PACKET_END messages in the
- * case of a single, empty packet. We know there's at
- * least one packet because the stream file group does
- * not contain empty stream files.
- */
- BT_ASSERT(msg_iter_data->skip_stream_begin_msgs);
-
- if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
- BT_ASSERT(bt_message_get_type(*msg) ==
- BT_MESSAGE_TYPE_STREAM_BEGINNING);
- BT_MESSAGE_PUT_REF_AND_RESET(*msg);
- status = ctf_fs_ds_file_next(msg_iter_data->ds_file,
- &priv_msg);
- *msg = priv_msg;
- BT_ASSERT(status != BT_SELF_MESSAGE_ITERATOR_STATUS_END);
+ default:
+ goto end;
}
}
return status;
}
+static
+int ctf_fs_iterator_reset(struct ctf_fs_msg_iter_data *msg_iter_data)
+{
+ int ret;
+
+ msg_iter_data->ds_file_info_index = 0;
+ ret = msg_iter_data_set_current_ds_file(msg_iter_data);
+ if (ret) {
+ goto end;
+ }
+
+ bt_msg_iter_reset(msg_iter_data->msg_iter);
+ set_msg_iter_emits_stream_beginning_end_messages(msg_iter_data);
+
+end:
+ return ret;
+}
+
+BT_HIDDEN
+bt_self_message_iterator_status ctf_fs_iterator_seek_beginning(
+ bt_self_message_iterator *it)
+{
+ struct ctf_fs_msg_iter_data *msg_iter_data =
+ bt_self_message_iterator_get_data(it);
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+
+ BT_ASSERT(msg_iter_data);
+ if (ctf_fs_iterator_reset(msg_iter_data)) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ }
+
+ return status;
+}
+
+BT_HIDDEN
void ctf_fs_iterator_finalize(bt_self_message_iterator *it)
{
ctf_fs_msg_iter_data_destroy(
bt_self_message_iterator_get_data(it));
}
+BT_HIDDEN
bt_self_message_iterator_status ctf_fs_iterator_init(
bt_self_message_iterator *self_msg_iter,
bt_self_component_source *self_comp,
struct ctf_fs_msg_iter_data *msg_iter_data = NULL;
bt_self_message_iterator_status ret =
BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
- int iret;
port_data = bt_self_component_port_get_data(
bt_self_component_port_output_as_self_component_port(
}
msg_iter_data->ds_file_group = port_data->ds_file_group;
- iret = msg_iter_data_set_current_ds_file(msg_iter_data);
- if (iret) {
+ if (ctf_fs_iterator_reset(msg_iter_data)) {
ret = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
goto error;
}
return ret;
}
-static
+BT_HIDDEN
void ctf_fs_destroy(struct ctf_fs_component *ctf_fs)
{
if (!ctf_fs) {
g_free(ctf_fs);
}
-BT_HIDDEN
+static
+void port_data_destroy(struct ctf_fs_port_data *port_data)
+{
+ if (!port_data) {
+ return;
+ }
+
+ g_free(port_data);
+}
+
+static
+void port_data_destroy_notifier(void *data) {
+ port_data_destroy(data);
+}
+
+static
void ctf_fs_trace_destroy(struct ctf_fs_trace *ctf_fs_trace)
{
if (!ctf_fs_trace) {
}
static
-void ctf_fs_trace_destroy_msgier(void *data)
+void ctf_fs_trace_destroy_notifier(void *data)
{
struct ctf_fs_trace *trace = data;
ctf_fs_trace_destroy(trace);
}
-void ctf_fs_finalize(bt_self_component_source *component)
+struct ctf_fs_component *ctf_fs_component_create(void)
{
- ctf_fs_destroy(bt_self_component_get_data(
- bt_self_component_source_as_self_component(component)));
-}
+ struct ctf_fs_component *ctf_fs;
-static
-void port_data_destroy(void *data) {
- struct ctf_fs_port_data *port_data = data;
+ ctf_fs = g_new0(struct ctf_fs_component, 1);
+ if (!ctf_fs) {
+ goto error;
+ }
- if (!port_data) {
- return;
+ ctf_fs->port_data =
+ g_ptr_array_new_with_free_func(port_data_destroy_notifier);
+ if (!ctf_fs->port_data) {
+ goto error;
}
- g_free(port_data);
+ ctf_fs->traces =
+ g_ptr_array_new_with_free_func(ctf_fs_trace_destroy_notifier);
+ if (!ctf_fs->traces) {
+ goto error;
+ }
+
+ goto end;
+
+error:
+ if (ctf_fs) {
+ ctf_fs_destroy(ctf_fs);
+ }
+
+end:
+ return ctf_fs;
+}
+
+void ctf_fs_finalize(bt_self_component_source *component)
+{
+ ctf_fs_destroy(bt_self_component_get_data(
+ bt_self_component_source_as_self_component(component)));
}
static
}
bt_stream_put_ref(ds_file_group->stream);
- bt_stream_class_put_ref(ds_file_group->stream_class);
g_free(ds_file_group);
}
static
struct ctf_fs_ds_file_group *ctf_fs_ds_file_group_create(
struct ctf_fs_trace *ctf_fs_trace,
- bt_stream_class *stream_class,
+ struct ctf_stream_class *sc,
uint64_t stream_instance_id)
{
struct ctf_fs_ds_file_group *ds_file_group;
}
ds_file_group->stream_id = stream_instance_id;
- BT_ASSERT(stream_class);
- ds_file_group->stream_class = stream_class;
- bt_stream_class_get_ref(ds_file_group->stream_class);
+ BT_ASSERT(sc);
+ ds_file_group->sc = sc;
ds_file_group->ctf_fs_trace = ctf_fs_trace;
goto end;
(original_array_len - pos) * sizeof(gpointer));
}
- /* Insert the value and bump the array len */
+ /* Insert the value. */
array->pdata[pos] = element;
}
+/*
+ * Insert ds_file_info in ds_file_group's list of ds_file_infos at the right
+ * place to keep it sorted.
+ */
+
+static
+void ds_file_group_insert_ds_file_info_sorted(
+ struct ctf_fs_ds_file_group *ds_file_group,
+ struct ctf_fs_ds_file_info *ds_file_info)
+{
+ guint i;
+
+ /* Find the spot where to insert this ds_file_info. */
+ for (i = 0; i < ds_file_group->ds_file_infos->len; i++) {
+ struct ctf_fs_ds_file_info *other_ds_file_info =
+ g_ptr_array_index(ds_file_group->ds_file_infos, i);
+
+ if (ds_file_info->begin_ns < other_ds_file_info->begin_ns) {
+ break;
+ }
+ }
+
+ array_insert(ds_file_group->ds_file_infos, ds_file_info, i);
+}
+
+/*
+ * Create a new ds_file_info using the provided path, begin_ns and index, then
+ * add it to ds_file_group's list of ds_file_infos.
+ */
+
static
int ctf_fs_ds_file_group_add_ds_file_info(
struct ctf_fs_ds_file_group *ds_file_group,
struct ctf_fs_ds_index *index)
{
struct ctf_fs_ds_file_info *ds_file_info;
- gint i = 0;
int ret = 0;
/* Onwership of index is transferred. */
goto error;
}
- /* Find a spot to insert this one */
- for (i = 0; i < ds_file_group->ds_file_infos->len; i++) {
- struct ctf_fs_ds_file_info *other_ds_file_info =
- g_ptr_array_index(ds_file_group->ds_file_infos, i);
+ ds_file_group_insert_ds_file_info_sorted(ds_file_group, ds_file_info);
- if (begin_ns < other_ds_file_info->begin_ns) {
- break;
- }
- }
-
- array_insert(ds_file_group->ds_file_infos, ds_file_info, i);
ds_file_info = NULL;
goto end;
int add_ds_file_to_ds_file_group(struct ctf_fs_trace *ctf_fs_trace,
const char *path)
{
- bt_stream_class *stream_class = NULL;
int64_t stream_instance_id = -1;
int64_t begin_ns = -1;
struct ctf_fs_ds_file_group *ds_file_group = NULL;
sc = ctf_trace_class_borrow_stream_class_by_id(ds_file->metadata->tc,
props.stream_class_id);
BT_ASSERT(sc);
- stream_class = sc->ir_sc;
- BT_ASSERT(stream_class);
stream_instance_id = props.data_stream_id;
if (props.snapshots.beginning_clock != UINT64_C(-1)) {
* group.
*/
ds_file_group = ctf_fs_ds_file_group_create(ctf_fs_trace,
- stream_class, stream_instance_id);
+ sc, UINT64_C(-1));
if (!ds_file_group) {
goto error;
}
ds_file_group = g_ptr_array_index(
ctf_fs_trace->ds_file_groups, i);
- if (ds_file_group->stream_class == stream_class &&
+ if (ds_file_group->sc == sc &&
ds_file_group->stream_id ==
stream_instance_id) {
break;
if (!ds_file_group) {
ds_file_group = ctf_fs_ds_file_group_create(ctf_fs_trace,
- stream_class, stream_instance_id);
+ sc, stream_instance_id);
if (!ds_file_group) {
goto error;
}
const char *basename;
GError *error = NULL;
GDir *dir = NULL;
- size_t i;
/* Check each file in the path directory, except specific ones */
dir = g_dir_open(ctf_fs_trace->path->str, 0, &error);
ctf_fs_file_destroy(file);
}
- if (!ctf_fs_trace->trace) {
- goto end;
- }
-
- /*
- * At this point, DS file groupes are created, but their
- * associated stream objects do not exist yet. This is because
- * we need to name the created stream object with the data
- * stream file's path. We have everything we need here to do
- * this.
- */
- for (i = 0; i < ctf_fs_trace->ds_file_groups->len; i++) {
- struct ctf_fs_ds_file_group *ds_file_group =
- g_ptr_array_index(ctf_fs_trace->ds_file_groups, i);
- GString *name = get_stream_instance_unique_name(ds_file_group);
-
- if (!name) {
- goto error;
- }
-
- if (ds_file_group->stream_id == UINT64_C(-1)) {
- /* No stream ID: use 0 */
- ds_file_group->stream = bt_stream_create_with_id(
- ds_file_group->stream_class,
- ctf_fs_trace->trace,
- ctf_fs_trace->next_stream_id);
- ctf_fs_trace->next_stream_id++;
- } else {
- /* Specific stream ID */
- ds_file_group->stream = bt_stream_create_with_id(
- ds_file_group->stream_class,
- ctf_fs_trace->trace,
- (uint64_t) ds_file_group->stream_id);
- }
-
- if (!ds_file_group->stream) {
- BT_LOGE("Cannot create stream for DS file group: "
- "addr=%p, stream-name=\"%s\"",
- ds_file_group, name->str);
- g_string_free(name, TRUE);
- goto error;
- }
-
- ret = bt_stream_set_name(ds_file_group->stream,
- name->str);
- if (ret) {
- BT_LOGE("Cannot set stream's name: "
- "addr=%p, stream-name=\"%s\"",
- ds_file_group->stream, name->str);
- g_string_free(name, TRUE);
- goto error;
- }
-
- g_string_free(name, TRUE);
- }
-
goto end;
error:
return ret;
}
-BT_HIDDEN
+static
struct ctf_fs_trace *ctf_fs_trace_create(bt_self_component_source *self_comp,
const char *path, const char *name,
struct ctf_fs_metadata_config *metadata_config)
goto error;
}
- /*
- * create_ds_file_groups() created all the streams that this
- * trace needs. There won't be any more. Therefore it is safe to
- * make this trace static.
- */
- if (ctf_fs_trace->trace) {
- (void) bt_trace_make_static(ctf_fs_trace->trace);
- }
-
goto end;
error:
return ret;
}
-BT_HIDDEN
+static
int ctf_fs_find_traces(GList **trace_paths, const char *start_path)
{
int ret;
return ret;
}
-BT_HIDDEN
+static
GList *ctf_fs_create_trace_names(GList *trace_paths, const char *base_path) {
GList *trace_names = NULL;
GList *node;
return trace_names;
}
+/* Helper for ctf_fs_component_create_ctf_fs_traces, to handle a single path/root. */
+
static
-int create_ctf_fs_traces(bt_self_component_source *self_comp,
+int ctf_fs_component_create_ctf_fs_traces_one_root(bt_self_component_source *self_comp,
struct ctf_fs_component *ctf_fs,
const char *path_param)
{
goto error;
}
- ret = create_ports_for_trace(ctf_fs, ctf_fs_trace);
- if (ret) {
- goto error;
- }
-
g_ptr_array_add(ctf_fs->traces, ctf_fs_trace);
ctf_fs_trace = NULL;
}
return ret;
}
+/* GCompareFunc to sort traces by UUID. */
+
static
-struct ctf_fs_component *ctf_fs_create(
- bt_self_component_source *self_comp,
- const bt_value *params)
+gint sort_traces_by_uuid(gconstpointer a, gconstpointer b)
{
- struct ctf_fs_component *ctf_fs;
- const bt_value *value = NULL;
- const char *path_param;
+ const struct ctf_fs_trace *trace_a = *((const struct ctf_fs_trace **) a);
+ const struct ctf_fs_trace *trace_b = *((const struct ctf_fs_trace **) b);
- ctf_fs = g_new0(struct ctf_fs_component, 1);
- if (!ctf_fs) {
- goto end;
+ bool trace_a_has_uuid = trace_a->metadata->tc->is_uuid_set;
+ bool trace_b_has_uuid = trace_b->metadata->tc->is_uuid_set;
+ gint ret;
+
+ /* Order traces without uuid first. */
+ if (!trace_a_has_uuid && trace_b_has_uuid) {
+ ret = -1;
+ } else if (trace_a_has_uuid && !trace_b_has_uuid) {
+ ret = 1;
+ } else if (!trace_a_has_uuid && !trace_b_has_uuid) {
+ ret = 0;
+ } else {
+ ret = bt_uuid_compare(trace_a->metadata->tc->uuid, trace_b->metadata->tc->uuid);
}
- bt_self_component_set_data(
- bt_self_component_source_as_self_component(self_comp),
- ctf_fs);
+ return ret;
+}
+
+/*
+ * Count the number of stream and event classes defined by this trace's metadata.
+ *
+ * This is used to determine which metadata is the "latest", out of multiple
+ * traces sharing the same UUID. It is assumed that amongst all these metadatas,
+ * a bigger metadata is a superset of a smaller metadata. Therefore, it is
+ * enough to just count the classes.
+ */
+
+static
+unsigned int metadata_count_stream_and_event_classes(struct ctf_fs_trace *trace)
+{
+ unsigned int num = trace->metadata->tc->stream_classes->len;
+ guint i;
+
+ for (i = 0; i < trace->metadata->tc->stream_classes->len; i++) {
+ struct ctf_stream_class *sc = trace->metadata->tc->stream_classes->pdata[i];
+ num += sc->event_classes->len;
+ }
+
+ return num;
+}
+
+/*
+ * Merge the src ds_file_group into dest. This consists of merging their
+ * ds_file_infos, making sure to keep the result sorted.
+ */
+
+static
+void merge_ctf_fs_ds_file_groups(struct ctf_fs_ds_file_group *dest, struct ctf_fs_ds_file_group *src)
+{
+ guint i;
+
+ for (i = 0; i < src->ds_file_infos->len; i++) {
+ struct ctf_fs_ds_file_info *ds_file_info =
+ g_ptr_array_index(src->ds_file_infos, i);
+
+ /* Ownership of the ds_file_info is transferred to dest. */
+ g_ptr_array_index(src->ds_file_infos, i) = NULL;
+
+ ds_file_group_insert_ds_file_info_sorted(dest, ds_file_info);
+ }
+}
+
+/* Merge src_trace's data stream file groups into dest_trace's. */
+
+static
+void merge_matching_ctf_fs_ds_file_groups(
+ struct ctf_fs_trace *dest_trace,
+ struct ctf_fs_trace *src_trace)
+{
+
+ GPtrArray *dest = dest_trace->ds_file_groups;
+ GPtrArray *src = src_trace->ds_file_groups;
+ guint s_i;
/*
- * We don't need to get a new reference here because as long as
- * our private ctf_fs_component object exists, the containing
- * private component should also exist.
+ * Save the initial length of dest: we only want to check against the
+ * original elements in the inner loop.
*/
- ctf_fs->self_comp = self_comp;
- value = bt_value_map_borrow_entry_value_const(params, "path");
- if (value && !bt_value_is_string(value)) {
+ const guint dest_len = dest->len;
+
+ for (s_i = 0; s_i < src->len; s_i++) {
+ struct ctf_fs_ds_file_group *src_group = g_ptr_array_index(src, s_i);
+ struct ctf_fs_ds_file_group *dest_group = NULL;
+
+ /* A stream instance without ID can't match a stream in the other trace. */
+ if (src_group->stream_id != -1) {
+ guint d_i;
+
+ /* Let's search for a matching ds_file_group in the destination. */
+ for (d_i = 0; d_i < dest_len; d_i++) {
+ struct ctf_fs_ds_file_group *candidate_dest = g_ptr_array_index(dest, d_i);
+
+ /* Can't match a stream instance without ID. */
+ if (candidate_dest->stream_id == -1) {
+ continue;
+ }
+
+ /*
+ * If the two groups have the same stream instance id
+ * and belong to the same stream class (stream instance
+ * ids are per-stream class), they represent the same
+ * stream instance.
+ */
+ if (candidate_dest->stream_id != src_group->stream_id ||
+ candidate_dest->sc->id != src_group->sc->id) {
+ continue;
+ }
+
+ dest_group = candidate_dest;
+ break;
+ }
+ }
+
+ /*
+ * Didn't find a friend in dest to merge our src_group into?
+ * Create a new empty one.
+ */
+ if (!dest_group) {
+ struct ctf_stream_class *sc;
+
+ sc = ctf_trace_class_borrow_stream_class_by_id(
+ dest_trace->metadata->tc, src_group->sc->id);
+ BT_ASSERT(sc);
+
+ dest_group = ctf_fs_ds_file_group_create(dest_trace, sc,
+ src_group->stream_id);
+
+ g_ptr_array_add(dest_trace->ds_file_groups, dest_group);
+ }
+
+ BT_ASSERT(dest_group);
+ merge_ctf_fs_ds_file_groups(dest_group, src_group);
+ }
+}
+
+/*
+ * Collapse the given traces, which must all share the same UUID, in a single
+ * one.
+ *
+ * The trace with the most expansive metadata is chosen and all other traces
+ * are merged into that one. The array slots of all the traces that get merged
+ * in the chosen one are set to NULL, so only the slot of the chosen trace
+ * remains non-NULL.
+ */
+
+static
+void merge_ctf_fs_traces(struct ctf_fs_trace **traces, unsigned int num_traces)
+{
+ unsigned int winner_count;
+ struct ctf_fs_trace *winner;
+ guint i;
+ char uuid_str[BABELTRACE_UUID_STR_LEN];
+
+ BT_ASSERT(num_traces >= 2);
+
+ winner_count = metadata_count_stream_and_event_classes(traces[0]);
+ winner = traces[0];
+
+ /* Find the trace with the largest metadata. */
+ for (i = 1; i < num_traces; i++) {
+ struct ctf_fs_trace *candidate;
+ unsigned int candidate_count;
+
+ candidate = traces[i];
+
+ /* A bit of sanity check. */
+ BT_ASSERT(bt_uuid_compare(winner->metadata->tc->uuid, candidate->metadata->tc->uuid) == 0);
+
+ candidate_count = metadata_count_stream_and_event_classes(candidate);
+
+ if (candidate_count > winner_count) {
+ winner_count = candidate_count;
+ winner = candidate;
+ }
+ }
+
+ /* Merge all the other traces in the winning trace. */
+ for (i = 0; i < num_traces; i++) {
+ struct ctf_fs_trace *trace = traces[i];
+
+ /* Don't merge the winner into itself. */
+ if (trace == winner) {
+ continue;
+ }
+
+ /* Merge trace's data stream file groups into winner's. */
+ merge_matching_ctf_fs_ds_file_groups(winner, trace);
+
+ /* Free the trace that got merged into winner, clear the slot in the array. */
+ ctf_fs_trace_destroy(trace);
+ traces[i] = NULL;
+ }
+
+ /* Use the string representation of the UUID as the trace name. */
+ bt_uuid_unparse(winner->metadata->tc->uuid, uuid_str);
+ g_string_printf(winner->name, "%s", uuid_str);
+}
+
+/*
+ * Merge all traces of `ctf_fs` that share the same UUID in a single trace.
+ * Traces with no UUID are not merged.
+ */
+
+static
+void merge_traces_with_same_uuid(struct ctf_fs_component *ctf_fs)
+{
+ GPtrArray *traces = ctf_fs->traces;
+ guint range_start_idx = 0;
+ unsigned int num_traces = 0;
+ guint i;
+
+ /* Sort the traces by uuid, then collapse traces with the same uuid in a single one. */
+ g_ptr_array_sort(traces, sort_traces_by_uuid);
+
+ /* Find ranges of consecutive traces that share the same UUID. */
+ while (range_start_idx < traces->len) {
+ guint range_len;
+ struct ctf_fs_trace *range_start_trace = g_ptr_array_index(traces, range_start_idx);
+
+ /* Exclusive end of range. */
+ guint range_end_exc_idx = range_start_idx + 1;
+
+ while (range_end_exc_idx < traces->len) {
+ struct ctf_fs_trace *this_trace = g_ptr_array_index(traces, range_end_exc_idx);
+
+ if (!range_start_trace->metadata->tc->is_uuid_set ||
+ (bt_uuid_compare(range_start_trace->metadata->tc->uuid, this_trace->metadata->tc->uuid) != 0)) {
+ break;
+ }
+
+ range_end_exc_idx++;
+ }
+
+ /* If we have two or more traces with matching UUIDs, merge them. */
+ range_len = range_end_exc_idx - range_start_idx;
+ if (range_len > 1) {
+ struct ctf_fs_trace **range_start = (struct ctf_fs_trace **) &traces->pdata[range_start_idx];
+ merge_ctf_fs_traces(range_start, range_len);
+ }
+
+ num_traces++;
+ range_start_idx = range_end_exc_idx;
+ }
+
+ /* Clear any NULL slot (traces that got merged in another one) in the array. */
+ for (i = 0; i < traces->len;) {
+ if (g_ptr_array_index(traces, i) == NULL) {
+ g_ptr_array_remove_index_fast(traces, i);
+ } else {
+ i++;
+ }
+ }
+
+ BT_ASSERT(num_traces == traces->len);
+}
+
+int ctf_fs_component_create_ctf_fs_traces(bt_self_component_source *self_comp,
+ struct ctf_fs_component *ctf_fs,
+ const bt_value *paths_value)
+{
+ int ret = 0;
+ uint64_t i;
+
+ for (i = 0; i < bt_value_array_get_size(paths_value); i++) {
+ const bt_value *path_value = bt_value_array_borrow_element_by_index_const(paths_value, i);
+ const char *path = bt_value_string_get(path_value);
+
+ ret = ctf_fs_component_create_ctf_fs_traces_one_root(self_comp, ctf_fs, path);
+ if (ret) {
+ goto end;
+ }
+ }
+
+ merge_traces_with_same_uuid(ctf_fs);
+
+end:
+ return ret;
+}
+
+/* Create the IR stream objects for ctf_fs_trace. */
+
+static
+int create_streams_for_trace(struct ctf_fs_trace *ctf_fs_trace)
+{
+ int ret;
+ GString *name = NULL;
+ guint i;
+
+ for (i = 0; i < ctf_fs_trace->ds_file_groups->len; i++) {
+ struct ctf_fs_ds_file_group *ds_file_group =
+ g_ptr_array_index(ctf_fs_trace->ds_file_groups, i);
+ name = get_stream_instance_unique_name(ds_file_group);
+
+ if (!name) {
+ goto error;
+ }
+
+ if (ds_file_group->sc->ir_sc) {
+ BT_ASSERT(ctf_fs_trace->trace);
+
+ if (ds_file_group->stream_id == UINT64_C(-1)) {
+ /* No stream ID: use 0 */
+ ds_file_group->stream = bt_stream_create_with_id(
+ ds_file_group->sc->ir_sc,
+ ctf_fs_trace->trace,
+ ctf_fs_trace->next_stream_id);
+ ctf_fs_trace->next_stream_id++;
+ } else {
+ /* Specific stream ID */
+ ds_file_group->stream = bt_stream_create_with_id(
+ ds_file_group->sc->ir_sc,
+ ctf_fs_trace->trace,
+ (uint64_t) ds_file_group->stream_id);
+ }
+ } else {
+ ds_file_group->stream = NULL;
+ }
+
+ if (!ds_file_group->stream) {
+ BT_LOGE("Cannot create stream for DS file group: "
+ "addr=%p, stream-name=\"%s\"",
+ ds_file_group, name->str);
+ goto error;
+ }
+
+ ret = bt_stream_set_name(ds_file_group->stream,
+ name->str);
+ if (ret) {
+ BT_LOGE("Cannot set stream's name: "
+ "addr=%p, stream-name=\"%s\"",
+ ds_file_group->stream, name->str);
+ goto error;
+ }
+
+ g_string_free(name, TRUE);
+ name = NULL;
+ }
+
+ ret = 0;
+ goto end;
+
+error:
+ ret = -1;
+
+end:
+
+ if (name) {
+ g_string_free(name, TRUE);
+ }
+ return ret;
+}
+
+/*
+ * Validate the "paths" parameter passed to this component. It must be
+ * present, and it must be an array of strings.
+ */
+
+static
+bool validate_paths_parameter(const bt_value *paths)
+{
+ bool ret;
+ bt_value_type type;
+ uint64_t i;
+
+ if (!paths) {
+ BT_LOGE("missing \"paths\" parameter");
+ goto error;
+ }
+
+ type = bt_value_get_type(paths);
+ if (type != BT_VALUE_TYPE_ARRAY) {
+ BT_LOGE("`paths` parameter: expecting array value: type=%s",
+ bt_common_value_type_string(type));
goto error;
}
- path_param = bt_value_string_get(value);
+ for (i = 0; i < bt_value_array_get_size(paths); i++) {
+ const bt_value *elem;
+
+ elem = bt_value_array_borrow_element_by_index_const(paths, i);
+ type = bt_value_get_type(elem);
+ if (type != BT_VALUE_TYPE_STRING) {
+ BT_LOGE("`paths` parameter: expecting string value: index=%" PRIu64 ", type=%s",
+ i, bt_common_value_type_string(type));
+ goto error;
+ }
+ }
+
+ ret = true;
+ goto end;
+
+error:
+ ret = false;
+
+end:
+ return ret;
+}
+
+bool read_src_fs_parameters(const bt_value *params,
+ const bt_value **paths, struct ctf_fs_component *ctf_fs) {
+ bool ret;
+ const bt_value *value;
+
+ /* paths parameter */
+ *paths = bt_value_map_borrow_entry_value_const(params, "paths");
+ if (!validate_paths_parameter(*paths)) {
+ goto error;
+ }
+
+ /* clock-class-offset-s parameter */
value = bt_value_map_borrow_entry_value_const(params,
"clock-class-offset-s");
if (value) {
- if (!bt_value_is_integer(value)) {
- BT_LOGE("clock-class-offset-s should be an integer");
+ if (!bt_value_is_signed_integer(value)) {
+ BT_LOGE("clock-class-offset-s must be an integer");
goto error;
}
- ctf_fs->metadata_config.clock_class_offset_s = bt_value_integer_get(value);
+ ctf_fs->metadata_config.clock_class_offset_s =
+ bt_value_signed_integer_get(value);
}
+ /* clock-class-offset-ns parameter */
value = bt_value_map_borrow_entry_value_const(params,
"clock-class-offset-ns");
if (value) {
- if (!bt_value_is_integer(value)) {
- BT_LOGE("clock-class-offset-ns should be an integer");
+ if (!bt_value_is_signed_integer(value)) {
+ BT_LOGE("clock-class-offset-ns must be an integer");
goto error;
}
- ctf_fs->metadata_config.clock_class_offset_ns = bt_value_integer_get(value);
+ ctf_fs->metadata_config.clock_class_offset_ns =
+ bt_value_signed_integer_get(value);
}
- ctf_fs->port_data = g_ptr_array_new_with_free_func(port_data_destroy);
- if (!ctf_fs->port_data) {
+
+ ret = true;
+ goto end;
+
+error:
+ ret = false;
+
+end:
+ return ret;
+}
+
+static
+struct ctf_fs_component *ctf_fs_create(
+ bt_self_component_source *self_comp,
+ const bt_value *params)
+{
+ struct ctf_fs_component *ctf_fs = NULL;
+ guint i;
+ const bt_value *paths_value;
+
+ ctf_fs = ctf_fs_component_create();
+ if (!ctf_fs) {
goto error;
}
- ctf_fs->traces = g_ptr_array_new_with_free_func(
- ctf_fs_trace_destroy_msgier);
- if (!ctf_fs->traces) {
+ if (!read_src_fs_parameters(params, &paths_value, ctf_fs)) {
goto error;
}
- if (create_ctf_fs_traces(self_comp, ctf_fs, path_param)) {
+ bt_self_component_set_data(
+ bt_self_component_source_as_self_component(self_comp),
+ ctf_fs);
+
+ /*
+ * We don't need to get a new reference here because as long as
+ * our private ctf_fs_component object exists, the containing
+ * private component should also exist.
+ */
+ ctf_fs->self_comp = self_comp;
+
+ if (ctf_fs_component_create_ctf_fs_traces(self_comp, ctf_fs, paths_value)) {
goto error;
}
+ for (i = 0; i < ctf_fs->traces->len; i++) {
+ struct ctf_fs_trace *trace = g_ptr_array_index(ctf_fs->traces, i);
+
+ if (create_streams_for_trace(trace)) {
+ goto error;
+ }
+
+ if (create_ports_for_trace(ctf_fs, trace)) {
+ goto error;
+ }
+ }
+
goto end;
error: