import bt2
-class GraphListenerType:
- PORT_ADDED = 0
- PORT_REMOVED = 1
- PORTS_CONNECTED = 2
- PORTS_DISCONNECTED = 3
-
-
-def _graph_port_added_listener_from_native(user_listener, port_ptr):
- try:
- port = bt2.port._create_from_ptr(port_ptr)
- port._get()
- user_listener(port)
- except:
- pass
-
-
-def _graph_port_removed_listener_from_native(user_listener, port_ptr):
- try:
- port = bt2.port._create_from_ptr(port_ptr)
- port._get()
- user_listener(port)
- except:
- pass
+def _graph_port_added_listener_from_native(user_listener,
+ component_ptr, component_type,
+ port_ptr, port_type):
+ component = bt2.component._create_component_from_ptr_and_get_ref(component_ptr, component_type)
+ port = bt2.port._create_from_ptr_and_get_ref(port_ptr, port_type)
+ user_listener(component, port)
def _graph_ports_connected_listener_from_native(user_listener,
+ upstream_component_ptr, upstream_component_type,
upstream_port_ptr,
+ downstream_component_ptr, downstream_component_type,
downstream_port_ptr):
- try:
- upstream_port = bt2.port._create_from_ptr(upstream_port_ptr)
- upstream_port._get()
- downstream_port = bt2.port._create_from_ptr(downstream_port_ptr)
- downstream_port._get()
- user_listener(upstream_port, downstream_port)
- except:
- pass
-
-
-def _graph_ports_disconnected_listener_from_native(user_listener,
- upstream_comp_ptr,
- downstream_comp_ptr,
- upstream_port_ptr,
- downstream_port_ptr):
- try:
- upstream_comp = bt2.component._create_generic_component_from_ptr(upstream_comp_ptr)
- upstream_comp._get()
- downstream_comp = bt2.component._create_generic_component_from_ptr(downstream_comp_ptr)
- downstream_comp._get()
- upstream_port = bt2.port._create_from_ptr(upstream_port_ptr)
- upstream_port._get()
- downstream_port = bt2.port._create_from_ptr(downstream_port_ptr)
- downstream_port._get()
- user_listener(upstream_comp, downstream_comp, upstream_port,
- downstream_port)
- except:
- pass
+ upstream_component = bt2.component._create_component_from_ptr_and_get_ref(
+ upstream_component_ptr, upstream_component_type)
+ upstream_port = bt2.port._create_from_ptr_and_get_ref(
+ upstream_port_ptr, native_bt.PORT_TYPE_OUTPUT)
+ downstream_component = bt2.component._create_component_from_ptr_and_get_ref(
+ downstream_component_ptr, downstream_component_type)
+ downstream_port = bt2.port._create_from_ptr_and_get_ref(
+ downstream_port_ptr, native_bt.PORT_TYPE_INPUT)
+ user_listener(upstream_component, upstream_port, downstream_component, downstream_port)
class Graph(object._SharedObject):
raise bt2.Error(gen_error_msg)
def add_component(self, component_class, name, params=None):
- if issubclass(component_class, bt2.component._UserSourceComponent):
- cc_ptr = component_class._cc_ptr
+ if isinstance(component_class, bt2.component._GenericSourceComponentClass):
+ cc_ptr = component_class._ptr
add_fn = native_bt.graph_add_source_component
cc_type = native_bt.COMPONENT_CLASS_TYPE_SOURCE
- elif issubclass(component_class, bt2.component._UserFilterComponent):
- cc_ptr = component_class._cc_ptr
+ elif isinstance(component_class, bt2.component._GenericFilterComponentClass):
+ cc_ptr = component_class._ptr
add_fn = native_bt.graph_add_filter_component
cc_type = native_bt.COMPONENT_CLASS_TYPE_FILTER
+ elif isinstance(component_class, bt2.component._GenericSinkComponentClass):
+ cc_ptr = component_class._ptr
+ add_fn = native_bt.graph_add_sink_component
+ cc_type = native_bt.COMPONENT_CLASS_TYPE_SINK
+ elif issubclass(component_class, bt2.component._UserSourceComponent):
+ cc_ptr = component_class._cc_ptr
+ add_fn = native_bt.graph_add_source_component
+ cc_type = native_bt.COMPONENT_CLASS_TYPE_SOURCE
elif issubclass(component_class, bt2.component._UserSinkComponent):
cc_ptr = component_class._cc_ptr
add_fn = native_bt.graph_add_sink_component
cc_type = native_bt.COMPONENT_CLASS_TYPE_SINK
+ elif issubclass(component_class, bt2.component._UserFilterComponent):
+ cc_ptr = component_class._cc_ptr
+ add_fn = native_bt.graph_add_filter_component
+ cc_type = native_bt.COMPONENT_CLASS_TYPE_FILTER
else:
raise TypeError("'{}' is not a component class".format(
component_class.__class__.__name__))
assert(conn_ptr)
return bt2.connection._Connection._create_from_ptr(conn_ptr)
- def add_listener(self, listener_type, listener):
- if not hasattr(listener, '__call__'):
+ def add_port_added_listener(self, listener):
+ if not callable(listener):
raise TypeError("'listener' parameter is not callable")
- if listener_type == GraphListenerType.PORT_ADDED:
- fn = native_bt.py3_graph_add_port_added_listener
- listener_from_native = functools.partial(_graph_port_added_listener_from_native,
- listener)
- elif listener_type == GraphListenerType.PORT_REMOVED:
- fn = native_bt.py3_graph_add_port_removed_listener
- listener_from_native = functools.partial(_graph_port_removed_listener_from_native,
- listener)
- elif listener_type == GraphListenerType.PORTS_CONNECTED:
- fn = native_bt.py3_graph_add_ports_connected_listener
- listener_from_native = functools.partial(_graph_ports_connected_listener_from_native,
- listener)
- elif listener_type == GraphListenerType.PORTS_DISCONNECTED:
- fn = native_bt.py3_graph_add_ports_disconnected_listener
- listener_from_native = functools.partial(_graph_ports_disconnected_listener_from_native,
- listener)
- else:
- raise TypeError
+ fn = native_bt.py3_graph_add_port_added_listener
+ listener_from_native = functools.partial(_graph_port_added_listener_from_native,
+ listener)
- listener_id = fn(self._ptr, listener_from_native)
- utils._handle_ret(listener_id, 'cannot add listener to graph object')
- return bt2._ListenerHandle(listener_id, self)
+ listener_ids = fn(self._ptr, listener_from_native)
+ if listener_ids is None:
+ utils._raise_bt2_error('cannot add listener to graph object')
+ return bt2._ListenerHandle(listener_ids, self)
+
+ def add_ports_connected_listener(self, listener):
+ if not callable(listener):
+ raise TypeError("'listener' parameter is not callable")
+
+ fn = native_bt.py3_graph_add_ports_connected_listener
+ listener_from_native = functools.partial(_graph_ports_connected_listener_from_native,
+ listener)
+
+ listener_ids = fn(self._ptr, listener_from_native)
+ if listener_ids is None:
+ utils._raise_bt2_error('cannot add listener to graph object')
+ return bt2._ListenerHandle(listener_ids, self)
def run(self):
status = native_bt.graph_run(self._ptr)
raise bt2.CreationError('cannot create output port message iterator')
return bt2.message_iterator._OutputPortMessageIterator(msg_iter_ptr)
-
- def __eq__(self, other):
- if type(other) is not type(self):
- return False
-
- return self.addr == other.addr
PyObject *py_component_ptr = NULL;
PyObject *py_port_ptr = NULL;
PyObject *py_res = NULL;
- bt_graph_listener_status status = BT_GRAPH_LISTENER_STATUS_OK;
+ bt_graph_listener_status status;
py_component_ptr = SWIG_NewPointerObj(SWIG_as_voidptr(component), component_swig_type, 0);
if (!py_component_ptr) {
BT_LOGF_STR("Failed to create component SWIG pointer object.");
+ status = BT_GRAPH_LISTENER_STATUS_NOMEM;
goto end;
}
py_port_ptr = SWIG_NewPointerObj(SWIG_as_voidptr(port), port_swig_type, 0);
if (!py_port_ptr) {
BT_LOGF_STR("Failed to create port SWIG pointer object.");
+ status = BT_GRAPH_LISTENER_STATUS_NOMEM;
goto end;
}
if (!py_res) {
bt2_py_loge_exception();
PyErr_Clear();
- } else {
- BT_ASSERT(py_res == Py_None);
+ status = BT_GRAPH_LISTENER_STATUS_ERROR;
+ goto end;
}
+
+ BT_ASSERT(py_res == Py_None);
+ status = BT_GRAPH_LISTENER_STATUS_OK;
end:
Py_XDECREF(py_res);
}
static bt_graph_listener_status
-ports_connected_listener(const bt_port_output *upstream_port,
- const bt_port_input *downstream_port,
- void *py_callable)
+ports_connected_listener(
+ const void *upstream_component,
+ swig_type_info *upstream_component_swig_type,
+ bt_component_class_type upstream_component_class_type,
+ const bt_port_output *upstream_port,
+ const void *downstream_component,
+ swig_type_info *downstream_component_swig_type,
+ bt_component_class_type downstream_component_class_type,
+ const bt_port_input *downstream_port,
+ void *py_callable)
{
+ PyObject *py_upstream_component_ptr = NULL;
PyObject *py_upstream_port_ptr = NULL;
+ PyObject *py_downstream_component_ptr = NULL;
PyObject *py_downstream_port_ptr = NULL;
PyObject *py_res = NULL;
- bt_graph_listener_status status = BT_GRAPH_LISTENER_STATUS_OK;
+ bt_graph_listener_status status;
+
+ py_upstream_component_ptr = SWIG_NewPointerObj(SWIG_as_voidptr(upstream_component),
+ upstream_component_swig_type, 0);
+ if (!py_upstream_component_ptr) {
+ BT_LOGF_STR("Failed to create upstream component SWIG pointer object.");
+ status = BT_GRAPH_LISTENER_STATUS_NOMEM;
+ goto end;
+ }
py_upstream_port_ptr = SWIG_NewPointerObj(
SWIG_as_voidptr(upstream_port), SWIGTYPE_p_bt_port_output, 0);
if (!py_upstream_port_ptr) {
- BT_LOGF_STR("Failed to create a SWIG pointer object.");
- abort();
+ BT_LOGF_STR("Failed to create upstream port SWIG pointer object.");
+ status = BT_GRAPH_LISTENER_STATUS_NOMEM;
+ goto end;
+ }
+
+ py_downstream_component_ptr = SWIG_NewPointerObj(SWIG_as_voidptr(downstream_component),
+ downstream_component_swig_type, 0);
+ if (!py_downstream_component_ptr) {
+ BT_LOGF_STR("Failed to create downstream component SWIG pointer object.");
+ status = BT_GRAPH_LISTENER_STATUS_NOMEM;
+ goto end;
}
py_downstream_port_ptr = SWIG_NewPointerObj(
SWIG_as_voidptr(downstream_port), SWIGTYPE_p_bt_port_input, 0);
if (!py_downstream_port_ptr) {
- BT_LOGF_STR("Failed to create a SWIG pointer object.");
- abort();
+ BT_LOGF_STR("Failed to create downstream port SWIG pointer object.");
+ status = BT_GRAPH_LISTENER_STATUS_NOMEM;
+ goto end;
}
- py_res = PyObject_CallFunction(py_callable, "(OO)",
- py_upstream_port_ptr, py_downstream_port_ptr);
+ py_res = PyObject_CallFunction(py_callable, "(OiOOiO)",
+ py_upstream_component_ptr, upstream_component_class_type,
+ py_upstream_port_ptr,
+ py_downstream_component_ptr, downstream_component_class_type,
+ py_downstream_port_ptr);
if (!py_res) {
bt2_py_loge_exception();
PyErr_Clear();
- } else {
- BT_ASSERT(py_res == Py_None);
+ status = BT_GRAPH_LISTENER_STATUS_ERROR;
+ goto end;
}
+
+ BT_ASSERT(py_res == Py_None);
+ status = BT_GRAPH_LISTENER_STATUS_OK;
- Py_DECREF(py_upstream_port_ptr);
- Py_DECREF(py_downstream_port_ptr);
+end:
+ Py_XDECREF(py_upstream_component_ptr);
+ Py_XDECREF(py_upstream_port_ptr);
+ Py_XDECREF(py_downstream_component_ptr);
+ Py_XDECREF(py_downstream_port_ptr);
Py_XDECREF(py_res);
return status;
const bt_port_output *upstream_port,
const bt_port_input *downstream_port, void *py_callable)
{
- return ports_connected_listener(upstream_port, downstream_port,
- py_callable);
+ return ports_connected_listener(
+ source_component, SWIGTYPE_p_bt_component_source, BT_COMPONENT_CLASS_TYPE_SOURCE,
+ upstream_port,
+ filter_component, SWIGTYPE_p_bt_component_filter, BT_COMPONENT_CLASS_TYPE_FILTER,
+ downstream_port,
+ py_callable);
}
static bt_graph_listener_status
const bt_port_output *upstream_port,
const bt_port_input *downstream_port, void *py_callable)
{
- return ports_connected_listener(upstream_port, downstream_port,
- py_callable);
+ return ports_connected_listener(
+ source_component, SWIGTYPE_p_bt_component_source, BT_COMPONENT_CLASS_TYPE_SOURCE,
+ upstream_port,
+ sink_component, SWIGTYPE_p_bt_component_sink, BT_COMPONENT_CLASS_TYPE_SINK,
+ downstream_port,
+ py_callable);
}
static bt_graph_listener_status
const bt_port_output *upstream_port,
const bt_port_input *downstream_port, void *py_callable)
{
- return ports_connected_listener(upstream_port, downstream_port,
- py_callable);
+ return ports_connected_listener(
+ filter_component_left, SWIGTYPE_p_bt_component_filter, BT_COMPONENT_CLASS_TYPE_FILTER,
+ upstream_port,
+ filter_component_right, SWIGTYPE_p_bt_component_filter, BT_COMPONENT_CLASS_TYPE_FILTER,
+ downstream_port,
+ py_callable);
}
static bt_graph_listener_status
const bt_port_output *upstream_port,
const bt_port_input *downstream_port, void *py_callable)
{
- return ports_connected_listener(upstream_port, downstream_port,
- py_callable);
+ return ports_connected_listener(
+ filter_component, SWIGTYPE_p_bt_component_filter, BT_COMPONENT_CLASS_TYPE_FILTER,
+ upstream_port,
+ sink_component, SWIGTYPE_p_bt_component_sink, BT_COMPONENT_CLASS_TYPE_SINK,
+ downstream_port,
+ py_callable);
}
-
static PyObject*
bt_py3_graph_add_ports_connected_listener(struct bt_graph *graph,
PyObject *py_callable)
import bt2
-@unittest.skip("this is broken")
+class _MyIter(bt2._UserMessageIterator):
+ def __init__(self):
+ self._build_meta()
+ self._at = 0
+
+ def _build_meta(self):
+ self._tc = self._component._create_trace_class()
+ self._t = self._tc()
+ self._sc = self._tc.create_stream_class()
+ self._ec = self._sc.create_event_class(name='salut')
+ self._my_int_ft = self._tc.create_signed_integer_field_class(32)
+ payload_ft = self._tc.create_structure_field_class()
+ payload_ft += collections.OrderedDict([
+ ('my_int', self._my_int_ft),
+ ])
+ self._ec.payload_field_type = payload_ft
+ self._stream = self._t.create_stream(self._sc)
+ self._packet = self._stream.create_packet()
+
+ def _create_event(self, value):
+ ev = self._ec()
+ ev.payload_field['my_int'] = value
+ ev.packet = self._packet
+ return ev
+
+
class GraphTestCase(unittest.TestCase):
def setUp(self):
self._graph = bt2.Graph()
pass
comp = self._graph.add_component(MySink, 'salut')
- assert(comp)
+ assert comp
comp2 = self._graph.add_component(comp.component_class, 'salut2')
self.assertEqual(comp2.name, 'salut2')
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
+
conn = self._graph.connect_ports(src.output_ports['out'],
sink.input_ports['in'])
self.assertTrue(src.output_ports['out'].is_connected)
self.assertTrue(sink.input_ports['in'].is_connected)
- self.assertEqual(src.output_ports['out'].connection, conn)
- self.assertEqual(sink.input_ports['in'].connection, conn)
+ self.assertEqual(src.output_ports['out'].connection._ptr, conn._ptr)
+ self.assertEqual(sink.input_ports['in'].connection._ptr, conn._ptr)
def test_connect_ports_invalid_direction(self):
class MyIter(bt2._UserMessageIterator):
conn = self._graph.connect_ports(src.output_ports['out'],
sink.input_ports['in'])
- def test_connect_ports_canceled(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- src = self._graph.add_component(MySource, 'src')
- sink = self._graph.add_component(MySink, 'sink')
+ def test_cancel(self):
+ self.assertFalse(self._graph.is_canceled)
self._graph.cancel()
+ self.assertTrue(self._graph.is_canceled)
- with self.assertRaises(bt2.GraphCanceled):
- conn = self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
-
- def test_connect_ports_cannot_consume_accept(self):
- class MyIter(bt2._UserMessageIterator):
- def __next__(self):
- raise bt2.Stop
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- raise bt2.Stop
-
- def _accept_port_connection(self, port, other_port):
- nonlocal exc
-
- try:
- self.graph.run()
- except Exception as e:
- exc = e
-
- return True
-
- exc = None
- src = self._graph.add_component(MySource, 'src')
- sink = self._graph.add_component(MySink, 'sink')
- self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self.assertIs(type(exc), bt2.CannotConsumeGraph)
-
- def test_connect_ports_cannot_consume_connected(self):
- class MyIter(bt2._UserMessageIterator):
+ # Test that Graph.run() raises bt2.GraphCanceled if the graph gets canceled
+ # during execution.
+ def test_cancel_while_running(self):
+ class MyIter(_MyIter):
def __next__(self):
- raise bt2.Stop
+ return self._create_stream_beginning_message(self._stream)
class MySource(bt2._UserSourceComponent,
message_iterator_class=MyIter):
self._add_input_port('in')
def _consume(self):
- raise bt2.Stop
-
- def _port_connected(self, port, other_port):
- nonlocal exc
+ # Pretend that somebody asynchronously cancelled the graph.
+ nonlocal graph
+ graph.cancel()
- try:
- self.graph.run()
- except Exception as e:
- exc = e
+ return next(self._msg_iter)
- return True
+ def _graph_is_configured(self):
+ self._msg_iter = self._input_ports['in'].create_message_iterator()
- exc = None
- src = self._graph.add_component(MySource, 'src')
- sink = self._graph.add_component(MySink, 'sink')
- self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self._graph.run()
- self.assertIs(type(exc), bt2.CannotConsumeGraph)
-
- def test_cancel(self):
- self.assertFalse(self._graph.is_canceled)
- self._graph.cancel()
- self.assertTrue(self._graph.is_canceled)
+ graph = bt2.Graph()
+ up = graph.add_component(MySource, 'down')
+ down = graph.add_component(MySink, 'up')
+ graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
+ with self.assertRaises(bt2.GraphCanceled):
+ graph.run()
def test_run(self):
- class MyIter(bt2._UserMessageIterator):
- def __init__(self):
- self._build_meta()
- self._at = 0
-
- def _build_meta(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._my_int_fc = bt2.IntegerFieldClass(32)
- self._ec.payload_field_class = bt2.StructureFieldClass()
- self._ec.payload_field_class += collections.OrderedDict([
- ('my_int', self._my_int_fc),
- ])
- self._sc.add_event_class(self._ec)
- self._trace.add_stream_class(self._sc)
- self._stream = self._sc()
- self._packet = self._stream.create_packet()
-
- def _create_event(self, value):
- ev = self._ec()
- ev.payload_field['my_int'] = value
- ev.packet = self._packet
- return ev
-
+ class MyIter(_MyIter):
def __next__(self):
- if self._at == 5:
- raise bt2.Stop
+ if self._at == 9:
+ raise StopIteration
+
+ if self._at == 0:
+ msg = self._create_stream_beginning_message(self._stream)
+ elif self._at == 1:
+ msg = self._create_packet_beginning_message(self._packet)
+ elif self._at == 7:
+ msg = self._create_packet_end_message(self._packet)
+ elif self._at == 8:
+ msg = self._create_stream_end_message(self._stream)
+ else:
+ msg = self._create_event_message(self._ec, self._packet)
- msg = bt2.EventMessage(self._create_event(self._at * 3))
self._at += 1
return msg
class MySink(bt2._UserSinkComponent):
def __init__(self, params):
- self._add_input_port('in')
+ self._input_port = self._add_input_port('in')
self._at = 0
def _consume(comp_self):
msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- self.assertIsInstance(msg, bt2.StreamBeginningMessage)
+ self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
elif comp_self._at == 1:
- self.assertIsInstance(msg, bt2.PacketBeginningMessage)
+ self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
elif comp_self._at >= 2 and comp_self._at <= 6:
- self.assertIsInstance(msg, bt2.EventMessage)
+ self.assertIsInstance(msg, bt2.message._EventMessage)
self.assertEqual(msg.event.event_class.name, 'salut')
- field = msg.event.payload_field['my_int']
- self.assertEqual(field, (comp_self._at - 2) * 3)
elif comp_self._at == 7:
- self.assertIsInstance(msg, bt2.PacketEndMessage)
+ self.assertIsInstance(msg, bt2.message._PacketEndMessage)
elif comp_self._at == 8:
- self.assertIsInstance(msg, bt2.StreamEndMessage)
+ self.assertIsInstance(msg, bt2.message._StreamEndMessage)
comp_self._at += 1
- def _port_connected(self, port, other_port):
- self._msg_iter = port.connection.create_message_iterator()
+ def _graph_is_configured(self):
+ self._msg_iter = self._input_port.create_message_iterator()
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
self._graph.run()
def test_run_again(self):
- class MyIter(bt2._UserMessageIterator):
- def __init__(self):
- self._build_meta()
- self._at = 0
-
- def _build_meta(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._my_int_fc = bt2.IntegerFieldClass(32)
- self._ec.payload_field_class = bt2.StructureFieldClass()
- self._ec.payload_field_class += collections.OrderedDict([
- ('my_int', self._my_int_fc),
- ])
- self._sc.add_event_class(self._ec)
- self._trace.add_stream_class(self._sc)
- self._stream = self._sc()
- self._packet = self._stream.create_packet()
-
- def _create_event(self, value):
- ev = self._ec()
- ev.payload_field['my_int'] = value
- ev.packet = self._packet
- return ev
-
+ class MyIter(_MyIter):
def __next__(self):
- if self._at == 1:
+ if self._at == 3:
raise bt2.TryAgain
- msg = bt2.EventMessage(self._create_event(self._at * 3))
+ if self._at == 0:
+ msg = self._create_stream_beginning_message(self._stream)
+ elif self._at == 1:
+ msg = self._create_packet_beginning_message(self._packet)
+ elif self._at == 2:
+ msg = self._create_event_message(self._ec, self._packet)
+
self._at += 1
return msg
class MySink(bt2._UserSinkComponent):
def __init__(self, params):
- self._add_input_port('in')
+ self._input_port = self._add_input_port('in')
self._at = 0
def _consume(comp_self):
+ msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- msg = next(comp_self._msg_iter)
- self.assertIsInstance(msg, bt2.EventMessage)
+ self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
elif comp_self._at == 1:
- with self.assertRaises(bt2.TryAgain):
- msg = next(comp_self._msg_iter)
-
+ self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+ elif comp_self._at == 2:
+ self.assertIsInstance(msg, bt2.message._EventMessage)
raise bt2.TryAgain
+ else:
+ pass
comp_self._at += 1
- def _port_connected(self, port, other_port):
- types = [bt2.EventMessage]
- self._msg_iter = port.connection.create_message_iterator(types)
+ def _graph_is_configured(self):
+ self._msg_iter = self._input_port.create_message_iterator()
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
self._graph.run()
def test_run_error(self):
- class MyIter(bt2._UserMessageIterator):
- def __init__(self):
- self._build_meta()
- self._at = 0
-
- def _build_meta(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._my_int_fc = bt2.IntegerFieldClass(32)
- self._ec.payload_field_class = bt2.StructureFieldClass()
- self._ec.payload_field_class += collections.OrderedDict([
- ('my_int', self._my_int_fc),
- ])
- self._sc.add_event_class(self._ec)
- self._trace.add_stream_class(self._sc)
- self._stream = self._sc()
- self._packet = self._stream.create_packet()
-
- def _create_event(self, value):
- ev = self._ec()
- ev.payload_field['my_int'] = value
- ev.packet = self._packet
- return ev
+ raised_in_sink = False
+ class MyIter(_MyIter):
def __next__(self):
- if self._at == 1:
+ # If this gets called after the sink raised an exception, it is
+ # an error.
+ nonlocal raised_in_sink
+ assert raised_in_sink is False
+
+ if self._at == 0:
+ msg = self._create_stream_beginning_message(self._stream)
+ elif self._at == 1:
+ msg = self._create_packet_beginning_message(self._packet)
+ elif self._at == 2 or self._at == 3:
+ msg = self._create_event_message(self._ec, self._packet)
+ else:
raise bt2.TryAgain
-
- msg = bt2.EventMessage(self._create_event(self._at * 3))
self._at += 1
return msg
class MySink(bt2._UserSinkComponent):
def __init__(self, params):
- self._add_input_port('in')
+ self._input_port = self._add_input_port('in')
self._at = 0
def _consume(comp_self):
+ msg = next(comp_self._msg_iter)
if comp_self._at == 0:
- msg = next(comp_self._msg_iter)
- self.assertIsInstance(msg, bt2.EventMessage)
+ self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
elif comp_self._at == 1:
+ self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+ elif comp_self._at == 2:
+ self.assertIsInstance(msg, bt2.message._EventMessage)
+ elif comp_self._at == 3:
+ nonlocal raised_in_sink
+ raised_in_sink = True
raise RuntimeError('error!')
comp_self._at += 1
- def _port_connected(self, port, other_port):
- types = [bt2.EventMessage]
- self._msg_iter = port.connection.create_message_iterator(types)
+ def _graph_is_configured(self):
+ self._msg_iter = self._input_port.create_message_iterator()
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
with self.assertRaises(bt2.Error):
self._graph.run()
- def test_run_cannot_consume(self):
+ def test_listeners(self):
class MyIter(bt2._UserMessageIterator):
- pass
+ def __next__(self):
+ raise bt2.Stop
class MySource(bt2._UserSourceComponent,
message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
+ self._add_output_port('zero')
class MySink(bt2._UserSinkComponent):
def __init__(self, params):
self._add_input_port('in')
- self._at = 0
- def _consume(comp_self):
- nonlocal exc
+ def _consume(self):
+ raise bt2.Stop
- try:
- print('going in')
- comp_self.graph.run()
- print('going out')
- except Exception as e:
- exc = e
+ def _port_connected(self, port, other_port):
+ self._add_input_port('taste')
- raise bt2.Stop
+ def port_added_listener(component, port):
+ nonlocal calls
+ calls.append((port_added_listener, component, port))
- exc = None
+ def ports_connected_listener(upstream_component, upstream_port,
+ downstream_component, downstream_port):
+ nonlocal calls
+ calls.append((ports_connected_listener,
+ upstream_component, upstream_port,
+ downstream_component, downstream_port))
+
+ calls = []
+ self._graph.add_port_added_listener(port_added_listener)
+ self._graph.add_ports_connected_listener(ports_connected_listener)
src = self._graph.add_component(MySource, 'src')
sink = self._graph.add_component(MySink, 'sink')
- conn = self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- self._graph.run()
- self.assertIs(type(exc), bt2.CannotConsumeGraph)
+ self._graph.connect_ports(src.output_ports['out'],
+ sink.input_ports['in'])
- def test_listeners(self):
+ self.assertEqual(len(calls), 5)
+
+ self.assertIs(calls[0][0], port_added_listener)
+ self.assertEqual(calls[0][1].name, 'src')
+ self.assertEqual(calls[0][2].name, 'out')
+
+ self.assertIs(calls[1][0], port_added_listener)
+ self.assertEqual(calls[1][1].name, 'src')
+ self.assertEqual(calls[1][2].name, 'zero')
+
+ self.assertIs(calls[2][0], port_added_listener)
+ self.assertEqual(calls[2][1].name, 'sink')
+ self.assertEqual(calls[2][2].name, 'in')
+
+ self.assertIs(calls[3][0], port_added_listener)
+ self.assertEqual(calls[3][1].name, 'sink')
+ self.assertEqual(calls[3][2].name, 'taste')
+
+ self.assertIs(calls[4][0], ports_connected_listener)
+ self.assertEqual(calls[4][1].name, 'src')
+ self.assertEqual(calls[4][2].name, 'out')
+ self.assertEqual(calls[4][3].name, 'sink')
+ self.assertEqual(calls[4][4].name, 'in')
+
+ def test_invalid_listeners(self):
class MyIter(bt2._UserMessageIterator):
def __next__(self):
raise bt2.Stop
self._add_output_port('out')
self._add_output_port('zero')
- def _port_connected(self, port, other_port):
- self._output_ports['zero'].remove_from_component()
-
class MySink(bt2._UserSinkComponent):
def __init__(self, params):
self._add_input_port('in')
def _port_connected(self, port, other_port):
self._add_input_port('taste')
- def _port_disconnected(self, port):
- port.remove_from_component()
+ with self.assertRaises(TypeError):
+ self._graph.add_port_added_listener(1234)
+ with self.assertRaises(TypeError):
+ self._graph.add_ports_connected_listener(1234)
- def port_added_listener(port):
- nonlocal calls
- calls.append((port_added_listener, port))
+ def test_raise_in_component_init(self):
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params):
+ raise ValueError('oops!')
- def port_removed_listener(port):
- nonlocal calls
- calls.append((port_removed_listener, port))
+ def _consume(self):
+ raise bt2.Stop
+
+ graph = bt2.Graph()
+
+ with self.assertRaises(bt2.Error):
+ graph.add_component(MySink, 'comp')
+
+ def test_raise_in_port_added_listener(self):
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params):
+ self._add_input_port('in')
+
+ def _consume(self):
+ raise bt2.Stop
+
+ def port_added_listener(component, port):
+ raise ValueError('oh noes!')
+
+ graph = bt2.Graph()
+ graph.add_port_added_listener(port_added_listener)
+
+ with self.assertRaises(bt2.Error):
+ graph.add_component(MySink, 'comp')
+
+ def test_raise_in_ports_connected_listener(self):
+ class MyIter(bt2._UserMessageIterator):
+ def __next__(self):
+ raise bt2.Stop
+
+ class MySource(bt2._UserSourceComponent,
+ message_iterator_class=MyIter):
+ def __init__(self, params):
+ self._add_output_port('out')
+
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params):
+ self._add_input_port('in')
+
+ def _consume(self):
+ raise bt2.Stop
def ports_connected_listener(upstream_port, downstream_port):
- nonlocal calls
- calls.append((ports_connected_listener, upstream_port,
- downstream_port))
+ raise ValueError('oh noes!')
- def ports_disconnected_listener(upstream_comp, downstream_comp,
- upstream_port, downstream_port):
- nonlocal calls
- calls.append((ports_disconnected_listener, upstream_comp,
- downstream_comp, upstream_port, downstream_port))
+ graph = bt2.Graph()
+ graph.add_ports_connected_listener(ports_connected_listener)
+ up = graph.add_component(MySource, 'down')
+ down = graph.add_component(MySink, 'up')
- calls = []
- self._graph.add_listener(bt2.GraphListenerType.PORT_ADDED,
- port_added_listener)
- self._graph.add_listener(bt2.GraphListenerType.PORT_REMOVED,
- port_removed_listener)
- self._graph.add_listener(bt2.GraphListenerType.PORTS_CONNECTED,
- ports_connected_listener)
- self._graph.add_listener(bt2.GraphListenerType.PORTS_DISCONNECTED,
- ports_disconnected_listener)
- src = self._graph.add_component(MySource, 'src')
- sink = self._graph.add_component(MySink, 'sink')
- self._graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- sink.input_ports['in'].disconnect()
- self.assertIs(calls[0][0], port_added_listener)
- self.assertEqual(calls[0][1].name, 'out')
- self.assertIs(calls[1][0], port_added_listener)
- self.assertEqual(calls[1][1].name, 'zero')
- self.assertIs(calls[2][0], port_added_listener)
- self.assertEqual(calls[2][1].name, 'in')
- self.assertIs(calls[3][0], port_removed_listener)
- self.assertEqual(calls[3][1].name, 'zero')
- self.assertIs(calls[4][0], port_added_listener)
- self.assertEqual(calls[4][1].name, 'taste')
- self.assertIs(calls[5][0], ports_connected_listener)
- self.assertEqual(calls[5][1].name, 'out')
- self.assertEqual(calls[5][2].name, 'in')
- self.assertIs(calls[6][0], port_removed_listener)
- self.assertEqual(calls[6][1].name, 'in')
- self.assertIs(calls[7][0], ports_disconnected_listener)
- self.assertEqual(calls[7][1].name, 'src')
- self.assertEqual(calls[7][2].name, 'sink')
- self.assertEqual(calls[7][3].name, 'out')
- self.assertEqual(calls[7][4].name, 'in')
- del calls
+ with self.assertRaises(bt2.Error):
+ graph.connect_ports(up.output_ports['out'], down.input_ports['in'])