)
utils._handle_func_status(status, 'cannot seek message iterator beginning')
+ def can_seek_ns_from_origin(self, ns_from_origin):
+ utils._check_int64(ns_from_origin)
+ status, res = native_bt.self_component_port_input_message_iterator_can_seek_ns_from_origin(
+ self._ptr, ns_from_origin
+ )
+ utils._handle_func_status(
+ status,
+ 'cannot check whether or not message iterator can seek given ns from origin',
+ )
+ return res != 0
+
+ def seek_ns_from_origin(self, ns_from_origin):
+ utils._check_int64(ns_from_origin)
+
+ # Forget about buffered messages, they won't be valid after seeking.
+ self._current_msgs.clear()
+ self._at = 0
+
+ status = native_bt.self_component_port_input_message_iterator_seek_ns_from_origin(
+ self._ptr, ns_from_origin
+ )
+ utils._handle_func_status(
+ status, 'message iterator cannot seek given ns from origin'
+ )
+
# This is extended by the user to implement component classes in Python. It
# is created for a given output port when an input port message iterator is
def _bt_seek_beginning_from_native(self):
self._user_seek_beginning()
+ def _bt_can_seek_ns_from_origin_from_native(self, ns_from_origin):
+ # Here, we mimic the behavior of the C API:
+ #
+ # - If the iterator has a _user_can_seek_ns_from_origin method,
+ # call it and use its return value.
+ # - Otherwise, if there is a `_user_seek_ns_from_origin` method,
+ # we presume it's possible.
+ # - Otherwise, check if we can seek to beginning (which allows us to
+ # seek to beginning and then fast forward - aka auto-seek).
+ if hasattr(self, '_user_can_seek_ns_from_origin'):
+ can_seek_ns_from_origin = self._user_can_seek_ns_from_origin(ns_from_origin)
+ utils._check_bool(can_seek_ns_from_origin)
+ return can_seek_ns_from_origin
+ elif hasattr(self, '_user_seek_ns_from_origin'):
+ return True
+ else:
+ return self._bt_can_seek_beginning_from_native()
+
+ def _bt_seek_ns_from_origin_from_native(self, ns_from_origin):
+ self._user_seek_ns_from_origin(ns_from_origin)
+
def _create_input_port_message_iterator(self, input_port):
utils._check_type(input_port, bt2_port._UserComponentInputPort)
return status;
}
+static
+bt_component_class_message_iterator_can_seek_ns_from_origin_method_status
+component_class_can_seek_ns_from_origin(
+ bt_self_message_iterator *self_message_iterator,
+ int64_t ns_from_origin, bt_bool *can_seek)
+{
+ PyObject *py_iter;
+ PyObject *py_result = NULL;
+ bt_component_class_message_iterator_can_seek_ns_from_origin_method_status status;
+ py_iter = bt_self_message_iterator_get_data(self_message_iterator);
+ BT_ASSERT(py_iter);
+
+ py_result = PyObject_CallMethod(py_iter,
+ "_bt_can_seek_ns_from_origin_from_native", "L", ns_from_origin);
+
+ BT_ASSERT(!py_result || PyBool_Check(py_result));
+
+ if (py_result) {
+ *can_seek = PyObject_IsTrue(py_result);
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_NS_FROM_ORIGIN_METHOD_STATUS_OK;
+ } else {
+ status = py_exc_to_status_message_iterator(self_message_iterator);
+ BT_ASSERT(status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_NS_FROM_ORIGIN_METHOD_STATUS_OK);
+ }
+
+ Py_XDECREF(py_result);
+
+ return status;
+}
+
+static
+bt_component_class_message_iterator_seek_ns_from_origin_method_status
+component_class_seek_ns_from_origin(
+ bt_self_message_iterator *self_message_iterator,
+ int64_t ns_from_origin)
+{
+ PyObject *py_iter;
+ PyObject *py_result;
+ bt_component_class_message_iterator_seek_ns_from_origin_method_status status;
+
+ py_iter = bt_self_message_iterator_get_data(self_message_iterator);
+ BT_ASSERT(py_iter);
+ py_result = PyObject_CallMethod(py_iter,
+ "_bt_seek_ns_from_origin_from_native", "L", ns_from_origin);
+ BT_ASSERT(!py_result || py_result == Py_None);
+ status = py_exc_to_status_message_iterator(self_message_iterator);
+ Py_XDECREF(py_result);
+ return status;
+}
+
static
bt_component_class_port_connected_method_status component_class_port_connected(
bt_self_component *self_component,
BT_ASSERT(ret == 0);
ret = bt_component_class_source_set_message_iterator_seek_beginning_method(component_class_source,
component_class_seek_beginning);
+ ret = bt_component_class_source_set_message_iterator_can_seek_ns_from_origin_method(
+ component_class_source, component_class_can_seek_ns_from_origin);
+ BT_ASSERT(ret == 0);
+ ret = bt_component_class_source_set_message_iterator_seek_ns_from_origin_method(
+ component_class_source, component_class_seek_ns_from_origin);
BT_ASSERT(ret == 0);
ret = bt_component_class_source_set_output_port_connected_method(component_class_source,
component_class_source_output_port_connected);
ret = bt_component_class_filter_set_message_iterator_seek_beginning_method(component_class_filter,
component_class_seek_beginning);
BT_ASSERT(ret == 0);
+ ret = bt_component_class_filter_set_message_iterator_can_seek_ns_from_origin_method(
+ component_class_filter, component_class_can_seek_ns_from_origin);
+ BT_ASSERT(ret == 0);
+ ret = bt_component_class_filter_set_message_iterator_seek_ns_from_origin_method(
+ component_class_filter, component_class_seek_ns_from_origin);
ret = bt_component_class_filter_set_input_port_connected_method(component_class_filter,
component_class_filter_input_port_connected);
BT_ASSERT(ret == 0);
next(it)
-def _setup_seek_test(sink_cls, user_seek_beginning=None, user_can_seek_beginning=None):
+def _setup_seek_test(
+ sink_cls,
+ user_seek_beginning=None,
+ user_can_seek_beginning=None,
+ user_seek_ns_from_origin=None,
+ user_can_seek_ns_from_origin=None,
+):
class MySourceIter(bt2._UserMessageIterator):
def __init__(self, port):
tc, sc, ec = port.user_data
if user_can_seek_beginning is not None:
MySourceIter._user_can_seek_beginning = user_can_seek_beginning
+ if user_seek_ns_from_origin is not None:
+ MySourceIter._user_seek_ns_from_origin = user_seek_ns_from_origin
+
+ if user_can_seek_ns_from_origin is not None:
+ MySourceIter._user_can_seek_ns_from_origin = user_can_seek_ns_from_origin
+
class MySource(bt2._UserSourceComponent, message_iterator_class=MySourceIter):
def __init__(self, params, obj):
tc = self._create_trace_class()
def _user_seek_beginning(self):
self._upstream_iter.seek_beginning()
+ def _user_can_seek_ns_from_origin(self, ns_from_origin):
+ return self._upstream_iter.can_seek_ns_from_origin(ns_from_origin)
+
+ def _user_seek_ns_from_origin(self, ns_from_origin):
+ self._upstream_iter.seek_ns_from_origin(ns_from_origin)
+
class MyFilter(bt2._UserFilterComponent, message_iterator_class=MyFilterIter):
def __init__(self, params, obj):
self._add_input_port('in')
graph.run_once()
+class UserMessageIteratorSeekNsFromOriginTestCase(unittest.TestCase):
+ def test_can_seek_ns_from_origin(self):
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._add_input_port('in')
+
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ def _user_consume(self):
+ nonlocal can_seek_ns_from_origin
+ nonlocal test_ns_from_origin
+ can_seek_ns_from_origin = self._msg_iter.can_seek_ns_from_origin(
+ test_ns_from_origin
+ )
+
+ def _user_can_seek_ns_from_origin(iter_self, ns_from_origin):
+ nonlocal input_port_iter_can_seek_ns_from_origin
+ nonlocal test_ns_from_origin
+ self.assertEqual(ns_from_origin, test_ns_from_origin)
+ return input_port_iter_can_seek_ns_from_origin
+
+ graph = _setup_seek_test(
+ MySink, user_can_seek_ns_from_origin=_user_can_seek_ns_from_origin
+ )
+
+ input_port_iter_can_seek_ns_from_origin = True
+ can_seek_ns_from_origin = None
+ test_ns_from_origin = 1
+ graph.run_once()
+ self.assertIs(can_seek_ns_from_origin, True)
+
+ input_port_iter_can_seek_ns_from_origin = False
+ can_seek_ns_from_origin = None
+ test_ns_from_origin = 2
+ graph.run_once()
+ self.assertIs(can_seek_ns_from_origin, False)
+
+ def test_no_can_seek_ns_from_origin_with_seek_ns_from_origin(self):
+ # Test an iterator without a _user_can_seek_ns_from_origin method, but
+ # with a _user_seek_ns_from_origin method.
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._add_input_port('in')
+
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ def _user_consume(self):
+ nonlocal can_seek_ns_from_origin
+ nonlocal test_ns_from_origin
+ can_seek_ns_from_origin = self._msg_iter.can_seek_ns_from_origin(
+ test_ns_from_origin
+ )
+
+ def _user_seek_ns_from_origin(self):
+ pass
+
+ graph = _setup_seek_test(
+ MySink, user_seek_ns_from_origin=_user_seek_ns_from_origin
+ )
+ can_seek_ns_from_origin = None
+ test_ns_from_origin = 2
+ graph.run_once()
+ self.assertIs(can_seek_ns_from_origin, True)
+
+ def test_no_can_seek_ns_from_origin_with_seek_beginning(self):
+ # Test an iterator without a _user_can_seek_ns_from_origin method, but
+ # with a _user_seek_beginning method.
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._add_input_port('in')
+
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ def _user_consume(self):
+ nonlocal can_seek_ns_from_origin
+ nonlocal test_ns_from_origin
+ can_seek_ns_from_origin = self._msg_iter.can_seek_ns_from_origin(
+ test_ns_from_origin
+ )
+
+ def _user_seek_beginning(self):
+ pass
+
+ graph = _setup_seek_test(MySink, user_seek_beginning=_user_seek_beginning)
+ can_seek_ns_from_origin = None
+ test_ns_from_origin = 2
+ graph.run_once()
+ self.assertIs(can_seek_ns_from_origin, True)
+
+ def test_no_can_seek_ns_from_origin(self):
+ # Test an iterator without a _user_can_seek_ns_from_origin method
+ # and no other related method.
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._add_input_port('in')
+
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ def _user_consume(self):
+ nonlocal can_seek_ns_from_origin
+ nonlocal test_ns_from_origin
+ can_seek_ns_from_origin = self._msg_iter.can_seek_ns_from_origin(
+ test_ns_from_origin
+ )
+
+ graph = _setup_seek_test(MySink)
+ can_seek_ns_from_origin = None
+ test_ns_from_origin = 2
+ graph.run_once()
+ self.assertIs(can_seek_ns_from_origin, False)
+
+ def test_can_seek_ns_from_origin_user_error(self):
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._add_input_port('in')
+
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ def _user_consume(self):
+ # This is expected to raise.
+ self._msg_iter.can_seek_ns_from_origin(2)
+
+ def _user_can_seek_ns_from_origin(self, ns_from_origin):
+ raise ValueError('Joutel')
+
+ graph = _setup_seek_test(
+ MySink, user_can_seek_ns_from_origin=_user_can_seek_ns_from_origin
+ )
+
+ with self.assertRaises(bt2._Error) as ctx:
+ graph.run_once()
+
+ cause = ctx.exception[0]
+ self.assertIn('ValueError: Joutel', cause.message)
+
+ def test_can_seek_ns_from_origin_wrong_return_value(self):
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._add_input_port('in')
+
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ def _user_consume(self):
+ # This is expected to raise.
+ self._msg_iter.can_seek_ns_from_origin(2)
+
+ def _user_can_seek_ns_from_origin(self, ns_from_origin):
+ return 'Nitchequon'
+
+ graph = _setup_seek_test(
+ MySink, user_can_seek_ns_from_origin=_user_can_seek_ns_from_origin
+ )
+
+ with self.assertRaises(bt2._Error) as ctx:
+ graph.run_once()
+
+ cause = ctx.exception[0]
+ self.assertIn("TypeError: 'str' is not a 'bool' object", cause.message)
+
+ def test_seek_ns_from_origin(self):
+ class MySink(bt2._UserSinkComponent):
+ def __init__(self, params, obj):
+ self._add_input_port('in')
+
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ def _user_consume(self):
+ self._msg_iter.seek_ns_from_origin(17)
+
+ def _user_seek_ns_from_origin(self, ns_from_origin):
+ nonlocal actual_ns_from_origin
+ actual_ns_from_origin = ns_from_origin
+
+ msg = None
+ graph = _setup_seek_test(
+ MySink, user_seek_ns_from_origin=_user_seek_ns_from_origin
+ )
+
+ actual_ns_from_origin = None
+ graph.run_once()
+ self.assertEqual(actual_ns_from_origin, 17)
+
+
if __name__ == '__main__':
unittest.main()