+def _handle_component_status(status, gen_error_msg):
+ if status == native_bt.COMPONENT_STATUS_END:
+ raise bt2.Stop
+ elif status == native_bt.COMPONENT_STATUS_AGAIN:
+ raise bt2.TryAgain
+ elif status == native_bt.COMPONENT_STATUS_UNSUPPORTED:
+ raise bt2.UnsupportedFeature
+ elif status == native_bt.COMPONENT_STATUS_REFUSE_PORT_CONNECTION:
+ raise bt2.PortConnectionRefused
+ elif status == native_bt.COMPONENT_STATUS_GRAPH_IS_CANCELED:
+ raise bt2.GraphCanceled
+ elif status < 0:
+ raise bt2.Error(gen_error_msg)
+
+
+class _PortIterator(collections.abc.Iterator):
+ def __init__(self, comp_ports):
+ self._comp_ports = comp_ports
+ self._at = 0
+
+ def __next__(self):
+ if self._at == len(self._comp_ports):
+ raise StopIteration
+
+ comp_ports = self._comp_ports
+ comp_ptr = comp_ports._component._ptr
+ port_ptr = comp_ports._get_port_at_index_fn(comp_ptr, self._at)
+ assert(port_ptr)
+
+ if comp_ports._is_private:
+ port_pub_ptr = native_bt.port_from_private_port(port_ptr)
+ name = native_bt.port_get_name(port_pub_ptr)
+ native_bt.put(port_pub_ptr)
+ else:
+ name = native_bt.port_get_name(port_ptr)
+
+ assert(name is not None)
+ native_bt.put(port_ptr)
+ self._at += 1
+ return name
+
+
+class _ComponentPorts(collections.abc.Mapping):
+ def __init__(self, is_private, component,
+ get_port_by_name_fn, get_port_at_index_fn,
+ get_port_count_fn):
+ self._is_private = is_private
+ self._component = component
+ self._get_port_by_name_fn = get_port_by_name_fn
+ self._get_port_at_index_fn = get_port_at_index_fn
+ self._get_port_count_fn = get_port_count_fn
+
+ def __getitem__(self, key):
+ utils._check_str(key)
+ port_ptr = self._get_port_by_name_fn(self._component._ptr, key)
+
+ if port_ptr is None:
+ raise KeyError(key)
+
+ if self._is_private:
+ return bt2.port._create_private_from_ptr(port_ptr)
+ else:
+ return bt2.port._create_from_ptr(port_ptr)
+
+ def __len__(self):
+ if self._is_private:
+ pub_ptr = native_bt.component_from_private_component(self._component._ptr)
+ count = self._get_port_count_fn(pub_ptr)
+ native_bt.put(pub_ptr)
+ else:
+ count = self._get_port_count_fn(self._component._ptr)
+
+ assert(count >= 0)
+ return count
+
+ def __iter__(self):
+ return _PortIterator(self)
+
+