From: Jérémie Galarneau Date: Thu, 29 Jun 2017 21:47:55 +0000 (-0400) Subject: stream-intersection: use the trace-info query results to insert trimmers X-Git-Tag: v2.0.0-pre2~64 X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=commitdiff_plain;h=b5fe3e001f0785d685bb946497105f360a8f34d4 stream-intersection: use the trace-info query results to insert trimmers Signed-off-by: Jérémie Galarneau --- diff --git a/cli/babeltrace-cfg-cli-args.c b/cli/babeltrace-cfg-cli-args.c index e4d66b7f..9c6db059 100644 --- a/cli/babeltrace-cfg-cli-args.c +++ b/cli/babeltrace-cfg-cli-args.c @@ -3596,6 +3596,7 @@ struct bt_config *bt_config_convert_from_args(int argc, const char *argv[], bool got_output_format_opt = false; bool trimmer_has_begin = false; bool trimmer_has_end = false; + bool stream_intersection_mode = false; GString *cur_name = NULL; GString *cur_name_prefix = NULL; const char *leftover = NULL; @@ -4269,10 +4270,11 @@ struct bt_config *bt_config_convert_from_args(int argc, const char *argv[], print_run_args_0 = true; break; case OPT_STREAM_INTERSECTION: - append_implicit_component_param( - &base_implicit_ctf_input_args, - "stream-intersection", "yes"); - base_implicit_ctf_input_args.exists = true; + /* + * Applies to all traces implementing the trace-info + * query. + */ + stream_intersection_mode = true; break; case OPT_VERBOSE: if (*log_level != 'V' && *log_level != 'D') { @@ -4708,6 +4710,7 @@ struct bt_config *bt_config_convert_from_args(int argc, const char *argv[], goto error; } + cfg->cmd_data.run.stream_intersection_mode = stream_intersection_mode; goto end; error: diff --git a/cli/babeltrace-cfg.h b/cli/babeltrace-cfg.h index b9a4b5ed..89065244 100644 --- a/cli/babeltrace-cfg.h +++ b/cli/babeltrace-cfg.h @@ -92,6 +92,12 @@ struct bt_config { * to retry to run the graph. */ uint64_t retry_duration_us; + + /* + * Whether or not to trim the source trace to the + * intersection of its streams. + */ + bool stream_intersection_mode; } run; /* BT_CONFIG_COMMAND_HELP */ diff --git a/cli/babeltrace.c b/cli/babeltrace.c index 3c44db35..8b3d0c0f 100644 --- a/cli/babeltrace.c +++ b/cli/babeltrace.c @@ -59,6 +59,7 @@ #define ENV_BABELTRACE_WARN_COMMAND_NAME_DIRECTORY_CLASH "BABELTRACE_CLI_WARN_COMMAND_NAME_DIRECTORY_CLASH" #define ENV_BABELTRACE_CLI_LOG_LEVEL "BABELTRACE_CLI_LOG_LEVEL" +#define NSEC_PER_SEC 1000000000LL /* * Known environment variable names for the log levels of the project's @@ -1168,6 +1169,53 @@ end: return 0; } +struct port_id { + char *instance_name; + char *port_name; +}; + +struct trace_range { + uint64_t intersection_range_begin_ns; + uint64_t intersection_range_end_ns; +}; + +static +guint port_id_hash(gconstpointer v) +{ + const struct port_id *id = v; + + assert(id->instance_name); + assert(id->port_name); + + return g_str_hash(id->instance_name) ^ g_str_hash(id->port_name); +} + +static +gboolean port_id_equal(gconstpointer v1, gconstpointer v2) +{ + const struct port_id *id1 = v1; + const struct port_id *id2 = v2; + + return !strcmp(id1->instance_name, id2->instance_name) && + !strcmp(id1->port_name, id2->port_name); +} + +static +void port_id_destroy(gpointer data) +{ + struct port_id *id = data; + + free(id->instance_name); + free(id->port_name); + free(id); +} + +static +void trace_range_destroy(gpointer data) +{ + free(data); +} + struct cmd_run_ctx { /* Owned by this */ GHashTable *components; @@ -1179,8 +1227,60 @@ struct cmd_run_ctx { struct bt_config *cfg; bool connect_ports; + + bool stream_intersection_mode; + + /* + * Association of struct port_id -> struct trace_range. + */ + GHashTable *intersections; }; +/* Returns a timestamp of the form "(-)s.ns" */ +static +char *s_from_ns(int64_t ns) +{ + int ret; + char *s_ret = NULL; + bool is_negative; + int64_t ts_sec_abs, ts_nsec_abs; + int64_t ts_sec = ns / NSEC_PER_SEC; + int64_t ts_nsec = ns % NSEC_PER_SEC; + + if (ts_sec >= 0 && ts_nsec >= 0) { + is_negative = false; + ts_sec_abs = ts_sec; + ts_nsec_abs = ts_nsec; + } else if (ts_sec > 0 && ts_nsec < 0) { + is_negative = false; + ts_sec_abs = ts_sec - 1; + ts_nsec_abs = NSEC_PER_SEC + ts_nsec; + } else if (ts_sec == 0 && ts_nsec < 0) { + is_negative = true; + ts_sec_abs = ts_sec; + ts_nsec_abs = -ts_nsec; + } else if (ts_sec < 0 && ts_nsec > 0) { + is_negative = true; + ts_sec_abs = -(ts_sec + 1); + ts_nsec_abs = NSEC_PER_SEC - ts_nsec; + } else if (ts_sec < 0 && ts_nsec == 0) { + is_negative = true; + ts_sec_abs = -ts_sec; + ts_nsec_abs = ts_nsec; + } else { /* (ts_sec < 0 && ts_nsec < 0) */ + is_negative = true; + ts_sec_abs = -ts_sec; + ts_nsec_abs = -ts_nsec; + } + + ret = asprintf(&s_ret, "%s%" PRId64 ".%09" PRId64, + is_negative ? "-" : "", ts_sec_abs, ts_nsec_abs); + if (ret < 0) { + s_ret = NULL; + } + return s_ret; +} + static int cmd_run_ctx_connect_upstream_port_to_downstream_component( struct cmd_run_ctx *ctx, struct bt_component *upstream_comp, @@ -1195,6 +1295,66 @@ int cmd_run_ctx_connect_upstream_port_to_downstream_component( int64_t (*port_count_fn)(struct bt_component *); struct bt_port *(*port_by_index_fn)(struct bt_component *, uint64_t); enum bt_graph_status status = BT_GRAPH_STATUS_ERROR; + bool insert_trimmer = false; + struct bt_value *trimmer_params = NULL; + char *intersection_begin = NULL; + char *intersection_end = NULL; + struct bt_component *trimmer = NULL; + struct bt_component_class *trimmer_class = NULL; + struct bt_port *trimmer_input = NULL; + struct bt_port *trimmer_output = NULL; + + if (ctx->intersections && + bt_component_get_class_type(upstream_comp) == + BT_COMPONENT_CLASS_TYPE_SOURCE) { + struct trace_range *range; + struct port_id port_id = { + .instance_name = (char *) bt_component_get_name(upstream_comp), + .port_name = (char *) bt_port_get_name(upstream_port) + }; + + if (!port_id.instance_name || !port_id.port_name) { + goto error; + } + + range = (struct trace_range *) g_hash_table_lookup( + ctx->intersections, &port_id); + if (range) { + enum bt_value_status status; + + intersection_begin = s_from_ns( + range->intersection_range_begin_ns); + intersection_end = s_from_ns( + range->intersection_range_end_ns); + if (!intersection_begin || !intersection_end) { + BT_LOGE_STR("Cannot create trimmer argument timestamp string."); + goto error; + } + + insert_trimmer = true; + trimmer_params = bt_value_map_create(); + if (!trimmer_params) { + goto error; + } + + status = bt_value_map_insert_string(trimmer_params, + "begin", intersection_begin); + if (status != BT_VALUE_STATUS_OK) { + goto error; + } + status = bt_value_map_insert_string(trimmer_params, + "end", intersection_end); + if (status != BT_VALUE_STATUS_OK) { + goto error; + } + } + + trimmer_class = find_component_class("utils", "trimmer", + BT_COMPONENT_CLASS_TYPE_FILTER); + if (!trimmer_class) { + goto error; + } + } BT_LOGI("Connecting upstream port to the next available downstream port: " "upstream-port-addr=%p, upstream-port-name=\"%s\", " @@ -1238,11 +1398,12 @@ int cmd_run_ctx_connect_upstream_port_to_downstream_component( for (i = 0; i < downstream_port_count; i++) { struct bt_port *downstream_port = port_by_index_fn(downstream_comp, i); + const char *upstream_port_name; const char *downstream_port_name; assert(downstream_port); - /* Skip port if it's already connected */ + /* Skip port if it's already connected. */ if (bt_port_is_connected(downstream_port)) { bt_put(downstream_port); BT_LOGD("Skipping downstream port: already connected: " @@ -1254,60 +1415,89 @@ int cmd_run_ctx_connect_upstream_port_to_downstream_component( downstream_port_name = bt_port_get_name(downstream_port); assert(downstream_port_name); + upstream_port_name = bt_port_get_name(upstream_port); + assert(upstream_port_name); - if (bt_common_star_glob_match( + if (!bt_common_star_glob_match( cfg_conn->downstream_port_glob->str, -1ULL, downstream_port_name, -1ULL)) { - /* We have a winner! */ - status = bt_graph_connect_ports(ctx->graph, - upstream_port, downstream_port, NULL); bt_put(downstream_port); - switch (status) { - case BT_GRAPH_STATUS_OK: - break; - case BT_GRAPH_STATUS_CANCELED: - BT_LOGI_STR("Graph was canceled by user."); - status = BT_GRAPH_STATUS_OK; - break; - case BT_GRAPH_STATUS_COMPONENT_REFUSES_PORT_CONNECTION: - BT_LOGE("A component refused a connection to one of its ports: " - "upstream-comp-addr=%p, upstream-comp-name=\"%s\", " - "upstream-port-addr=%p, upstream-port-name=\"%s\", " - "downstream-comp-addr=%p, downstream-comp-name=\"%s\", " - "downstream-port-addr=%p, downstream-port-name=\"%s\", " - "conn-arg=\"%s\"", - upstream_comp, bt_component_get_name(upstream_comp), - upstream_port, bt_port_get_name(upstream_port), - downstream_comp, cfg_conn->downstream_comp_name->str, - downstream_port, downstream_port_name, - cfg_conn->arg->str); - fprintf(stderr, - "A component refused a connection to one of its ports (`%s` to `%s`): %s\n", - bt_port_get_name(upstream_port), - downstream_port_name, - cfg_conn->arg->str); - break; - default: - BT_LOGE("Cannot create connection: graph refuses to connect ports: " - "upstream-comp-addr=%p, upstream-comp-name=\"%s\", " - "upstream-port-addr=%p, upstream-port-name=\"%s\", " - "downstream-comp-addr=%p, downstream-comp-name=\"%s\", " - "downstream-port-addr=%p, downstream-port-name=\"%s\", " - "conn-arg=\"%s\"", - upstream_comp, bt_component_get_name(upstream_comp), - upstream_port, bt_port_get_name(upstream_port), - downstream_comp, cfg_conn->downstream_comp_name->str, - downstream_port, downstream_port_name, - cfg_conn->arg->str); - fprintf(stderr, - "Cannot create connection: graph refuses to connect ports (`%s` to `%s`): %s\n", - bt_port_get_name(upstream_port), - downstream_port_name, - cfg_conn->arg->str); + continue; + } + + if (insert_trimmer) { + /* + * In order to insert the trimmer between the two + * components that were being connected, we create + * a connection configuration entry which describes + * a connection from the trimmer's output to the + * original input that was being connected. + * + * Hence, the creation of the trimmer will cause the + * graph "new port" listener to establish all downstream + * connections as its output port is connected. We will + * then establish the connection between the original + * upstream source and the trimmer. + */ + char *trimmer_name = NULL; + enum bt_graph_status graph_status; + + ret = asprintf(&trimmer_name, "%s-%s", + "stream-intersection-trimmer", + upstream_port_name); + if (ret < 0) { + goto error; + } + ret = 0; + + ctx->connect_ports = false; + graph_status = bt_graph_add_component(ctx->graph, + trimmer_class, trimmer_name, trimmer_params, + &trimmer); + free(trimmer_name); + if (graph_status != BT_GRAPH_STATUS_OK) { + goto error; + } + assert(trimmer); + + trimmer_input = + bt_component_filter_get_input_port_by_index( + trimmer, 0); + if (!trimmer_input) { + goto error; + } + trimmer_output = + bt_component_filter_get_output_port_by_index( + trimmer, 0); + if (!trimmer_output) { goto error; } - BT_LOGI("Connected component ports: " + /* + * Replace the current downstream port by the trimmer's + * upstream port. + */ + BT_MOVE(downstream_port, trimmer_input); + downstream_port_name = bt_port_get_name( + downstream_port); + if (!downstream_port_name) { + goto error; + } + } + + /* We have a winner! */ + status = bt_graph_connect_ports(ctx->graph, + upstream_port, downstream_port, NULL); + BT_PUT(downstream_port); + switch (status) { + case BT_GRAPH_STATUS_OK: + break; + case BT_GRAPH_STATUS_CANCELED: + BT_LOGI_STR("Graph was canceled by user."); + status = BT_GRAPH_STATUS_OK; + break; + case BT_GRAPH_STATUS_COMPONENT_REFUSES_PORT_CONNECTION: + BT_LOGE("A component refused a connection to one of its ports: " "upstream-comp-addr=%p, upstream-comp-name=\"%s\", " "upstream-port-addr=%p, upstream-port-name=\"%s\", " "downstream-comp-addr=%p, downstream-comp-name=\"%s\", " @@ -1318,11 +1508,58 @@ int cmd_run_ctx_connect_upstream_port_to_downstream_component( downstream_comp, cfg_conn->downstream_comp_name->str, downstream_port, downstream_port_name, cfg_conn->arg->str); - - goto end; + fprintf(stderr, + "A component refused a connection to one of its ports (`%s` to `%s`): %s\n", + bt_port_get_name(upstream_port), + downstream_port_name, + cfg_conn->arg->str); + break; + default: + BT_LOGE("Cannot create connection: graph refuses to connect ports: " + "upstream-comp-addr=%p, upstream-comp-name=\"%s\", " + "upstream-port-addr=%p, upstream-port-name=\"%s\", " + "downstream-comp-addr=%p, downstream-comp-name=\"%s\", " + "downstream-port-addr=%p, downstream-port-name=\"%s\", " + "conn-arg=\"%s\"", + upstream_comp, bt_component_get_name(upstream_comp), + upstream_port, bt_port_get_name(upstream_port), + downstream_comp, cfg_conn->downstream_comp_name->str, + downstream_port, downstream_port_name, + cfg_conn->arg->str); + fprintf(stderr, + "Cannot create connection: graph refuses to connect ports (`%s` to `%s`): %s\n", + bt_port_get_name(upstream_port), + downstream_port_name, + cfg_conn->arg->str); + goto error; } - bt_put(downstream_port); + BT_LOGI("Connected component ports: " + "upstream-comp-addr=%p, upstream-comp-name=\"%s\", " + "upstream-port-addr=%p, upstream-port-name=\"%s\", " + "downstream-comp-addr=%p, downstream-comp-name=\"%s\", " + "downstream-port-addr=%p, downstream-port-name=\"%s\", " + "conn-arg=\"%s\"", + upstream_comp, bt_component_get_name(upstream_comp), + upstream_port, bt_port_get_name(upstream_port), + downstream_comp, cfg_conn->downstream_comp_name->str, + downstream_port, downstream_port_name, + cfg_conn->arg->str); + + if (insert_trimmer) { + /* + * The first connection, from the source to the trimmer, + * has been done. We now connect the trimmer to the + * original downstream port. + */ + ret = cmd_run_ctx_connect_upstream_port_to_downstream_component( + ctx, trimmer, trimmer_output, cfg_conn); + if (ret) { + goto error; + } + ctx->connect_ports = true; + } + goto end; } if (status != BT_GRAPH_STATUS_OK) { @@ -1344,6 +1581,13 @@ error: ret = -1; end: + free(intersection_begin); + free(intersection_end); + BT_PUT(trimmer_params); + BT_PUT(trimmer_class); + BT_PUT(trimmer); + BT_PUT(trimmer_input); + BT_PUT(trimmer_output); return ret; } @@ -1383,29 +1627,31 @@ int cmd_run_ctx_connect_upstream_port(struct cmd_run_ctx *ctx, ctx->cfg->cmd_data.run.connections, i); if (strcmp(cfg_conn->upstream_comp_name->str, - upstream_comp_name) == 0) { - if (bt_common_star_glob_match( - cfg_conn->upstream_port_glob->str, - -1ULL, upstream_port_name, -1ULL)) { - ret = cmd_run_ctx_connect_upstream_port_to_downstream_component( - ctx, upstream_comp, upstream_port, - cfg_conn); - if (ret) { - BT_LOGE("Cannot connect upstream port: " - "port-addr=%p, port-name=\"%s\"", - upstream_port, - upstream_port_name); - fprintf(stderr, - "Cannot connect port `%s` of component `%s` to a downstream port: %s\n", - upstream_port_name, - upstream_comp_name, - cfg_conn->arg->str); - goto error; - } + upstream_comp_name)) { + continue; + } - goto end; - } + if (!bt_common_star_glob_match( + cfg_conn->upstream_port_glob->str, + -1ULL, upstream_port_name, -1ULL)) { + continue; } + + ret = cmd_run_ctx_connect_upstream_port_to_downstream_component( + ctx, upstream_comp, upstream_port, cfg_conn); + if (ret) { + BT_LOGE("Cannot connect upstream port: " + "port-addr=%p, port-name=\"%s\"", + upstream_port, + upstream_port_name); + fprintf(stderr, + "Cannot connect port `%s` of component `%s` to a downstream port: %s\n", + upstream_port_name, + upstream_comp_name, + cfg_conn->arg->str); + goto error; + } + goto end; } BT_LOGE("Cannot connect upstream port: port does not match any connection argument: " @@ -1523,6 +1769,11 @@ void cmd_run_ctx_destroy(struct cmd_run_ctx *ctx) ctx->components = NULL; } + if (ctx->intersections) { + g_hash_table_destroy(ctx->intersections); + ctx->components = NULL; + } + BT_PUT(ctx->graph); the_graph = NULL; ctx->cfg = NULL; @@ -1541,6 +1792,15 @@ int cmd_run_ctx_init(struct cmd_run_ctx *ctx, struct bt_config *cfg) goto error; } + if (cfg->cmd_data.run.stream_intersection_mode) { + ctx->stream_intersection_mode = true; + ctx->intersections = g_hash_table_new_full(port_id_hash, + port_id_equal, port_id_destroy, trace_range_destroy); + if (!ctx->intersections) { + goto error; + } + } + ctx->graph = bt_graph_create(); if (!ctx->graph) { goto error; @@ -1585,6 +1845,278 @@ end: return ret; } +static +int set_stream_intersections(struct cmd_run_ctx *ctx, + struct bt_config_component *cfg_comp, + struct bt_component_class *comp_cls) +{ + int ret = 0; + uint64_t trace_idx; + int64_t trace_count; + enum bt_value_status value_status; + const char *path = NULL; + struct bt_value *component_path_value = NULL; + struct bt_value *query_params = NULL; + struct bt_value *query_result = NULL; + struct bt_value *trace_info = NULL; + struct bt_value *intersection_range = NULL; + struct bt_value *intersection_begin = NULL; + struct bt_value *intersection_end = NULL; + struct bt_value *stream_path_value = NULL; + struct bt_value *stream_paths = NULL; + struct bt_value *stream_infos = NULL; + struct bt_value *stream_info = NULL; + struct port_id *port_id = NULL; + struct trace_range *trace_range = NULL; + + component_path_value = bt_value_map_get(cfg_comp->params, "path"); + if (!bt_value_is_string(component_path_value)) { + BT_LOGD("Cannot get path parameter: component-name=%s", + cfg_comp->instance_name->str); + ret = -1; + goto error; + } + + value_status = bt_value_string_get(component_path_value, &path); + if (value_status != BT_VALUE_STATUS_OK) { + BT_LOGD("Cannot get path string value: component-name=%s", + cfg_comp->instance_name->str); + ret = -1; + goto error; + } + + query_params = bt_value_map_create(); + if (!query_params) { + BT_LOGE_STR("Cannot create query parameters."); + ret = -1; + goto error; + } + + value_status = bt_value_map_insert(query_params, "path", component_path_value); + if (value_status != BT_VALUE_STATUS_OK) { + BT_LOGE_STR("Cannot insert path parameter in query paramater map."); + ret = -1; + goto error; + } + + query_result = bt_component_class_query(comp_cls, "trace-info", + query_params); + if (!query_result) { + BT_LOGD("Component class \'%s\' does not support the \'trace-info\' query.", + bt_component_class_get_name(comp_cls)); + ret = -1; + goto error; + } + + if (!bt_value_is_array(query_result)) { + BT_LOGD("Unexpected format of \'trace-info\' query result: " + "component-class-name=%s", + bt_component_class_get_name(comp_cls)); + ret = -1; + goto error; + } + + trace_count = bt_value_array_size(query_result); + if (trace_count < 0) { + ret = -1; + goto error; + } + + for (trace_idx = 0; trace_idx < trace_count; trace_idx++) { + int64_t begin, end; + uint64_t stream_idx; + int64_t stream_count; + + trace_info = bt_value_array_get(query_result, trace_idx); + if (!trace_info || !bt_value_is_map(trace_info)) { + ret = -1; + BT_LOGD_STR("Cannot retrieve trace from query result."); + goto error; + } + + intersection_range = bt_value_map_get(trace_info, + "intersection-range-ns"); + if (!intersection_range) { + ret = -1; + BT_LOGD_STR("Cannot retrieve \'intersetion-range-ns\' field from query result."); + goto error; + } + + intersection_begin = bt_value_map_get(intersection_range, + "begin"); + if (!intersection_begin) { + ret = -1; + BT_LOGD_STR("Cannot retrieve intersection-range-ns \'begin\' field from query result."); + goto error; + } + + intersection_end = bt_value_map_get(intersection_range, + "end"); + if (!intersection_end) { + ret = -1; + BT_LOGD_STR("Cannot retrieve intersection-range-ns \'end\' field from query result."); + goto error; + } + + value_status = bt_value_integer_get(intersection_begin, &begin); + if (value_status != BT_VALUE_STATUS_OK) { + ret = -1; + BT_LOGD_STR("Cannot retrieve value of intersection-range-ns \'begin\' field from query result."); + goto error; + } + + value_status = bt_value_integer_get(intersection_end, &end); + if (value_status != BT_VALUE_STATUS_OK) { + ret = -1; + BT_LOGD_STR("Cannot retrieve value of intersection-range-ns \'end\' field from query result."); + goto error; + } + + if (begin < 0 || end < 0 || end < begin) { + BT_LOGW("Invalid trace stream intersection values: " + "intersection-range-ns:begin=%" PRId64 + ", intersection-range-ns:end=%" PRId64, + begin, end); + ret = -1; + goto error; + } + + stream_infos = bt_value_map_get(trace_info, "streams"); + if (!stream_infos || !bt_value_is_array(stream_infos)) { + ret = -1; + BT_LOGD_STR("Cannot retrieve stream informations from trace in query result."); + goto error; + } + + stream_count = bt_value_array_size(stream_infos); + if (stream_count < 0) { + ret = -1; + goto error; + } + + /* + * FIXME + * + * The first path of a stream's "paths" is currently used to + * associate streams/ports to a given trace intersection. + * + * This is a fragile hack as it relies on the port names + * being set to the various streams path. + * + * A stream name should be introduced as part of the trace-info + * query result. + */ + for (stream_idx = 0; stream_idx < stream_count; stream_idx++) { + const char *stream_path; + gboolean hash_ret; + + port_id = g_new0(struct port_id, 1); + if (!port_id) { + ret = -1; + BT_LOGE_STR("Cannot allocate memory for port_id structure."); + goto error; + } + port_id->instance_name = strdup(cfg_comp->instance_name->str); + if (!port_id->instance_name) { + ret = -1; + BT_LOGE_STR("Cannot allocate memory for port_id component instance name."); + goto error; + } + + trace_range = g_new0(struct trace_range, 1); + if (!trace_range) { + ret = -1; + BT_LOGE_STR("Cannot allocate memory for trace_range structure."); + goto error; + } + trace_range->intersection_range_begin_ns = begin; + trace_range->intersection_range_end_ns = end; + + stream_info = bt_value_array_get(stream_infos, + stream_idx); + if (!stream_info || !bt_value_is_map(stream_info)) { + ret = -1; + BT_LOGD_STR("Cannot retrieve stream informations from trace in query result."); + goto error; + } + + stream_paths = bt_value_map_get(stream_info, "paths"); + if (!stream_paths || !bt_value_is_array(stream_paths)) { + ret = -1; + BT_LOGD_STR("Cannot retrieve stream paths from trace in query result."); + goto error; + } + + stream_path_value = bt_value_array_get(stream_paths, 0); + if (!stream_path_value || + !bt_value_is_string(stream_path_value)) { + ret = -1; + BT_LOGD_STR("Cannot retrieve stream path value from trace in query result."); + goto error; + } + + value_status = bt_value_string_get(stream_path_value, + &stream_path); + if (value_status != BT_VALUE_STATUS_OK) { + ret = -1; + goto error; + } + + port_id->port_name = strdup(stream_path); + if (!port_id->port_name) { + ret = -1; + BT_LOGE_STR("Cannot allocate memory for port_id port_name."); + goto error; + } + + BT_LOGD("Inserting stream intersection "); + + hash_ret = g_hash_table_insert(ctx->intersections, + port_id, trace_range); + assert(hash_ret); + + port_id = NULL; + trace_range = NULL; + BT_PUT(stream_info); + BT_PUT(stream_paths); + BT_PUT(stream_path_value); + } + + BT_PUT(trace_info); + BT_PUT(stream_paths); + BT_PUT(stream_path_value); + BT_PUT(intersection_range); + BT_PUT(intersection_begin); + BT_PUT(intersection_end); + BT_PUT(stream_paths); + BT_PUT(stream_path_value); + } + + goto end; + +error: + fprintf(stderr, "%s%sCannot determine stream intersection of trace at path \'%s\'.%s\n", + bt_common_color_bold(), + bt_common_color_fg_yellow(), + path ? path : "(unknown)", + bt_common_color_reset()); +end: + bt_put(component_path_value); + bt_put(query_params); + bt_put(query_result); + bt_put(trace_info); + bt_put(intersection_range); + bt_put(intersection_begin); + bt_put(intersection_end); + bt_put(stream_infos); + bt_put(stream_info); + bt_put(stream_paths); + bt_put(stream_path_value); + g_free(port_id); + g_free(trace_range); + return ret; +} + static int cmd_run_ctx_create_components_from_config_components( struct cmd_run_ctx *ctx, GPtrArray *cfg_components) @@ -1636,6 +2168,14 @@ int cmd_run_ctx_create_components_from_config_components( goto error; } + if (ctx->stream_intersection_mode && + cfg_comp->type == BT_COMPONENT_CLASS_TYPE_SOURCE) { + ret = set_stream_intersections(ctx, cfg_comp, comp_cls); + if (ret) { + goto error; + } + } + BT_LOGI("Created and inserted component: comp-addr=%p, comp-name=\"%s\"", comp, cfg_comp->instance_name->str); quark = g_quark_from_string(cfg_comp->instance_name->str); @@ -1734,16 +2274,25 @@ int cmd_run_ctx_connect_ports(struct cmd_run_ctx *ctx) g_hash_table_iter_init(&iter, ctx->components); while (g_hash_table_iter_next(&iter, &g_name_quark, &g_comp)) { + int64_t (*port_count_fn)(struct bt_component *); + struct bt_port *(*port_by_index_fn)(struct bt_component *, uint64_t); + if (bt_component_is_source(g_comp)) { - ret = cmd_run_ctx_connect_comp_ports(ctx, - g_comp, bt_component_source_get_output_port_count, - bt_component_source_get_output_port_by_index); + port_count_fn = + bt_component_source_get_output_port_count; + port_by_index_fn = + bt_component_source_get_output_port_by_index; } else if (bt_component_is_filter(g_comp)) { - ret = cmd_run_ctx_connect_comp_ports(ctx, - g_comp, bt_component_filter_get_output_port_count, - bt_component_filter_get_output_port_by_index); + port_count_fn = + bt_component_filter_get_output_port_count; + port_by_index_fn = + bt_component_filter_get_output_port_by_index; + } else { + continue; } + ret = cmd_run_ctx_connect_comp_ports(ctx, + g_comp, port_count_fn, port_by_index_fn); if (ret) { goto end; }