+ (int) bt_plugin_get_filter_component_class_count(plugin));
+ printf(" %sSink component classes%s: %d\n",
+ bt_common_color_bold(),
+ bt_common_color_reset(),
+ (int) bt_plugin_get_sink_component_class_count(plugin));
+
+ if (strlen(cfg->cmd_data.help.cfg_component->comp_cls_name->str) == 0) {
+ /* Plugin help only */
+ goto end;
+ }
+
+ needed_comp_cls = find_component_class(
+ cfg->cmd_data.help.cfg_component->plugin_name->str,
+ cfg->cmd_data.help.cfg_component->comp_cls_name->str,
+ cfg->cmd_data.help.cfg_component->type);
+ if (!needed_comp_cls) {
+ BT_LOGE("Cannot find component class: plugin-name=\"%s\", "
+ "comp-cls-name=\"%s\", comp-cls-type=%d",
+ cfg->cmd_data.help.cfg_component->plugin_name->str,
+ cfg->cmd_data.help.cfg_component->comp_cls_name->str,
+ cfg->cmd_data.help.cfg_component->type);
+ fprintf(stderr, "\n%s%sCannot find component class %s",
+ bt_common_color_bold(),
+ bt_common_color_fg_red(),
+ bt_common_color_reset());
+ print_plugin_comp_cls_opt(stderr,
+ cfg->cmd_data.help.cfg_component->plugin_name->str,
+ cfg->cmd_data.help.cfg_component->comp_cls_name->str,
+ cfg->cmd_data.help.cfg_component->type);
+ fprintf(stderr, "\n");
+ ret = -1;
+ goto end;
+ }
+
+ printf("\n");
+ print_component_class_help(
+ cfg->cmd_data.help.cfg_component->plugin_name->str,
+ needed_comp_cls);
+
+end:
+ bt_component_class_put_ref(needed_comp_cls);
+ bt_plugin_put_ref(plugin);
+ return ret;
+}
+
+typedef void *(* plugin_borrow_comp_cls_by_index_func_t)(const bt_plugin *,
+ uint64_t);
+typedef const bt_component_class *(* spec_comp_cls_borrow_comp_cls_func_t)(
+ void *);
+
+void cmd_list_plugins_print_component_classes(const bt_plugin *plugin,
+ const char *cc_type_name, uint64_t count,
+ plugin_borrow_comp_cls_by_index_func_t borrow_comp_cls_by_index_func,
+ spec_comp_cls_borrow_comp_cls_func_t spec_comp_cls_borrow_comp_cls_func)
+{
+ uint64_t i;
+
+ if (count == 0) {
+ printf(" %s%s component classes%s: (none)\n",
+ bt_common_color_bold(),
+ cc_type_name,
+ bt_common_color_reset());
+ goto end;
+ } else {
+ printf(" %s%s component classes%s:\n",
+ bt_common_color_bold(),
+ cc_type_name,
+ bt_common_color_reset());
+ }
+
+ for (i = 0; i < count; i++) {
+ const bt_component_class *comp_class =
+ spec_comp_cls_borrow_comp_cls_func(
+ borrow_comp_cls_by_index_func(plugin, i));
+ const char *comp_class_name =
+ bt_component_class_get_name(comp_class);
+ const char *comp_class_description =
+ bt_component_class_get_description(comp_class);
+ bt_component_class_type type =
+ bt_component_class_get_type(comp_class);
+
+ printf(" ");
+ print_plugin_comp_cls_opt(stdout,
+ bt_plugin_get_name(plugin), comp_class_name,
+ type);
+
+ if (comp_class_description) {
+ printf(": %s", comp_class_description);
+ }
+
+ printf("\n");
+ }
+
+end:
+ return;
+}
+
+static
+int cmd_list_plugins(struct bt_config *cfg)
+{
+ int ret = 0;
+ int plugins_count, component_classes_count = 0, i;
+
+ printf("From the following plugin paths:\n\n");
+ print_value(stdout, cfg->plugin_paths, 2);
+ printf("\n");
+ plugins_count = loaded_plugins->len;
+ if (plugins_count == 0) {
+ printf("No plugins found.\n");
+ goto end;
+ }
+
+ for (i = 0; i < plugins_count; i++) {
+ const bt_plugin *plugin = g_ptr_array_index(loaded_plugins, i);
+
+ component_classes_count +=
+ bt_plugin_get_source_component_class_count(plugin) +
+ bt_plugin_get_filter_component_class_count(plugin) +
+ bt_plugin_get_sink_component_class_count(plugin);
+ }
+
+ printf("Found %s%d%s component classes in %s%d%s plugins.\n",
+ bt_common_color_bold(),
+ component_classes_count,
+ bt_common_color_reset(),
+ bt_common_color_bold(),
+ plugins_count,
+ bt_common_color_reset());
+
+ for (i = 0; i < plugins_count; i++) {
+ const bt_plugin *plugin = g_ptr_array_index(loaded_plugins, i);
+
+ printf("\n");
+ print_plugin_info(plugin);
+ cmd_list_plugins_print_component_classes(plugin, "Source",
+ bt_plugin_get_source_component_class_count(plugin),
+ (plugin_borrow_comp_cls_by_index_func_t)
+ bt_plugin_borrow_source_component_class_by_index_const,
+ (spec_comp_cls_borrow_comp_cls_func_t)
+ bt_component_class_source_as_component_class);
+ cmd_list_plugins_print_component_classes(plugin, "Filter",
+ bt_plugin_get_filter_component_class_count(plugin),
+ (plugin_borrow_comp_cls_by_index_func_t)
+ bt_plugin_borrow_filter_component_class_by_index_const,
+ (spec_comp_cls_borrow_comp_cls_func_t)
+ bt_component_class_filter_as_component_class);
+ cmd_list_plugins_print_component_classes(plugin, "Sink",
+ bt_plugin_get_sink_component_class_count(plugin),
+ (plugin_borrow_comp_cls_by_index_func_t)
+ bt_plugin_borrow_sink_component_class_by_index_const,
+ (spec_comp_cls_borrow_comp_cls_func_t)
+ bt_component_class_sink_as_component_class);
+ }
+
+end:
+ return ret;
+}
+
+static
+int cmd_print_lttng_live_sessions(struct bt_config *cfg)
+{
+ int ret = 0;
+ const bt_component_class *comp_cls = NULL;
+ const bt_value *results = NULL;
+ bt_value *params = NULL;
+ const bt_value *map = NULL;
+ const bt_value *v = NULL;
+ static const char * const plugin_name = "ctf";
+ static const char * const comp_cls_name = "lttng-live";
+ static const bt_component_class_type comp_cls_type =
+ BT_COMPONENT_CLASS_TYPE_SOURCE;
+ int64_t array_size, i;
+ const char *fail_reason = NULL;
+ FILE *out_stream = stdout;
+
+ BT_ASSERT(cfg->cmd_data.print_lttng_live_sessions.url);
+ comp_cls = find_component_class(plugin_name, comp_cls_name,
+ comp_cls_type);
+ if (!comp_cls) {
+ BT_LOGE("Cannot find component class: plugin-name=\"%s\", "
+ "comp-cls-name=\"%s\", comp-cls-type=%d",
+ plugin_name, comp_cls_name,
+ BT_COMPONENT_CLASS_TYPE_SOURCE);
+ fprintf(stderr, "%s%sCannot find component class %s",
+ bt_common_color_bold(),
+ bt_common_color_fg_red(),
+ bt_common_color_reset());
+ print_plugin_comp_cls_opt(stderr, plugin_name,
+ comp_cls_name, comp_cls_type);
+ fprintf(stderr, "\n");
+ goto error;
+ }
+
+ params = bt_value_map_create();
+ if (!params) {
+ goto error;
+ }
+
+ ret = bt_value_map_insert_string_entry(params, "url",
+ cfg->cmd_data.print_lttng_live_sessions.url->str);
+ if (ret) {
+ goto error;
+ }
+
+ ret = query(comp_cls, "sessions", params,
+ &results, &fail_reason);
+ if (ret) {
+ goto failed;
+ }
+
+ BT_ASSERT(results);
+
+ if (!bt_value_is_array(results)) {
+ BT_LOGE_STR("Expecting an array for sessions query.");
+ fprintf(stderr, "%s%sUnexpected type returned by session query%s\n",
+ bt_common_color_bold(),
+ bt_common_color_fg_red(),
+ bt_common_color_reset());
+ goto error;
+ }
+
+ if (cfg->cmd_data.print_lttng_live_sessions.output_path->len > 0) {
+ out_stream =
+ fopen(cfg->cmd_data.print_lttng_live_sessions.output_path->str,
+ "w");
+ if (!out_stream) {
+ ret = -1;
+ BT_LOGE_ERRNO("Cannot open file for writing",
+ ": path=\"%s\"",
+ cfg->cmd_data.print_lttng_live_sessions.output_path->str);
+ goto end;
+ }
+ }
+
+ array_size = bt_value_array_get_size(results);
+ for (i = 0; i < array_size; i++) {
+ const char *url_text;
+ int64_t timer_us, streams, clients;
+
+ map = bt_value_array_borrow_element_by_index_const(results, i);
+ if (!map) {
+ BT_LOGE_STR("Unexpected empty array entry.");
+ goto error;
+ }
+ if (!bt_value_is_map(map)) {
+ BT_LOGE_STR("Unexpected entry type.");
+ goto error;
+ }
+
+ v = bt_value_map_borrow_entry_value_const(map, "url");
+ if (!v) {
+ BT_LOGE_STR("Unexpected empty array \"url\" entry.");
+ goto error;
+ }
+ url_text = bt_value_string_get(v);
+ fprintf(out_stream, "%s", url_text);
+ v = bt_value_map_borrow_entry_value_const(map, "timer-us");
+ if (!v) {
+ BT_LOGE_STR("Unexpected empty array \"timer-us\" entry.");
+ goto error;
+ }
+ timer_us = bt_value_signed_integer_get(v);
+ fprintf(out_stream, " (timer = %" PRIu64 ", ", timer_us);
+ v = bt_value_map_borrow_entry_value_const(map, "stream-count");
+ if (!v) {
+ BT_LOGE_STR("Unexpected empty array \"stream-count\" entry.");
+ goto error;
+ }
+ streams = bt_value_signed_integer_get(v);
+ fprintf(out_stream, "%" PRIu64 " stream(s), ", streams);
+ v = bt_value_map_borrow_entry_value_const(map, "client-count");
+ if (!v) {
+ BT_LOGE_STR("Unexpected empty array \"client-count\" entry.");
+ goto error;
+ }
+ clients = bt_value_signed_integer_get(v);
+ fprintf(out_stream, "%" PRIu64 " client(s) connected)\n", clients);
+ }
+
+ goto end;
+
+failed:
+ BT_LOGE("Failed to query for sessions: %s", fail_reason);
+ fprintf(stderr, "%s%sFailed to request sessions: %s%s\n",
+ bt_common_color_bold(),
+ bt_common_color_fg_red(),
+ fail_reason,
+ bt_common_color_reset());
+
+error:
+ ret = -1;
+
+end:
+ bt_value_put_ref(results);
+ bt_value_put_ref(params);
+ bt_component_class_put_ref(comp_cls);
+
+ if (out_stream && out_stream != stdout) {
+ int fclose_ret = fclose(out_stream);
+
+ if (fclose_ret) {
+ BT_LOGE_ERRNO("Cannot close file stream",
+ ": path=\"%s\"",
+ cfg->cmd_data.print_lttng_live_sessions.output_path->str);
+ }
+ }
+
+ return ret;
+}
+
+static
+int cmd_print_ctf_metadata(struct bt_config *cfg)
+{
+ int ret = 0;
+ const bt_component_class *comp_cls = NULL;
+ const bt_value *results = NULL;
+ bt_value *params = NULL;
+ const bt_value *metadata_text_value = NULL;
+ const char *metadata_text = NULL;
+ static const char * const plugin_name = "ctf";
+ static const char * const comp_cls_name = "fs";
+ static const bt_component_class_type comp_cls_type =
+ BT_COMPONENT_CLASS_TYPE_SOURCE;
+ const char *fail_reason = NULL;
+ FILE *out_stream = stdout;
+
+ BT_ASSERT(cfg->cmd_data.print_ctf_metadata.path);
+ comp_cls = find_component_class(plugin_name, comp_cls_name,
+ comp_cls_type);
+ if (!comp_cls) {
+ BT_LOGE("Cannot find component class: plugin-name=\"%s\", "
+ "comp-cls-name=\"%s\", comp-cls-type=%d",
+ plugin_name, comp_cls_name,
+ BT_COMPONENT_CLASS_TYPE_SOURCE);
+ fprintf(stderr, "%s%sCannot find component class %s",
+ bt_common_color_bold(),
+ bt_common_color_fg_red(),
+ bt_common_color_reset());
+ print_plugin_comp_cls_opt(stderr, plugin_name,
+ comp_cls_name, comp_cls_type);
+ fprintf(stderr, "\n");
+ ret = -1;
+ goto end;
+ }
+
+ params = bt_value_map_create();
+ if (!params) {
+ ret = -1;
+ goto end;
+ }
+
+ ret = bt_value_map_insert_string_entry(params, "path",
+ cfg->cmd_data.print_ctf_metadata.path->str);
+ if (ret) {
+ ret = -1;
+ goto end;
+ }
+
+ ret = query(comp_cls, "metadata-info",
+ params, &results, &fail_reason);
+ if (ret) {
+ goto failed;
+ }
+
+ metadata_text_value = bt_value_map_borrow_entry_value_const(results,
+ "text");
+ if (!metadata_text_value) {
+ BT_LOGE_STR("Cannot find `text` string value in the resulting metadata info object.");
+ ret = -1;
+ goto end;
+ }
+
+ metadata_text = bt_value_string_get(metadata_text_value);
+
+ if (cfg->cmd_data.print_ctf_metadata.output_path->len > 0) {
+ out_stream =
+ fopen(cfg->cmd_data.print_ctf_metadata.output_path->str,
+ "w");
+ if (!out_stream) {
+ ret = -1;
+ BT_LOGE_ERRNO("Cannot open file for writing",
+ ": path=\"%s\"",
+ cfg->cmd_data.print_ctf_metadata.output_path->str);
+ goto end;
+ }
+ }
+
+ ret = fprintf(out_stream, "%s\n", metadata_text);
+ if (ret < 0) {
+ BT_LOGE("Cannot write whole metadata text to output stream: "
+ "ret=%d", ret);
+ }
+
+ goto end;
+
+failed:
+ ret = -1;
+ BT_LOGE("Failed to query for metadata info: %s", fail_reason);
+ fprintf(stderr, "%s%sFailed to request metadata info: %s%s\n",
+ bt_common_color_bold(),
+ bt_common_color_fg_red(),
+ fail_reason,
+ bt_common_color_reset());
+
+end:
+ bt_value_put_ref(results);
+ bt_value_put_ref(params);
+ bt_component_class_put_ref(comp_cls);
+
+ if (out_stream && out_stream != stdout) {
+ int fclose_ret = fclose(out_stream);
+
+ if (fclose_ret) {
+ BT_LOGE_ERRNO("Cannot close file stream",
+ ": path=\"%s\"",
+ cfg->cmd_data.print_ctf_metadata.output_path->str);
+ }
+ }
+
+ return ret;
+}
+
+struct port_id {
+ char *instance_name;
+ char *port_name;
+};
+
+struct trace_range {
+ uint64_t intersection_range_begin_ns;
+ uint64_t intersection_range_end_ns;
+};
+
+static
+guint port_id_hash(gconstpointer v)
+{
+ const struct port_id *id = v;
+
+ BT_ASSERT(id->instance_name);
+ BT_ASSERT(id->port_name);
+
+ return g_str_hash(id->instance_name) ^ g_str_hash(id->port_name);
+}
+
+static
+gboolean port_id_equal(gconstpointer v1, gconstpointer v2)
+{
+ const struct port_id *id1 = v1;
+ const struct port_id *id2 = v2;
+
+ return !strcmp(id1->instance_name, id2->instance_name) &&
+ !strcmp(id1->port_name, id2->port_name);
+}
+
+static
+void port_id_destroy(gpointer data)
+{
+ struct port_id *id = data;
+
+ free(id->instance_name);
+ free(id->port_name);
+ free(id);
+}
+
+static
+void trace_range_destroy(gpointer data)
+{
+ free(data);
+}
+
+struct cmd_run_ctx {
+ /* Owned by this */
+ GHashTable *src_components;
+
+ /* Owned by this */
+ GHashTable *flt_components;
+
+ /* Owned by this */
+ GHashTable *sink_components;
+
+ /* Owned by this */
+ bt_graph *graph;
+
+ /* Weak */
+ struct bt_config *cfg;
+
+ bool connect_ports;
+
+ bool stream_intersection_mode;
+
+ /*
+ * Association of struct port_id -> struct trace_range.
+ */
+ GHashTable *intersections;
+};
+
+/* Returns a timestamp of the form "(-)s.ns" */
+static
+char *s_from_ns(int64_t ns)
+{
+ int ret;
+ char *s_ret = NULL;
+ bool is_negative;
+ int64_t ts_sec_abs, ts_nsec_abs;
+ int64_t ts_sec = ns / NSEC_PER_SEC;
+ int64_t ts_nsec = ns % NSEC_PER_SEC;
+
+ if (ts_sec >= 0 && ts_nsec >= 0) {
+ is_negative = false;
+ ts_sec_abs = ts_sec;
+ ts_nsec_abs = ts_nsec;
+ } else if (ts_sec > 0 && ts_nsec < 0) {
+ is_negative = false;
+ ts_sec_abs = ts_sec - 1;
+ ts_nsec_abs = NSEC_PER_SEC + ts_nsec;
+ } else if (ts_sec == 0 && ts_nsec < 0) {
+ is_negative = true;
+ ts_sec_abs = ts_sec;
+ ts_nsec_abs = -ts_nsec;
+ } else if (ts_sec < 0 && ts_nsec > 0) {
+ is_negative = true;
+ ts_sec_abs = -(ts_sec + 1);
+ ts_nsec_abs = NSEC_PER_SEC - ts_nsec;
+ } else if (ts_sec < 0 && ts_nsec == 0) {
+ is_negative = true;
+ ts_sec_abs = -ts_sec;
+ ts_nsec_abs = ts_nsec;
+ } else { /* (ts_sec < 0 && ts_nsec < 0) */
+ is_negative = true;
+ ts_sec_abs = -ts_sec;
+ ts_nsec_abs = -ts_nsec;
+ }
+
+ ret = asprintf(&s_ret, "%s%" PRId64 ".%09" PRId64,
+ is_negative ? "-" : "", ts_sec_abs, ts_nsec_abs);
+ if (ret < 0) {
+ s_ret = NULL;
+ }
+ return s_ret;
+}
+
+static
+int cmd_run_ctx_connect_upstream_port_to_downstream_component(
+ struct cmd_run_ctx *ctx,
+ const bt_component *upstream_comp,
+ const bt_port_output *out_upstream_port,
+ struct bt_config_connection *cfg_conn)
+{
+ typedef uint64_t (*input_port_count_func_t)(void *);
+ typedef const bt_port_input *(*borrow_input_port_by_index_func_t)(
+ const void *, uint64_t);
+ const bt_port *upstream_port =
+ bt_port_output_as_port_const(out_upstream_port);
+
+ int ret = 0;
+ GQuark downstreamp_comp_name_quark;
+ void *downstream_comp;
+ uint64_t downstream_port_count;
+ uint64_t i;
+ input_port_count_func_t port_count_fn;
+ borrow_input_port_by_index_func_t port_by_index_fn;
+ bt_graph_status status = BT_GRAPH_STATUS_ERROR;
+ bool insert_trimmer = false;
+ bt_value *trimmer_params = NULL;
+ char *intersection_begin = NULL;
+ char *intersection_end = NULL;
+ const bt_component_filter *trimmer = NULL;
+ const bt_component_class_filter *trimmer_class = NULL;
+ const bt_port_input *trimmer_input = NULL;
+ const bt_port_output *trimmer_output = NULL;
+
+ if (ctx->intersections &&
+ bt_component_get_class_type(upstream_comp) ==
+ BT_COMPONENT_CLASS_TYPE_SOURCE) {
+ struct trace_range *range;
+ struct port_id port_id = {
+ .instance_name = (char *) bt_component_get_name(upstream_comp),
+ .port_name = (char *) bt_port_get_name(upstream_port)
+ };
+
+ if (!port_id.instance_name || !port_id.port_name) {
+ goto error;
+ }
+
+ range = (struct trace_range *) g_hash_table_lookup(
+ ctx->intersections, &port_id);
+ if (range) {
+ bt_value_status status;
+
+ intersection_begin = s_from_ns(
+ range->intersection_range_begin_ns);
+ intersection_end = s_from_ns(
+ range->intersection_range_end_ns);
+ if (!intersection_begin || !intersection_end) {
+ BT_LOGE_STR("Cannot create trimmer argument timestamp string.");
+ goto error;
+ }
+
+ insert_trimmer = true;
+ trimmer_params = bt_value_map_create();
+ if (!trimmer_params) {
+ goto error;
+ }
+
+ status = bt_value_map_insert_string_entry(
+ trimmer_params, "begin", intersection_begin);
+ if (status != BT_VALUE_STATUS_OK) {
+ goto error;
+ }
+ status = bt_value_map_insert_string_entry(
+ trimmer_params,
+ "end", intersection_end);
+ if (status != BT_VALUE_STATUS_OK) {
+ goto error;
+ }
+ }
+
+ trimmer_class = find_filter_component_class("utils", "trimmer");
+ if (!trimmer_class) {
+ goto error;
+ }
+ }
+
+ BT_LOGI("Connecting upstream port to the next available downstream port: "
+ "upstream-port-addr=%p, upstream-port-name=\"%s\", "
+ "downstream-comp-name=\"%s\", conn-arg=\"%s\"",
+ upstream_port, bt_port_get_name(upstream_port),
+ cfg_conn->downstream_comp_name->str,
+ cfg_conn->arg->str);
+ downstreamp_comp_name_quark = g_quark_from_string(
+ cfg_conn->downstream_comp_name->str);
+ BT_ASSERT(downstreamp_comp_name_quark > 0);
+ downstream_comp = g_hash_table_lookup(ctx->flt_components,
+ GUINT_TO_POINTER(downstreamp_comp_name_quark));
+ port_count_fn = (input_port_count_func_t)
+ bt_component_filter_get_input_port_count;
+ port_by_index_fn = (borrow_input_port_by_index_func_t)
+ bt_component_filter_borrow_input_port_by_index_const;
+
+ if (!downstream_comp) {
+ downstream_comp = g_hash_table_lookup(ctx->sink_components,
+ GUINT_TO_POINTER(downstreamp_comp_name_quark));
+ port_count_fn = (input_port_count_func_t)
+ bt_component_sink_get_input_port_count;
+ port_by_index_fn = (borrow_input_port_by_index_func_t)
+ bt_component_sink_borrow_input_port_by_index_const;
+ }
+
+ if (!downstream_comp) {
+ BT_LOGE("Cannot find downstream component: comp-name=\"%s\", "
+ "conn-arg=\"%s\"", cfg_conn->downstream_comp_name->str,
+ cfg_conn->arg->str);
+ fprintf(stderr, "Cannot create connection: cannot find downstream component: %s\n",
+ cfg_conn->arg->str);
+ goto error;
+ }
+
+ downstream_port_count = port_count_fn(downstream_comp);
+
+ for (i = 0; i < downstream_port_count; i++) {
+ const bt_port_input *in_downstream_port =
+ port_by_index_fn(downstream_comp, i);
+ const bt_port *downstream_port =
+ bt_port_input_as_port_const(in_downstream_port);
+ const char *upstream_port_name;
+ const char *downstream_port_name;
+
+ BT_ASSERT(downstream_port);
+
+ /* Skip port if it's already connected. */
+ if (bt_port_is_connected(downstream_port)) {
+ BT_LOGD("Skipping downstream port: already connected: "
+ "port-addr=%p, port-name=\"%s\"",
+ downstream_port,
+ bt_port_get_name(downstream_port));
+ continue;
+ }
+
+ downstream_port_name = bt_port_get_name(downstream_port);
+ BT_ASSERT(downstream_port_name);
+ upstream_port_name = bt_port_get_name(upstream_port);
+ BT_ASSERT(upstream_port_name);
+
+ if (!bt_common_star_glob_match(
+ cfg_conn->downstream_port_glob->str, SIZE_MAX,
+ downstream_port_name, SIZE_MAX)) {
+ continue;
+ }
+
+ if (insert_trimmer) {
+ /*
+ * In order to insert the trimmer between the
+ * two components that were being connected, we
+ * create a connection configuration entry which
+ * describes a connection from the trimmer's
+ * output to the original input that was being
+ * connected.
+ *
+ * Hence, the creation of the trimmer will cause
+ * the graph "new port" listener to establish
+ * all downstream connections as its output port
+ * is connected. We will then establish the
+ * connection between the original upstream
+ * source and the trimmer.
+ */
+ char *trimmer_name = NULL;
+ bt_graph_status graph_status;
+
+ ret = asprintf(&trimmer_name,
+ "stream-intersection-trimmer-%s",
+ upstream_port_name);
+ if (ret < 0) {
+ goto error;
+ }
+ ret = 0;
+
+ ctx->connect_ports = false;
+ graph_status = bt_graph_add_filter_component(
+ ctx->graph, trimmer_class, trimmer_name,
+ trimmer_params, &trimmer);
+ free(trimmer_name);
+ if (graph_status != BT_GRAPH_STATUS_OK) {
+ goto error;
+ }
+ BT_ASSERT(trimmer);
+
+ trimmer_input =
+ bt_component_filter_borrow_input_port_by_index_const(
+ trimmer, 0);
+ if (!trimmer_input) {
+ goto error;
+ }
+ trimmer_output =
+ bt_component_filter_borrow_output_port_by_index_const(
+ trimmer, 0);
+ if (!trimmer_output) {
+ goto error;
+ }
+
+ /*
+ * Replace the current downstream port by the trimmer's
+ * upstream port.
+ */
+ in_downstream_port = trimmer_input;
+ downstream_port =
+ bt_port_input_as_port_const(in_downstream_port);
+ downstream_port_name = bt_port_get_name(
+ downstream_port);
+ BT_ASSERT(downstream_port_name);
+ }
+
+ /* We have a winner! */
+ status = bt_graph_connect_ports(ctx->graph,
+ out_upstream_port, in_downstream_port, NULL);
+ downstream_port = NULL;
+ switch (status) {
+ case BT_GRAPH_STATUS_OK:
+ break;
+ case BT_GRAPH_STATUS_CANCELED:
+ BT_LOGI_STR("Graph was canceled by user.");
+ status = BT_GRAPH_STATUS_OK;
+ break;
+ case BT_GRAPH_STATUS_COMPONENT_REFUSES_PORT_CONNECTION:
+ BT_LOGE("A component refused a connection to one of its ports: "
+ "upstream-comp-addr=%p, upstream-comp-name=\"%s\", "
+ "upstream-port-addr=%p, upstream-port-name=\"%s\", "
+ "downstream-comp-addr=%p, downstream-comp-name=\"%s\", "
+ "downstream-port-addr=%p, downstream-port-name=\"%s\", "
+ "conn-arg=\"%s\"",
+ upstream_comp, bt_component_get_name(upstream_comp),
+ upstream_port, bt_port_get_name(upstream_port),
+ downstream_comp, cfg_conn->downstream_comp_name->str,
+ downstream_port, downstream_port_name,
+ cfg_conn->arg->str);
+ fprintf(stderr,
+ "A component refused a connection to one of its ports (`%s` to `%s`): %s\n",
+ bt_port_get_name(upstream_port),
+ downstream_port_name,
+ cfg_conn->arg->str);
+ break;
+ default:
+ BT_LOGE("Cannot create connection: graph refuses to connect ports: "
+ "upstream-comp-addr=%p, upstream-comp-name=\"%s\", "
+ "upstream-port-addr=%p, upstream-port-name=\"%s\", "
+ "downstream-comp-addr=%p, downstream-comp-name=\"%s\", "
+ "downstream-port-addr=%p, downstream-port-name=\"%s\", "
+ "conn-arg=\"%s\"",
+ upstream_comp, bt_component_get_name(upstream_comp),
+ upstream_port, bt_port_get_name(upstream_port),
+ downstream_comp, cfg_conn->downstream_comp_name->str,
+ downstream_port, downstream_port_name,
+ cfg_conn->arg->str);
+ fprintf(stderr,
+ "Cannot create connection: graph refuses to connect ports (`%s` to `%s`): %s\n",
+ bt_port_get_name(upstream_port),
+ downstream_port_name,
+ cfg_conn->arg->str);
+ goto error;
+ }
+
+ BT_LOGI("Connected component ports: "
+ "upstream-comp-addr=%p, upstream-comp-name=\"%s\", "
+ "upstream-port-addr=%p, upstream-port-name=\"%s\", "
+ "downstream-comp-addr=%p, downstream-comp-name=\"%s\", "
+ "downstream-port-addr=%p, downstream-port-name=\"%s\", "
+ "conn-arg=\"%s\"",
+ upstream_comp, bt_component_get_name(upstream_comp),
+ upstream_port, bt_port_get_name(upstream_port),
+ downstream_comp, cfg_conn->downstream_comp_name->str,
+ downstream_port, downstream_port_name,
+ cfg_conn->arg->str);
+
+ if (insert_trimmer) {
+ /*
+ * The first connection, from the source to the trimmer,
+ * has been done. We now connect the trimmer to the
+ * original downstream port.
+ */
+ ret = cmd_run_ctx_connect_upstream_port_to_downstream_component(
+ ctx,
+ bt_component_filter_as_component_const(trimmer),
+ trimmer_output, cfg_conn);
+ if (ret) {
+ goto error;
+ }
+ ctx->connect_ports = true;
+ }
+
+ /*
+ * We found a matching downstream port: the search is
+ * over.
+ */
+ goto end;
+ }
+
+ /* No downstream port found */
+ BT_LOGE("Cannot create connection: cannot find a matching downstream port for upstream port: "
+ "upstream-port-addr=%p, upstream-port-name=\"%s\", "
+ "downstream-comp-name=\"%s\", conn-arg=\"%s\"",
+ upstream_port, bt_port_get_name(upstream_port),
+ cfg_conn->downstream_comp_name->str,
+ cfg_conn->arg->str);
+ fprintf(stderr,
+ "Cannot create connection: cannot find a matching downstream port for upstream port `%s`: %s\n",
+ bt_port_get_name(upstream_port), cfg_conn->arg->str);
+
+error:
+ ret = -1;
+
+end:
+ free(intersection_begin);
+ free(intersection_end);
+ BT_VALUE_PUT_REF_AND_RESET(trimmer_params);
+ BT_COMPONENT_CLASS_FILTER_PUT_REF_AND_RESET(trimmer_class);
+ BT_COMPONENT_FILTER_PUT_REF_AND_RESET(trimmer);
+ return ret;
+}
+
+static
+int cmd_run_ctx_connect_upstream_port(struct cmd_run_ctx *ctx,
+ const bt_port_output *upstream_port)
+{
+ int ret = 0;
+ const char *upstream_port_name;
+ const char *upstream_comp_name;
+ const bt_component *upstream_comp = NULL;
+ size_t i;
+
+ BT_ASSERT(ctx);
+ BT_ASSERT(upstream_port);
+ upstream_port_name = bt_port_get_name(
+ bt_port_output_as_port_const(upstream_port));
+ BT_ASSERT(upstream_port_name);
+ upstream_comp = bt_port_borrow_component_const(
+ bt_port_output_as_port_const(upstream_port));
+ if (!upstream_comp) {
+ BT_LOGW("Upstream port to connect is not part of a component: "
+ "port-addr=%p, port-name=\"%s\"",
+ upstream_port, upstream_port_name);
+ ret = -1;
+ goto end;
+ }
+
+ upstream_comp_name = bt_component_get_name(upstream_comp);
+ BT_ASSERT(upstream_comp_name);
+ BT_LOGI("Connecting upstream port: comp-addr=%p, comp-name=\"%s\", "
+ "port-addr=%p, port-name=\"%s\"",
+ upstream_comp, upstream_comp_name,
+ upstream_port, upstream_port_name);
+
+ for (i = 0; i < ctx->cfg->cmd_data.run.connections->len; i++) {
+ struct bt_config_connection *cfg_conn =
+ g_ptr_array_index(
+ ctx->cfg->cmd_data.run.connections, i);
+
+ if (strcmp(cfg_conn->upstream_comp_name->str,
+ upstream_comp_name)) {
+ continue;
+ }
+
+ if (!bt_common_star_glob_match(
+ cfg_conn->upstream_port_glob->str,
+ SIZE_MAX, upstream_port_name, SIZE_MAX)) {
+ continue;
+ }
+
+ ret = cmd_run_ctx_connect_upstream_port_to_downstream_component(
+ ctx, upstream_comp, upstream_port, cfg_conn);
+ if (ret) {
+ BT_LOGE("Cannot connect upstream port: "
+ "port-addr=%p, port-name=\"%s\"",
+ upstream_port,
+ upstream_port_name);
+ fprintf(stderr,
+ "Cannot connect port `%s` of component `%s` to a downstream port: %s\n",
+ upstream_port_name,
+ upstream_comp_name,
+ cfg_conn->arg->str);
+ goto error;
+ }
+ goto end;
+ }
+
+ BT_LOGE("Cannot connect upstream port: port does not match any connection argument: "
+ "port-addr=%p, port-name=\"%s\"", upstream_port,
+ upstream_port_name);
+ fprintf(stderr,
+ "Cannot create connection: upstream port `%s` does not match any connection\n",
+ upstream_port_name);
+
+error:
+ ret = -1;
+
+end:
+ return ret;
+}
+
+static
+void graph_output_port_added_listener(struct cmd_run_ctx *ctx,
+ const bt_port_output *out_port)
+{
+ const bt_component *comp;
+ const bt_port *port = bt_port_output_as_port_const(out_port);
+
+ comp = bt_port_borrow_component_const(port);
+ BT_LOGI("Port added to a graph's component: comp-addr=%p, "
+ "comp-name=\"%s\", port-addr=%p, port-name=\"%s\"",
+ comp, comp ? bt_component_get_name(comp) : "",
+ port, bt_port_get_name(port));
+
+ if (!ctx->connect_ports) {
+ goto end;
+ }
+
+ if (!comp) {
+ BT_LOGW_STR("Port has no component.");
+ goto end;
+ }
+
+ if (bt_port_is_connected(port)) {
+ BT_LOGW_STR("Port is already connected.");
+ goto end;
+ }
+
+ if (cmd_run_ctx_connect_upstream_port(ctx, out_port)) {
+ BT_LOGF_STR("Cannot connect upstream port.");
+ fprintf(stderr, "Added port could not be connected: aborting\n");
+ abort();
+ }
+
+end:
+ return;
+}
+
+static
+void graph_source_output_port_added_listener(
+ const bt_component_source *component,
+ const bt_port_output *port, void *data)
+{
+ graph_output_port_added_listener(data, port);
+}
+
+static
+void graph_filter_output_port_added_listener(
+ const bt_component_filter *component,
+ const bt_port_output *port, void *data)
+{
+ graph_output_port_added_listener(data, port);
+}
+
+static
+void cmd_run_ctx_destroy(struct cmd_run_ctx *ctx)
+{
+ if (!ctx) {
+ return;
+ }
+
+ if (ctx->src_components) {
+ g_hash_table_destroy(ctx->src_components);
+ ctx->src_components = NULL;
+ }
+
+ if (ctx->flt_components) {
+ g_hash_table_destroy(ctx->flt_components);
+ ctx->flt_components = NULL;
+ }
+
+ if (ctx->sink_components) {
+ g_hash_table_destroy(ctx->sink_components);
+ ctx->sink_components = NULL;
+ }
+
+ if (ctx->intersections) {
+ g_hash_table_destroy(ctx->intersections);
+ ctx->intersections = NULL;
+ }
+
+ BT_GRAPH_PUT_REF_AND_RESET(ctx->graph);
+ the_graph = NULL;
+ ctx->cfg = NULL;
+}
+
+static
+int cmd_run_ctx_init(struct cmd_run_ctx *ctx, struct bt_config *cfg)
+{
+ int ret = 0;
+ bt_graph_status status;
+
+ ctx->cfg = cfg;
+ ctx->connect_ports = false;
+ ctx->src_components = g_hash_table_new_full(g_direct_hash,
+ g_direct_equal, NULL, (GDestroyNotify) bt_object_put_ref);
+ if (!ctx->src_components) {
+ goto error;
+ }