+ def component_class(self):
+ return self._component_class
+
+ @classmethod
+ def from_named_plugin_and_component_class(
+ cls,
+ plugin_name,
+ component_class_name,
+ params=None,
+ obj=None,
+ logging_level=bt2.LoggingLevel.NONE,
+ ):
+ plugin = bt2.find_plugin(plugin_name)
+
+ if plugin is None:
+ raise ValueError('no such plugin: {}'.format(plugin_name))
+
+ if component_class_name in plugin.source_component_classes:
+ comp_class = plugin.source_component_classes[component_class_name]
+ elif component_class_name in plugin.filter_component_classes:
+ comp_class = plugin.filter_component_classes[component_class_name]
+ else:
+ raise KeyError(
+ 'source or filter component class `{}` not found in plugin `{}`'.format(
+ component_class_name, plugin_name
+ )
+ )
+
+ return cls(comp_class, params, obj, logging_level)
+
+
+class AutoSourceComponentSpec(_BaseComponentSpec):
+ # A component spec that does automatic source discovery.
+ _no_obj = object()
+
+ def __init__(self, input, params=None, obj=_no_obj, logging_level=None):
+ super().__init__(params, obj, logging_level)
+ self._input = input
+
+ @property
+ def input(self):
+ return self._input
+
+
+def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set):
+ # Transform a list of `AutoSourceComponentSpec` in a list of `ComponentSpec`
+ # using the automatic source discovery mechanism.
+ inputs = bt2.ArrayValue([spec.input for spec in auto_source_comp_specs])
+
+ if plugin_set is None:
+ plugin_set = bt2.find_plugins()
+ else:
+ utils._check_type(plugin_set, bt2_plugin._PluginSet)
+
+ res_ptr = native_bt.bt2_auto_discover_source_components(
+ inputs._ptr, plugin_set._ptr
+ )
+
+ if res_ptr is None:
+ raise bt2._MemoryError('cannot auto discover source components')
+
+ res = bt2_value._create_from_ptr(res_ptr)
+
+ assert type(res) == bt2.MapValue
+ assert 'status' in res
+
+ status = res['status']
+ utils._handle_func_status(status, 'cannot auto-discover source components')
+
+ comp_specs = []
+ comp_specs_raw = res['results']
+ assert type(comp_specs_raw) == bt2.ArrayValue
+
+ used_input_indices = set()
+
+ for comp_spec_raw in comp_specs_raw:
+ assert type(comp_spec_raw) == bt2.ArrayValue
+ assert len(comp_spec_raw) == 4
+
+ plugin_name = comp_spec_raw[0]
+ assert type(plugin_name) == bt2.StringValue
+ plugin_name = str(plugin_name)
+
+ class_name = comp_spec_raw[1]
+ assert type(class_name) == bt2.StringValue
+ class_name = str(class_name)
+
+ comp_inputs = comp_spec_raw[2]
+ assert type(comp_inputs) == bt2.ArrayValue
+
+ comp_orig_indices = comp_spec_raw[3]
+ assert type(comp_orig_indices)
+
+ params = bt2.MapValue()
+ logging_level = bt2.LoggingLevel.NONE
+ obj = None
+
+ # Compute `params` for this component by piling up params given to all
+ # AutoSourceComponentSpec objects that contributed in the instantiation
+ # of this component.
+ #
+ # The effective log level for a component is the last one specified
+ # across the AutoSourceComponentSpec that contributed in its
+ # instantiation.
+ for idx in comp_orig_indices:
+ orig_spec = auto_source_comp_specs[idx]
+
+ if orig_spec.params is not None:
+ params.update(orig_spec.params)
+
+ if orig_spec.logging_level is not None:
+ logging_level = orig_spec.logging_level
+
+ if orig_spec.obj is not AutoSourceComponentSpec._no_obj:
+ obj = orig_spec.obj
+
+ used_input_indices.add(int(idx))
+
+ params['inputs'] = comp_inputs
+
+ comp_specs.append(
+ ComponentSpec.from_named_plugin_and_component_class(
+ plugin_name,
+ class_name,
+ params=params,
+ obj=obj,
+ logging_level=logging_level,
+ )
+ )
+
+ if len(used_input_indices) != len(inputs):
+ unused_input_indices = set(range(len(inputs))) - used_input_indices
+ unused_input_indices = sorted(unused_input_indices)
+ unused_inputs = [str(inputs[x]) for x in unused_input_indices]
+
+ msg = (
+ 'Some auto source component specs did not produce any component: '
+ + ', '.join(unused_inputs)
+ )
+ raise RuntimeError(msg)
+
+ return comp_specs