From 5f25509b06eb8dd0a76d3068894d9f8fefd63178 Mon Sep 17 00:00:00 2001 From: Simon Marchi Date: Mon, 3 Jun 2019 16:45:56 -0400 Subject: [PATCH] bt2: Adapt test_graph.py and make it pass This patch updates test_graph.py and graph.py to work with the current BT API. An important change is that now, the port added and ports connected listeners can report their failures up the stack. Therefore, if a Python listener raises an exception, we now report it as an error. Some tests are added for this. The tests: - test_connect_ports_canceled - test_connect_ports_cannot_consume_accept - test_connect_ports_cannot_consume_connected were removed, since the library no longer returns an error in these situations (they are just verified by precondition check). However, a test was added to check that when the graph gets cancelled during execution, a bt2.GraphCanceled gets raised. Some changes in message.py, message_iterator.py and port.py are necessary to support the test. They are not complete, but they are representative of what will come after. Any reference to ports being removed or disconnected were removed, since it's not longer possible to remove or disconnect a port. Change-Id: Ie8f11553f34208bb58242d9108efc361acba3c18 Signed-off-by: Simon Marchi Signed-off-by: Francis Deslauriers Reviewed-on: https://review.lttng.org/c/babeltrace/+/1317 Reviewed-by: Philippe Proulx Tested-by: jenkins --- bindings/python/bt2/bt2/component.py | 17 +- bindings/python/bt2/bt2/graph.py | 139 +++--- bindings/python/bt2/bt2/message.py | 26 +- bindings/python/bt2/bt2/message_iterator.py | 29 +- bindings/python/bt2/bt2/native_bt_graph.i | 108 +++-- bindings/python/bt2/bt2/port.py | 7 + tests/bindings/python/bt2/test_graph.py | 484 +++++++++----------- 7 files changed, 410 insertions(+), 400 deletions(-) diff --git a/bindings/python/bt2/bt2/component.py b/bindings/python/bt2/bt2/component.py index e500c982..a0855a2e 100644 --- a/bindings/python/bt2/bt2/component.py +++ b/bindings/python/bt2/bt2/component.py @@ -294,6 +294,14 @@ _COMP_CLS_TYPE_TO_GENERIC_COMP_CLS_PYCLS = { def _create_component_from_ptr(ptr, comp_cls_type): return _COMP_CLS_TYPE_TO_GENERIC_COMP_PYCLS[comp_cls_type]._create_from_ptr(ptr) + +# Same as the above, but acquire a new reference instead of stealing the +# reference from the caller. + +def _create_component_from_ptr_and_get_ref(ptr, comp_cls_type): + return _COMP_CLS_TYPE_TO_GENERIC_COMP_PYCLS[comp_cls_type]._create_from_ptr_and_get_ref(ptr) + + # Create a component class Python object of type # _GenericSourceComponentClass, _GenericFilterComponentClass or # _GenericSinkComponentClass, depending on comp_cls_type. @@ -620,6 +628,9 @@ class _UserComponent(metaclass=_UserComponentType): other_port_ptr, other_port_type) self._port_connected(port, other_port) + def _graph_is_configured_from_native(self): + self._graph_is_configured() + def _create_trace_class(self, env=None, uuid=None, assigns_automatic_stream_class_id=True): ptr = self._as_self_component_ptr(self._ptr) @@ -767,8 +778,8 @@ class _UserSinkComponent(_UserComponent, _SinkComponent): def _add_input_port(self, name): utils._check_str(name) fn = native_bt.self_component_sink_add_input_port - comp_status, priv_port_ptr = fn(self._ptr, name, None) + comp_status, self_port_ptr = fn(self._ptr, name, None) _handle_component_status(comp_status, 'cannot add input port to sink component object') - assert priv_port_ptr - return bt2.port._UserComponentInputPort._create_from_ptr(priv_port_ptr) + assert self_port_ptr + return bt2.port._UserComponentInputPort._create_from_ptr(self_port_ptr) diff --git a/bindings/python/bt2/bt2/graph.py b/bindings/python/bt2/bt2/graph.py index c630cf0a..998d310b 100644 --- a/bindings/python/bt2/bt2/graph.py +++ b/bindings/python/bt2/bt2/graph.py @@ -28,62 +28,28 @@ import bt2.port import bt2 -class GraphListenerType: - PORT_ADDED = 0 - PORT_REMOVED = 1 - PORTS_CONNECTED = 2 - PORTS_DISCONNECTED = 3 - - -def _graph_port_added_listener_from_native(user_listener, port_ptr): - try: - port = bt2.port._create_from_ptr(port_ptr) - port._get() - user_listener(port) - except: - pass - - -def _graph_port_removed_listener_from_native(user_listener, port_ptr): - try: - port = bt2.port._create_from_ptr(port_ptr) - port._get() - user_listener(port) - except: - pass +def _graph_port_added_listener_from_native(user_listener, + component_ptr, component_type, + port_ptr, port_type): + component = bt2.component._create_component_from_ptr_and_get_ref(component_ptr, component_type) + port = bt2.port._create_from_ptr_and_get_ref(port_ptr, port_type) + user_listener(component, port) def _graph_ports_connected_listener_from_native(user_listener, + upstream_component_ptr, upstream_component_type, upstream_port_ptr, + downstream_component_ptr, downstream_component_type, downstream_port_ptr): - try: - upstream_port = bt2.port._create_from_ptr(upstream_port_ptr) - upstream_port._get() - downstream_port = bt2.port._create_from_ptr(downstream_port_ptr) - downstream_port._get() - user_listener(upstream_port, downstream_port) - except: - pass - - -def _graph_ports_disconnected_listener_from_native(user_listener, - upstream_comp_ptr, - downstream_comp_ptr, - upstream_port_ptr, - downstream_port_ptr): - try: - upstream_comp = bt2.component._create_generic_component_from_ptr(upstream_comp_ptr) - upstream_comp._get() - downstream_comp = bt2.component._create_generic_component_from_ptr(downstream_comp_ptr) - downstream_comp._get() - upstream_port = bt2.port._create_from_ptr(upstream_port_ptr) - upstream_port._get() - downstream_port = bt2.port._create_from_ptr(downstream_port_ptr) - downstream_port._get() - user_listener(upstream_comp, downstream_comp, upstream_port, - downstream_port) - except: - pass + upstream_component = bt2.component._create_component_from_ptr_and_get_ref( + upstream_component_ptr, upstream_component_type) + upstream_port = bt2.port._create_from_ptr_and_get_ref( + upstream_port_ptr, native_bt.PORT_TYPE_OUTPUT) + downstream_component = bt2.component._create_component_from_ptr_and_get_ref( + downstream_component_ptr, downstream_component_type) + downstream_port = bt2.port._create_from_ptr_and_get_ref( + downstream_port_ptr, native_bt.PORT_TYPE_INPUT) + user_listener(upstream_component, upstream_port, downstream_component, downstream_port) class Graph(object._SharedObject): @@ -111,18 +77,30 @@ class Graph(object._SharedObject): raise bt2.Error(gen_error_msg) def add_component(self, component_class, name, params=None): - if issubclass(component_class, bt2.component._UserSourceComponent): - cc_ptr = component_class._cc_ptr + if isinstance(component_class, bt2.component._GenericSourceComponentClass): + cc_ptr = component_class._ptr add_fn = native_bt.graph_add_source_component cc_type = native_bt.COMPONENT_CLASS_TYPE_SOURCE - elif issubclass(component_class, bt2.component._UserFilterComponent): - cc_ptr = component_class._cc_ptr + elif isinstance(component_class, bt2.component._GenericFilterComponentClass): + cc_ptr = component_class._ptr add_fn = native_bt.graph_add_filter_component cc_type = native_bt.COMPONENT_CLASS_TYPE_FILTER + elif isinstance(component_class, bt2.component._GenericSinkComponentClass): + cc_ptr = component_class._ptr + add_fn = native_bt.graph_add_sink_component + cc_type = native_bt.COMPONENT_CLASS_TYPE_SINK + elif issubclass(component_class, bt2.component._UserSourceComponent): + cc_ptr = component_class._cc_ptr + add_fn = native_bt.graph_add_source_component + cc_type = native_bt.COMPONENT_CLASS_TYPE_SOURCE elif issubclass(component_class, bt2.component._UserSinkComponent): cc_ptr = component_class._cc_ptr add_fn = native_bt.graph_add_sink_component cc_type = native_bt.COMPONENT_CLASS_TYPE_SINK + elif issubclass(component_class, bt2.component._UserFilterComponent): + cc_ptr = component_class._cc_ptr + add_fn = native_bt.graph_add_filter_component + cc_type = native_bt.COMPONENT_CLASS_TYPE_FILTER else: raise TypeError("'{}' is not a component class".format( component_class.__class__.__name__)) @@ -147,32 +125,31 @@ class Graph(object._SharedObject): assert(conn_ptr) return bt2.connection._Connection._create_from_ptr(conn_ptr) - def add_listener(self, listener_type, listener): - if not hasattr(listener, '__call__'): + def add_port_added_listener(self, listener): + if not callable(listener): raise TypeError("'listener' parameter is not callable") - if listener_type == GraphListenerType.PORT_ADDED: - fn = native_bt.py3_graph_add_port_added_listener - listener_from_native = functools.partial(_graph_port_added_listener_from_native, - listener) - elif listener_type == GraphListenerType.PORT_REMOVED: - fn = native_bt.py3_graph_add_port_removed_listener - listener_from_native = functools.partial(_graph_port_removed_listener_from_native, - listener) - elif listener_type == GraphListenerType.PORTS_CONNECTED: - fn = native_bt.py3_graph_add_ports_connected_listener - listener_from_native = functools.partial(_graph_ports_connected_listener_from_native, - listener) - elif listener_type == GraphListenerType.PORTS_DISCONNECTED: - fn = native_bt.py3_graph_add_ports_disconnected_listener - listener_from_native = functools.partial(_graph_ports_disconnected_listener_from_native, - listener) - else: - raise TypeError + fn = native_bt.py3_graph_add_port_added_listener + listener_from_native = functools.partial(_graph_port_added_listener_from_native, + listener) - listener_id = fn(self._ptr, listener_from_native) - utils._handle_ret(listener_id, 'cannot add listener to graph object') - return bt2._ListenerHandle(listener_id, self) + listener_ids = fn(self._ptr, listener_from_native) + if listener_ids is None: + utils._raise_bt2_error('cannot add listener to graph object') + return bt2._ListenerHandle(listener_ids, self) + + def add_ports_connected_listener(self, listener): + if not callable(listener): + raise TypeError("'listener' parameter is not callable") + + fn = native_bt.py3_graph_add_ports_connected_listener + listener_from_native = functools.partial(_graph_ports_connected_listener_from_native, + listener) + + listener_ids = fn(self._ptr, listener_from_native) + if listener_ids is None: + utils._raise_bt2_error('cannot add listener to graph object') + return bt2._ListenerHandle(listener_ids, self) def run(self): status = native_bt.graph_run(self._ptr) @@ -200,9 +177,3 @@ class Graph(object._SharedObject): raise bt2.CreationError('cannot create output port message iterator') return bt2.message_iterator._OutputPortMessageIterator(msg_iter_ptr) - - def __eq__(self, other): - if type(other) is not type(self): - return False - - return self.addr == other.addr diff --git a/bindings/python/bt2/bt2/message.py b/bindings/python/bt2/bt2/message.py index 0baf1a71..6a48f93a 100644 --- a/bindings/python/bt2/bt2/message.py +++ b/bindings/python/bt2/bt2/message.py @@ -145,18 +145,9 @@ class _PacketBeginningMessage(_CopyableMessage): return PacketBeginningMessage(self.packet) -class PacketEndMessage(_CopyableMessage): +class _PacketEndMessage(_CopyableMessage): _TYPE = native_bt.MESSAGE_TYPE_PACKET_END - def __init__(self, packet): - utils._check_type(packet, bt2.packet._Packet) - ptr = native_bt.message_packet_end_create(packet._ptr) - - if ptr is None: - raise bt2.CreationError('cannot create packet end message object') - - super().__init__(ptr) - @property def packet(self): packet_ptr = native_bt.message_packet_end_get_packet(self._ptr) @@ -206,18 +197,9 @@ class _StreamBeginningMessage(_CopyableMessage): return StreamBeginningMessage(self.stream) -class StreamEndMessage(_CopyableMessage): +class _StreamEndMessage(_CopyableMessage): _TYPE = native_bt.MESSAGE_TYPE_STREAM_END - def __init__(self, stream): - utils._check_type(stream, bt2.stream._Stream) - ptr = native_bt.message_stream_end_create(stream._ptr) - - if ptr is None: - raise bt2.CreationError('cannot create stream end message object') - - super().__init__(ptr) - @property def stream(self): stream_ptr = native_bt.message_stream_end_get_stream(self._ptr) @@ -473,9 +455,9 @@ class _DiscardedEventsMessage(_DiscardedElementsMessage): _MESSAGE_TYPE_TO_CLS = { native_bt.MESSAGE_TYPE_EVENT: _EventMessage, native_bt.MESSAGE_TYPE_PACKET_BEGINNING: _PacketBeginningMessage, - native_bt.MESSAGE_TYPE_PACKET_END: PacketEndMessage, + native_bt.MESSAGE_TYPE_PACKET_END: _PacketEndMessage, native_bt.MESSAGE_TYPE_STREAM_BEGINNING: _StreamBeginningMessage, - native_bt.MESSAGE_TYPE_STREAM_END: StreamEndMessage, + native_bt.MESSAGE_TYPE_STREAM_END: _StreamEndMessage, native_bt.MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: InactivityMessage, native_bt.MESSAGE_TYPE_DISCARDED_PACKETS: _DiscardedPacketsMessage, native_bt.MESSAGE_TYPE_DISCARDED_EVENTS: _DiscardedEventsMessage, diff --git a/bindings/python/bt2/bt2/message_iterator.py b/bindings/python/bt2/bt2/message_iterator.py index c9176efd..781c876d 100644 --- a/bindings/python/bt2/bt2/message_iterator.py +++ b/bindings/python/bt2/bt2/message_iterator.py @@ -60,7 +60,9 @@ class _GenericMessageIterator(object._SharedObject, _MessageIterator): return bt2.message._create_from_ptr(msg_ptr) -class _PrivateConnectionMessageIterator(_GenericMessageIterator): +class _UserComponentInputPortMessageIterator(_GenericMessageIterator): + _get_msg_range = staticmethod(native_bt.py3_self_component_port_input_get_msg_range) + @property def component(self): comp_ptr = native_bt.private_connection_message_iterator_get_component(self._ptr) @@ -163,6 +165,15 @@ class _UserMessageIterator(_MessageIterator): return bt2.message._StreamBeginningMessage(ptr) + def _create_stream_end_message(self, stream): + utils._check_type(stream, bt2.stream._Stream) + + ptr = native_bt.message_stream_end_create(self._ptr, stream._ptr) + if ptr is None: + raise bt2.CreationError('cannot create stream end message object') + + return bt2.message._StreamEndMessage(ptr) + def _create_packet_beginning_message(self, packet, default_clock_snapshot=None): utils._check_type(packet, bt2.packet._Packet) @@ -183,3 +194,19 @@ class _UserMessageIterator(_MessageIterator): raise bt2.CreationError('cannot create packet beginning message object') return bt2.message._PacketBeginningMessage(ptr) + + def _create_packet_end_message(self, packet, default_clock_snapshot=None): + utils._check_type(packet, bt2.packet._Packet) + self._validate_default_clock_snapshot(packet.stream.stream_class, default_clock_snapshot) + + if default_clock_snapshot is not None: + utils._check_uint64(default_clock_snapshot) + ptr = native_bt.message_packet_end_create_with_default_clock_snapshot( + self._ptr, packet._ptr, default_clock_snapshot) + else: + ptr = native_bt.message_packet_end_create(self._ptr, packet._ptr) + + if ptr is None: + raise bt2.CreationError('cannot create packet end message object') + + return bt2.message._PacketEndMessage(ptr) diff --git a/bindings/python/bt2/bt2/native_bt_graph.i b/bindings/python/bt2/bt2/native_bt_graph.i index bf6b5cdd..bb7a5d13 100644 --- a/bindings/python/bt2/bt2/native_bt_graph.i +++ b/bindings/python/bt2/bt2/native_bt_graph.i @@ -302,17 +302,19 @@ port_added_listener( PyObject *py_component_ptr = NULL; PyObject *py_port_ptr = NULL; PyObject *py_res = NULL; - bt_graph_listener_status status = BT_GRAPH_LISTENER_STATUS_OK; + bt_graph_listener_status status; py_component_ptr = SWIG_NewPointerObj(SWIG_as_voidptr(component), component_swig_type, 0); if (!py_component_ptr) { BT_LOGF_STR("Failed to create component SWIG pointer object."); + status = BT_GRAPH_LISTENER_STATUS_NOMEM; goto end; } py_port_ptr = SWIG_NewPointerObj(SWIG_as_voidptr(port), port_swig_type, 0); if (!py_port_ptr) { BT_LOGF_STR("Failed to create port SWIG pointer object."); + status = BT_GRAPH_LISTENER_STATUS_NOMEM; goto end; } @@ -321,9 +323,12 @@ port_added_listener( if (!py_res) { bt2_py_loge_exception(); PyErr_Clear(); - } else { - BT_ASSERT(py_res == Py_None); + status = BT_GRAPH_LISTENER_STATUS_ERROR; + goto end; } + + BT_ASSERT(py_res == Py_None); + status = BT_GRAPH_LISTENER_STATUS_OK; end: Py_XDECREF(py_res); @@ -474,40 +479,76 @@ end: } static bt_graph_listener_status -ports_connected_listener(const bt_port_output *upstream_port, - const bt_port_input *downstream_port, - void *py_callable) +ports_connected_listener( + const void *upstream_component, + swig_type_info *upstream_component_swig_type, + bt_component_class_type upstream_component_class_type, + const bt_port_output *upstream_port, + const void *downstream_component, + swig_type_info *downstream_component_swig_type, + bt_component_class_type downstream_component_class_type, + const bt_port_input *downstream_port, + void *py_callable) { + PyObject *py_upstream_component_ptr = NULL; PyObject *py_upstream_port_ptr = NULL; + PyObject *py_downstream_component_ptr = NULL; PyObject *py_downstream_port_ptr = NULL; PyObject *py_res = NULL; - bt_graph_listener_status status = BT_GRAPH_LISTENER_STATUS_OK; + bt_graph_listener_status status; + + py_upstream_component_ptr = SWIG_NewPointerObj(SWIG_as_voidptr(upstream_component), + upstream_component_swig_type, 0); + if (!py_upstream_component_ptr) { + BT_LOGF_STR("Failed to create upstream component SWIG pointer object."); + status = BT_GRAPH_LISTENER_STATUS_NOMEM; + goto end; + } py_upstream_port_ptr = SWIG_NewPointerObj( SWIG_as_voidptr(upstream_port), SWIGTYPE_p_bt_port_output, 0); if (!py_upstream_port_ptr) { - BT_LOGF_STR("Failed to create a SWIG pointer object."); - abort(); + BT_LOGF_STR("Failed to create upstream port SWIG pointer object."); + status = BT_GRAPH_LISTENER_STATUS_NOMEM; + goto end; + } + + py_downstream_component_ptr = SWIG_NewPointerObj(SWIG_as_voidptr(downstream_component), + downstream_component_swig_type, 0); + if (!py_downstream_component_ptr) { + BT_LOGF_STR("Failed to create downstream component SWIG pointer object."); + status = BT_GRAPH_LISTENER_STATUS_NOMEM; + goto end; } py_downstream_port_ptr = SWIG_NewPointerObj( SWIG_as_voidptr(downstream_port), SWIGTYPE_p_bt_port_input, 0); if (!py_downstream_port_ptr) { - BT_LOGF_STR("Failed to create a SWIG pointer object."); - abort(); + BT_LOGF_STR("Failed to create downstream port SWIG pointer object."); + status = BT_GRAPH_LISTENER_STATUS_NOMEM; + goto end; } - py_res = PyObject_CallFunction(py_callable, "(OO)", - py_upstream_port_ptr, py_downstream_port_ptr); + py_res = PyObject_CallFunction(py_callable, "(OiOOiO)", + py_upstream_component_ptr, upstream_component_class_type, + py_upstream_port_ptr, + py_downstream_component_ptr, downstream_component_class_type, + py_downstream_port_ptr); if (!py_res) { bt2_py_loge_exception(); PyErr_Clear(); - } else { - BT_ASSERT(py_res == Py_None); + status = BT_GRAPH_LISTENER_STATUS_ERROR; + goto end; } + + BT_ASSERT(py_res == Py_None); + status = BT_GRAPH_LISTENER_STATUS_OK; - Py_DECREF(py_upstream_port_ptr); - Py_DECREF(py_downstream_port_ptr); +end: + Py_XDECREF(py_upstream_component_ptr); + Py_XDECREF(py_upstream_port_ptr); + Py_XDECREF(py_downstream_component_ptr); + Py_XDECREF(py_downstream_port_ptr); Py_XDECREF(py_res); return status; @@ -520,8 +561,12 @@ source_filter_component_ports_connected_listener( const bt_port_output *upstream_port, const bt_port_input *downstream_port, void *py_callable) { - return ports_connected_listener(upstream_port, downstream_port, - py_callable); + return ports_connected_listener( + source_component, SWIGTYPE_p_bt_component_source, BT_COMPONENT_CLASS_TYPE_SOURCE, + upstream_port, + filter_component, SWIGTYPE_p_bt_component_filter, BT_COMPONENT_CLASS_TYPE_FILTER, + downstream_port, + py_callable); } static bt_graph_listener_status @@ -531,8 +576,12 @@ source_sink_component_ports_connected_listener( const bt_port_output *upstream_port, const bt_port_input *downstream_port, void *py_callable) { - return ports_connected_listener(upstream_port, downstream_port, - py_callable); + return ports_connected_listener( + source_component, SWIGTYPE_p_bt_component_source, BT_COMPONENT_CLASS_TYPE_SOURCE, + upstream_port, + sink_component, SWIGTYPE_p_bt_component_sink, BT_COMPONENT_CLASS_TYPE_SINK, + downstream_port, + py_callable); } static bt_graph_listener_status @@ -542,8 +591,12 @@ filter_filter_component_ports_connected_listener( const bt_port_output *upstream_port, const bt_port_input *downstream_port, void *py_callable) { - return ports_connected_listener(upstream_port, downstream_port, - py_callable); + return ports_connected_listener( + filter_component_left, SWIGTYPE_p_bt_component_filter, BT_COMPONENT_CLASS_TYPE_FILTER, + upstream_port, + filter_component_right, SWIGTYPE_p_bt_component_filter, BT_COMPONENT_CLASS_TYPE_FILTER, + downstream_port, + py_callable); } static bt_graph_listener_status @@ -553,11 +606,14 @@ filter_sink_component_ports_connected_listener( const bt_port_output *upstream_port, const bt_port_input *downstream_port, void *py_callable) { - return ports_connected_listener(upstream_port, downstream_port, - py_callable); + return ports_connected_listener( + filter_component, SWIGTYPE_p_bt_component_filter, BT_COMPONENT_CLASS_TYPE_FILTER, + upstream_port, + sink_component, SWIGTYPE_p_bt_component_sink, BT_COMPONENT_CLASS_TYPE_SINK, + downstream_port, + py_callable); } - static PyObject* bt_py3_graph_add_ports_connected_listener(struct bt_graph *graph, PyObject *py_callable) diff --git a/bindings/python/bt2/bt2/port.py b/bindings/python/bt2/bt2/port.py index 8ec04657..a84edf56 100644 --- a/bindings/python/bt2/bt2/port.py +++ b/bindings/python/bt2/bt2/port.py @@ -107,6 +107,13 @@ class _UserComponentPort(_Port): class _UserComponentInputPort(_UserComponentPort, _InputPort): _as_self_port_ptr = staticmethod(native_bt.self_component_port_input_as_self_component_port) + def create_message_iterator(self): + msg_iter_ptr = native_bt.self_component_port_input_message_iterator_create(self._ptr) + if msg_iter_ptr is None: + raise bt2.CreationError('cannot create message iterator object') + + return bt2.message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr) + class _UserComponentOutputPort(_UserComponentPort, _OutputPort): _as_self_port_ptr = staticmethod(native_bt.self_component_port_output_as_self_component_port) diff --git a/tests/bindings/python/bt2/test_graph.py b/tests/bindings/python/bt2/test_graph.py index 73788b48..8ed585a6 100644 --- a/tests/bindings/python/bt2/test_graph.py +++ b/tests/bindings/python/bt2/test_graph.py @@ -5,7 +5,32 @@ import copy import bt2 -@unittest.skip("this is broken") +class _MyIter(bt2._UserMessageIterator): + def __init__(self): + self._build_meta() + self._at = 0 + + def _build_meta(self): + self._tc = self._component._create_trace_class() + self._t = self._tc() + self._sc = self._tc.create_stream_class() + self._ec = self._sc.create_event_class(name='salut') + self._my_int_ft = self._tc.create_signed_integer_field_class(32) + payload_ft = self._tc.create_structure_field_class() + payload_ft += collections.OrderedDict([ + ('my_int', self._my_int_ft), + ]) + self._ec.payload_field_type = payload_ft + self._stream = self._t.create_stream(self._sc) + self._packet = self._stream.create_packet() + + def _create_event(self, value): + ev = self._ec() + ev.payload_field['my_int'] = value + ev.packet = self._packet + return ev + + class GraphTestCase(unittest.TestCase): def setUp(self): self._graph = bt2.Graph() @@ -30,7 +55,7 @@ class GraphTestCase(unittest.TestCase): pass comp = self._graph.add_component(MySink, 'salut') - assert(comp) + assert comp comp2 = self._graph.add_component(comp.component_class, 'salut2') self.assertEqual(comp2.name, 'salut2') @@ -73,12 +98,13 @@ class GraphTestCase(unittest.TestCase): src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') + conn = self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in']) self.assertTrue(src.output_ports['out'].is_connected) self.assertTrue(sink.input_ports['in'].is_connected) - self.assertEqual(src.output_ports['out'].connection, conn) - self.assertEqual(sink.input_ports['in'].connection, conn) + self.assertEqual(src.output_ports['out'].connection._ptr, conn._ptr) + self.assertEqual(sink.input_ports['in'].connection._ptr, conn._ptr) def test_connect_ports_invalid_direction(self): class MyIter(bt2._UserMessageIterator): @@ -131,69 +157,17 @@ class GraphTestCase(unittest.TestCase): conn = self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in']) - def test_connect_ports_canceled(self): - class MyIter(bt2._UserMessageIterator): - def __next__(self): - raise bt2.Stop - - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') - - class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') - - def _consume(self): - raise bt2.Stop - - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') + def test_cancel(self): + self.assertFalse(self._graph.is_canceled) self._graph.cancel() + self.assertTrue(self._graph.is_canceled) - with self.assertRaises(bt2.GraphCanceled): - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - - def test_connect_ports_cannot_consume_accept(self): - class MyIter(bt2._UserMessageIterator): - def __next__(self): - raise bt2.Stop - - class MySource(bt2._UserSourceComponent, - message_iterator_class=MyIter): - def __init__(self, params): - self._add_output_port('out') - - class MySink(bt2._UserSinkComponent): - def __init__(self, params): - self._add_input_port('in') - - def _consume(self): - raise bt2.Stop - - def _accept_port_connection(self, port, other_port): - nonlocal exc - - try: - self.graph.run() - except Exception as e: - exc = e - - return True - - exc = None - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') - self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - self.assertIs(type(exc), bt2.CannotConsumeGraph) - - def test_connect_ports_cannot_consume_connected(self): - class MyIter(bt2._UserMessageIterator): + # Test that Graph.run() raises bt2.GraphCanceled if the graph gets canceled + # during execution. + def test_cancel_while_running(self): + class MyIter(_MyIter): def __next__(self): - raise bt2.Stop + return self._create_stream_beginning_message(self._stream) class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): @@ -205,62 +179,39 @@ class GraphTestCase(unittest.TestCase): self._add_input_port('in') def _consume(self): - raise bt2.Stop - - def _port_connected(self, port, other_port): - nonlocal exc + # Pretend that somebody asynchronously cancelled the graph. + nonlocal graph + graph.cancel() - try: - self.graph.run() - except Exception as e: - exc = e + return next(self._msg_iter) - return True + def _graph_is_configured(self): + self._msg_iter = self._input_ports['in'].create_message_iterator() - exc = None - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') - self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - self._graph.run() - self.assertIs(type(exc), bt2.CannotConsumeGraph) - - def test_cancel(self): - self.assertFalse(self._graph.is_canceled) - self._graph.cancel() - self.assertTrue(self._graph.is_canceled) + graph = bt2.Graph() + up = graph.add_component(MySource, 'down') + down = graph.add_component(MySink, 'up') + graph.connect_ports(up.output_ports['out'], down.input_ports['in']) + with self.assertRaises(bt2.GraphCanceled): + graph.run() def test_run(self): - class MyIter(bt2._UserMessageIterator): - def __init__(self): - self._build_meta() - self._at = 0 - - def _build_meta(self): - self._trace = bt2.Trace() - self._sc = bt2.StreamClass() - self._ec = bt2.EventClass('salut') - self._my_int_fc = bt2.IntegerFieldClass(32) - self._ec.payload_field_class = bt2.StructureFieldClass() - self._ec.payload_field_class += collections.OrderedDict([ - ('my_int', self._my_int_fc), - ]) - self._sc.add_event_class(self._ec) - self._trace.add_stream_class(self._sc) - self._stream = self._sc() - self._packet = self._stream.create_packet() - - def _create_event(self, value): - ev = self._ec() - ev.payload_field['my_int'] = value - ev.packet = self._packet - return ev - + class MyIter(_MyIter): def __next__(self): - if self._at == 5: - raise bt2.Stop + if self._at == 9: + raise StopIteration + + if self._at == 0: + msg = self._create_stream_beginning_message(self._stream) + elif self._at == 1: + msg = self._create_packet_beginning_message(self._packet) + elif self._at == 7: + msg = self._create_packet_end_message(self._packet) + elif self._at == 8: + msg = self._create_stream_end_message(self._stream) + else: + msg = self._create_event_message(self._ec, self._packet) - msg = bt2.EventMessage(self._create_event(self._at * 3)) self._at += 1 return msg @@ -271,30 +222,28 @@ class GraphTestCase(unittest.TestCase): class MySink(bt2._UserSinkComponent): def __init__(self, params): - self._add_input_port('in') + self._input_port = self._add_input_port('in') self._at = 0 def _consume(comp_self): msg = next(comp_self._msg_iter) if comp_self._at == 0: - self.assertIsInstance(msg, bt2.StreamBeginningMessage) + self.assertIsInstance(msg, bt2.message._StreamBeginningMessage) elif comp_self._at == 1: - self.assertIsInstance(msg, bt2.PacketBeginningMessage) + self.assertIsInstance(msg, bt2.message._PacketBeginningMessage) elif comp_self._at >= 2 and comp_self._at <= 6: - self.assertIsInstance(msg, bt2.EventMessage) + self.assertIsInstance(msg, bt2.message._EventMessage) self.assertEqual(msg.event.event_class.name, 'salut') - field = msg.event.payload_field['my_int'] - self.assertEqual(field, (comp_self._at - 2) * 3) elif comp_self._at == 7: - self.assertIsInstance(msg, bt2.PacketEndMessage) + self.assertIsInstance(msg, bt2.message._PacketEndMessage) elif comp_self._at == 8: - self.assertIsInstance(msg, bt2.StreamEndMessage) + self.assertIsInstance(msg, bt2.message._StreamEndMessage) comp_self._at += 1 - def _port_connected(self, port, other_port): - self._msg_iter = port.connection.create_message_iterator() + def _graph_is_configured(self): + self._msg_iter = self._input_port.create_message_iterator() src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') @@ -303,36 +252,18 @@ class GraphTestCase(unittest.TestCase): self._graph.run() def test_run_again(self): - class MyIter(bt2._UserMessageIterator): - def __init__(self): - self._build_meta() - self._at = 0 - - def _build_meta(self): - self._trace = bt2.Trace() - self._sc = bt2.StreamClass() - self._ec = bt2.EventClass('salut') - self._my_int_fc = bt2.IntegerFieldClass(32) - self._ec.payload_field_class = bt2.StructureFieldClass() - self._ec.payload_field_class += collections.OrderedDict([ - ('my_int', self._my_int_fc), - ]) - self._sc.add_event_class(self._ec) - self._trace.add_stream_class(self._sc) - self._stream = self._sc() - self._packet = self._stream.create_packet() - - def _create_event(self, value): - ev = self._ec() - ev.payload_field['my_int'] = value - ev.packet = self._packet - return ev - + class MyIter(_MyIter): def __next__(self): - if self._at == 1: + if self._at == 3: raise bt2.TryAgain - msg = bt2.EventMessage(self._create_event(self._at * 3)) + if self._at == 0: + msg = self._create_stream_beginning_message(self._stream) + elif self._at == 1: + msg = self._create_packet_beginning_message(self._packet) + elif self._at == 2: + msg = self._create_event_message(self._ec, self._packet) + self._at += 1 return msg @@ -343,24 +274,25 @@ class GraphTestCase(unittest.TestCase): class MySink(bt2._UserSinkComponent): def __init__(self, params): - self._add_input_port('in') + self._input_port = self._add_input_port('in') self._at = 0 def _consume(comp_self): + msg = next(comp_self._msg_iter) if comp_self._at == 0: - msg = next(comp_self._msg_iter) - self.assertIsInstance(msg, bt2.EventMessage) + self.assertIsInstance(msg, bt2.message._StreamBeginningMessage) elif comp_self._at == 1: - with self.assertRaises(bt2.TryAgain): - msg = next(comp_self._msg_iter) - + self.assertIsInstance(msg, bt2.message._PacketBeginningMessage) + elif comp_self._at == 2: + self.assertIsInstance(msg, bt2.message._EventMessage) raise bt2.TryAgain + else: + pass comp_self._at += 1 - def _port_connected(self, port, other_port): - types = [bt2.EventMessage] - self._msg_iter = port.connection.create_message_iterator(types) + def _graph_is_configured(self): + self._msg_iter = self._input_port.create_message_iterator() src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') @@ -371,36 +303,23 @@ class GraphTestCase(unittest.TestCase): self._graph.run() def test_run_error(self): - class MyIter(bt2._UserMessageIterator): - def __init__(self): - self._build_meta() - self._at = 0 - - def _build_meta(self): - self._trace = bt2.Trace() - self._sc = bt2.StreamClass() - self._ec = bt2.EventClass('salut') - self._my_int_fc = bt2.IntegerFieldClass(32) - self._ec.payload_field_class = bt2.StructureFieldClass() - self._ec.payload_field_class += collections.OrderedDict([ - ('my_int', self._my_int_fc), - ]) - self._sc.add_event_class(self._ec) - self._trace.add_stream_class(self._sc) - self._stream = self._sc() - self._packet = self._stream.create_packet() - - def _create_event(self, value): - ev = self._ec() - ev.payload_field['my_int'] = value - ev.packet = self._packet - return ev + raised_in_sink = False + class MyIter(_MyIter): def __next__(self): - if self._at == 1: + # If this gets called after the sink raised an exception, it is + # an error. + nonlocal raised_in_sink + assert raised_in_sink is False + + if self._at == 0: + msg = self._create_stream_beginning_message(self._stream) + elif self._at == 1: + msg = self._create_packet_beginning_message(self._packet) + elif self._at == 2 or self._at == 3: + msg = self._create_event_message(self._ec, self._packet) + else: raise bt2.TryAgain - - msg = bt2.EventMessage(self._create_event(self._at * 3)) self._at += 1 return msg @@ -411,21 +330,26 @@ class GraphTestCase(unittest.TestCase): class MySink(bt2._UserSinkComponent): def __init__(self, params): - self._add_input_port('in') + self._input_port = self._add_input_port('in') self._at = 0 def _consume(comp_self): + msg = next(comp_self._msg_iter) if comp_self._at == 0: - msg = next(comp_self._msg_iter) - self.assertIsInstance(msg, bt2.EventMessage) + self.assertIsInstance(msg, bt2.message._StreamBeginningMessage) elif comp_self._at == 1: + self.assertIsInstance(msg, bt2.message._PacketBeginningMessage) + elif comp_self._at == 2: + self.assertIsInstance(msg, bt2.message._EventMessage) + elif comp_self._at == 3: + nonlocal raised_in_sink + raised_in_sink = True raise RuntimeError('error!') comp_self._at += 1 - def _port_connected(self, port, other_port): - types = [bt2.EventMessage] - self._msg_iter = port.connection.create_message_iterator(types) + def _graph_is_configured(self): + self._msg_iter = self._input_port.create_message_iterator() src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') @@ -435,41 +359,71 @@ class GraphTestCase(unittest.TestCase): with self.assertRaises(bt2.Error): self._graph.run() - def test_run_cannot_consume(self): + def test_listeners(self): class MyIter(bt2._UserMessageIterator): - pass + def __next__(self): + raise bt2.Stop class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): def __init__(self, params): self._add_output_port('out') + self._add_output_port('zero') class MySink(bt2._UserSinkComponent): def __init__(self, params): self._add_input_port('in') - self._at = 0 - def _consume(comp_self): - nonlocal exc + def _consume(self): + raise bt2.Stop - try: - print('going in') - comp_self.graph.run() - print('going out') - except Exception as e: - exc = e + def _port_connected(self, port, other_port): + self._add_input_port('taste') - raise bt2.Stop + def port_added_listener(component, port): + nonlocal calls + calls.append((port_added_listener, component, port)) - exc = None + def ports_connected_listener(upstream_component, upstream_port, + downstream_component, downstream_port): + nonlocal calls + calls.append((ports_connected_listener, + upstream_component, upstream_port, + downstream_component, downstream_port)) + + calls = [] + self._graph.add_port_added_listener(port_added_listener) + self._graph.add_ports_connected_listener(ports_connected_listener) src = self._graph.add_component(MySource, 'src') sink = self._graph.add_component(MySink, 'sink') - conn = self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - self._graph.run() - self.assertIs(type(exc), bt2.CannotConsumeGraph) + self._graph.connect_ports(src.output_ports['out'], + sink.input_ports['in']) - def test_listeners(self): + self.assertEqual(len(calls), 5) + + self.assertIs(calls[0][0], port_added_listener) + self.assertEqual(calls[0][1].name, 'src') + self.assertEqual(calls[0][2].name, 'out') + + self.assertIs(calls[1][0], port_added_listener) + self.assertEqual(calls[1][1].name, 'src') + self.assertEqual(calls[1][2].name, 'zero') + + self.assertIs(calls[2][0], port_added_listener) + self.assertEqual(calls[2][1].name, 'sink') + self.assertEqual(calls[2][2].name, 'in') + + self.assertIs(calls[3][0], port_added_listener) + self.assertEqual(calls[3][1].name, 'sink') + self.assertEqual(calls[3][2].name, 'taste') + + self.assertIs(calls[4][0], ports_connected_listener) + self.assertEqual(calls[4][1].name, 'src') + self.assertEqual(calls[4][2].name, 'out') + self.assertEqual(calls[4][3].name, 'sink') + self.assertEqual(calls[4][4].name, 'in') + + def test_invalid_listeners(self): class MyIter(bt2._UserMessageIterator): def __next__(self): raise bt2.Stop @@ -480,9 +434,6 @@ class GraphTestCase(unittest.TestCase): self._add_output_port('out') self._add_output_port('zero') - def _port_connected(self, port, other_port): - self._output_ports['zero'].remove_from_component() - class MySink(bt2._UserSinkComponent): def __init__(self, params): self._add_input_port('in') @@ -493,60 +444,65 @@ class GraphTestCase(unittest.TestCase): def _port_connected(self, port, other_port): self._add_input_port('taste') - def _port_disconnected(self, port): - port.remove_from_component() + with self.assertRaises(TypeError): + self._graph.add_port_added_listener(1234) + with self.assertRaises(TypeError): + self._graph.add_ports_connected_listener(1234) - def port_added_listener(port): - nonlocal calls - calls.append((port_added_listener, port)) + def test_raise_in_component_init(self): + class MySink(bt2._UserSinkComponent): + def __init__(self, params): + raise ValueError('oops!') - def port_removed_listener(port): - nonlocal calls - calls.append((port_removed_listener, port)) + def _consume(self): + raise bt2.Stop + + graph = bt2.Graph() + + with self.assertRaises(bt2.Error): + graph.add_component(MySink, 'comp') + + def test_raise_in_port_added_listener(self): + class MySink(bt2._UserSinkComponent): + def __init__(self, params): + self._add_input_port('in') + + def _consume(self): + raise bt2.Stop + + def port_added_listener(component, port): + raise ValueError('oh noes!') + + graph = bt2.Graph() + graph.add_port_added_listener(port_added_listener) + + with self.assertRaises(bt2.Error): + graph.add_component(MySink, 'comp') + + def test_raise_in_ports_connected_listener(self): + class MyIter(bt2._UserMessageIterator): + def __next__(self): + raise bt2.Stop + + class MySource(bt2._UserSourceComponent, + message_iterator_class=MyIter): + def __init__(self, params): + self._add_output_port('out') + + class MySink(bt2._UserSinkComponent): + def __init__(self, params): + self._add_input_port('in') + + def _consume(self): + raise bt2.Stop def ports_connected_listener(upstream_port, downstream_port): - nonlocal calls - calls.append((ports_connected_listener, upstream_port, - downstream_port)) + raise ValueError('oh noes!') - def ports_disconnected_listener(upstream_comp, downstream_comp, - upstream_port, downstream_port): - nonlocal calls - calls.append((ports_disconnected_listener, upstream_comp, - downstream_comp, upstream_port, downstream_port)) + graph = bt2.Graph() + graph.add_ports_connected_listener(ports_connected_listener) + up = graph.add_component(MySource, 'down') + down = graph.add_component(MySink, 'up') - calls = [] - self._graph.add_listener(bt2.GraphListenerType.PORT_ADDED, - port_added_listener) - self._graph.add_listener(bt2.GraphListenerType.PORT_REMOVED, - port_removed_listener) - self._graph.add_listener(bt2.GraphListenerType.PORTS_CONNECTED, - ports_connected_listener) - self._graph.add_listener(bt2.GraphListenerType.PORTS_DISCONNECTED, - ports_disconnected_listener) - src = self._graph.add_component(MySource, 'src') - sink = self._graph.add_component(MySink, 'sink') - self._graph.connect_ports(src.output_ports['out'], - sink.input_ports['in']) - sink.input_ports['in'].disconnect() - self.assertIs(calls[0][0], port_added_listener) - self.assertEqual(calls[0][1].name, 'out') - self.assertIs(calls[1][0], port_added_listener) - self.assertEqual(calls[1][1].name, 'zero') - self.assertIs(calls[2][0], port_added_listener) - self.assertEqual(calls[2][1].name, 'in') - self.assertIs(calls[3][0], port_removed_listener) - self.assertEqual(calls[3][1].name, 'zero') - self.assertIs(calls[4][0], port_added_listener) - self.assertEqual(calls[4][1].name, 'taste') - self.assertIs(calls[5][0], ports_connected_listener) - self.assertEqual(calls[5][1].name, 'out') - self.assertEqual(calls[5][2].name, 'in') - self.assertIs(calls[6][0], port_removed_listener) - self.assertEqual(calls[6][1].name, 'in') - self.assertIs(calls[7][0], ports_disconnected_listener) - self.assertEqual(calls[7][1].name, 'src') - self.assertEqual(calls[7][2].name, 'sink') - self.assertEqual(calls[7][3].name, 'out') - self.assertEqual(calls[7][4].name, 'in') - del calls + with self.assertRaises(bt2.Error): + graph.connect_ports(up.output_ports['out'], down.input_ports['in']) -- 2.34.1