from bt2.connection import *
from bt2.connection import _Connection
from bt2.error import *
+from bt2.error import _Error
from bt2.event import _Event
from bt2.event_class import *
from bt2.field_class import *
from bt2.clock_snapshot import _UnknownClockSnapshot
-class MemoryError(Error):
+class _MemoryError(_Error):
'''Raised when an operation fails due to memory issues.'''
pass
-class LoadingError(Error):
+class _LoadingError(_Error):
pass
)
if cc_ptr is None:
- raise bt2.MemoryError(
+ raise bt2._MemoryError(
"cannot create component class '{}'".format(class_name)
)
tc_ptr = native_bt.trace_class_create(ptr)
if tc_ptr is None:
- raise bt2.MemoryError('could not create trace class')
+ raise bt2._MemoryError('could not create trace class')
tc = bt2._TraceClass._create_from_ptr(tc_ptr)
tc._assigns_automatic_stream_class_id = assigns_automatic_stream_class_id
cc_ptr = native_bt.clock_class_create(ptr)
if cc_ptr is None:
- raise bt2.MemoryError('could not create clock class')
+ raise bt2._MemoryError('could not create clock class')
cc = bt2.clock_class._ClockClass._create_from_ptr(cc_ptr)
}
-class Error(Exception, abc.Sequence):
+class _Error(Exception, abc.Sequence):
"""
Babeltrace API call error.
def _check_create_status(self, ptr):
if ptr is None:
- raise bt2.MemoryError(
+ raise bt2._MemoryError(
'cannot create {} field class object'.format(self._NAME.lower())
)
ptr = native_bt.graph_create()
if ptr is None:
- raise bt2.MemoryError('cannot create graph object')
+ raise bt2._MemoryError('cannot create graph object')
super().__init__(ptr)
listener_ids = fn(self._ptr, listener_from_native)
if listener_ids is None:
- raise bt2.Error('cannot add listener to graph object')
+ raise bt2._Error('cannot add listener to graph object')
return bt2._ListenerHandle(listener_ids, self)
listener_ids = fn(self._ptr, listener_from_native)
if listener_ids is None:
- raise bt2.Error('cannot add listener to graph object')
+ raise bt2._Error('cannot add listener to graph object')
return bt2._ListenerHandle(listener_ids, self)
)
if msg_iter_ptr is None:
- raise bt2.MemoryError('cannot create output port message iterator')
+ raise bt2._MemoryError('cannot create output port message iterator')
return bt2.message_iterator._OutputPortMessageIterator(msg_iter_ptr)
ptr = self._create_range_set()
if ptr is None:
- raise bt2.MemoryError('cannot create range set object')
+ raise bt2._MemoryError('cannot create range set object')
super().__init__(ptr)
)
if ptr is None:
- raise bt2.MemoryError('cannot create event message object')
+ raise bt2._MemoryError('cannot create event message object')
return bt2.message._EventMessage(ptr)
)
if ptr is None:
- raise bt2.MemoryError('cannot create inactivity message object')
+ raise bt2._MemoryError('cannot create inactivity message object')
return bt2.message._MessageIteratorInactivityMessage(ptr)
ptr = native_bt.message_stream_beginning_create(self._bt_ptr, stream._ptr)
if ptr is None:
- raise bt2.MemoryError('cannot create stream beginning message object')
+ raise bt2._MemoryError('cannot create stream beginning message object')
msg = bt2.message._StreamBeginningMessage(ptr)
ptr = native_bt.message_stream_end_create(self._bt_ptr, stream._ptr)
if ptr is None:
- raise bt2.MemoryError('cannot create stream end message object')
+ raise bt2._MemoryError('cannot create stream end message object')
msg = bt2.message._StreamEndMessage(ptr)
ptr = native_bt.message_packet_beginning_create(self._bt_ptr, packet._ptr)
if ptr is None:
- raise bt2.MemoryError('cannot create packet beginning message object')
+ raise bt2._MemoryError('cannot create packet beginning message object')
return bt2.message._PacketBeginningMessage(ptr)
ptr = native_bt.message_packet_end_create(self._bt_ptr, packet._ptr)
if ptr is None:
- raise bt2.MemoryError('cannot create packet end message object')
+ raise bt2._MemoryError('cannot create packet end message object')
return bt2.message._PacketEndMessage(ptr)
ptr = native_bt.message_discarded_events_create(self._bt_ptr, stream._ptr)
if ptr is None:
- raise bt2.MemoryError('cannot discarded events message object')
+ raise bt2._MemoryError('cannot discarded events message object')
msg = bt2.message._DiscardedEventsMessage(ptr)
ptr = native_bt.message_discarded_packets_create(self._bt_ptr, stream._ptr)
if ptr is None:
- raise bt2.MemoryError('cannot discarded packets message object')
+ raise bt2._MemoryError('cannot discarded packets message object')
msg = bt2.message._DiscardedPacketsMessage(ptr)
py_mod_bt2 = PyImport_ImportModule("bt2");
BT_ASSERT(py_mod_bt2);
py_mod_bt2_exc_error_type =
- PyObject_GetAttrString(py_mod_bt2, "Error");
+ PyObject_GetAttrString(py_mod_bt2, "_Error");
BT_ASSERT(py_mod_bt2_exc_error_type);
py_mod_bt2_exc_memory_error =
- PyObject_GetAttrString(py_mod_bt2, "MemoryError");
+ PyObject_GetAttrString(py_mod_bt2, "_MemoryError");
BT_ASSERT(py_mod_bt2_exc_memory_error);
py_mod_bt2_exc_try_again_type =
PyObject_GetAttrString(py_mod_bt2, "TryAgain");
}
/*
- * If the raised exception is a bt2.Error, restore the wrapped error.
+ * If the raised exception is a bt2._Error, restore the wrapped error.
*/
if (PyErr_GivenExceptionMatches(py_exc_value, py_mod_bt2_exc_error_type)) {
PyObject *py_error_swig_ptr;
int ret;
/*
- * We never raise a bt2.Error with a cause: it should be the
+ * We never raise a bt2._Error with a cause: it should be the
* end of the chain.
*/
BT_ASSERT(!py_exc_cause_value);
* try:
* try:
* something_that_raises_bt2_error()
- * except bt2.Error as e1:
+ * except bt2._Error as e1:
* raise ValueError from e1
* except ValueError as e2:
* raise TypeError from e2
*
* We will have the following exception chain:
*
- * TypeError -> ValueError -> bt2.Error
+ * TypeError -> ValueError -> bt2._Error
*
* Where the TypeError is the current exception (obtained from PyErr_Fetch).
*
- * The bt2.Error contains a `struct bt_error *` that used to be the current
+ * The bt2._Error contains a `struct bt_error *` that used to be the current
* thread's error, at the moment the exception was raised.
*
- * This function gets to the bt2.Error and restores the wrapped
+ * This function gets to the bt2._Error and restores the wrapped
* `struct bt_error *` as the current thread's error.
*
* Then, for each exception in the chain, starting with the oldest one, it adds
self._ptr
)
if msg_iter_ptr is None:
- raise bt2.MemoryError('cannot create message iterator object')
+ raise bt2._MemoryError('cannot create message iterator object')
return bt2.message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr)
ptr = native_bt.query_executor_create()
if ptr is None:
- raise bt2.MemoryError('cannot create query executor object')
+ raise bt2._MemoryError('cannot create query executor object')
super().__init__(ptr)
packet_ptr = native_bt.packet_create(self._ptr)
if packet_ptr is None:
- raise bt2.MemoryError('cannot create packet object')
+ raise bt2._MemoryError('cannot create packet object')
return bt2.packet._Packet._create_from_ptr(packet_ptr)
)
if stream_ptr is None:
- raise bt2.MemoryError('cannot create stream object')
+ raise bt2._MemoryError('cannot create stream object')
stream = bt2.stream._Stream._create_from_ptr(stream_ptr)
trace_ptr = native_bt.trace_create(self._ptr)
if trace_ptr is None:
- raise bt2.MemoryError('cannot create trace class object')
+ raise bt2._MemoryError('cannot create trace class object')
trace = bt2.trace._Trace._create_from_ptr(trace_ptr)
def _check_create_status(self, ptr, type_name):
if ptr is None:
- raise bt2.MemoryError('cannot create {} field class'.format(type_name))
+ raise bt2._MemoryError('cannot create {} field class'.format(type_name))
def _create_integer_field_class(
self, create_func, py_cls, type_name, field_value_range, preferred_display_base
if status == native_bt.__BT_FUNC_STATUS_ERROR:
assert msg is not None
- raise bt2.Error(msg)
+ raise bt2._Error(msg)
elif status == native_bt.__BT_FUNC_STATUS_MEMORY_ERROR:
assert msg is not None
- raise bt2.MemoryError(msg)
+ raise bt2._MemoryError(msg)
elif status == native_bt.__BT_FUNC_STATUS_END:
if msg is None:
raise bt2.Stop
raise bt2.Canceled(msg)
elif status == native_bt.__BT_FUNC_STATUS_LOADING_ERROR:
if msg is None:
- raise bt2.LoadingError
+ raise bt2._LoadingError
else:
- raise bt2.LoadingError(msg)
+ raise bt2._LoadingError(msg)
elif status == native_bt.__BT_FUNC_STATUS_OVERFLOW:
if msg is None:
raise bt2.OverflowError
def _check_create_status(self, ptr):
if ptr is None:
- raise bt2.MemoryError(
+ raise bt2._MemoryError(
'cannot create {} value object'.format(self._NAME.lower())
)
def _graph_is_configured(self):
pass
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
bt2.QueryExecutor().query(MySink, 'obj', 23)
def test_query_raises(self):
def _query(cls, query_exec, obj, params, log_level):
raise ValueError
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
bt2.QueryExecutor().query(MySink, 'obj', 23)
def test_query_wrong_return_type(self):
def _query(cls, query_exec, obj, params, log_level):
return ...
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
bt2.QueryExecutor().query(MySink, 'obj', 23)
def test_query_params_none(self):
try:
print(self._iter.__next__)
next(self._iter)
- except bt2.Error as e:
+ except bt2._Error as e:
print(hex(id(e)))
print(e.__dict__)
raise ValueError('oops') from e
class ErrorTestCase(unittest.TestCase):
def _run_failing_graph(self, source_cc, sink_cc):
- with self.assertRaises(bt2.Error) as ctx:
+ with self.assertRaises(bt2._Error) as ctx:
graph = bt2.Graph()
src = graph.add_component(source_cc, 'src')
snk = graph.add_component(sink_cc, 'snk')
return ctx.exception
def test_current_thread_error_none(self):
- # When a bt2.Error is raised, it steals the current thread's error.
+ # When a bt2._Error is raised, it steals the current thread's error.
# Verify that it is now NULL.
exc = self._run_failing_graph(SourceWithFailingInit, WorkingSink)
self.assertIsNone(native_bt.current_thread_take_error())
#
# try:
# ...
- # except bt2.Error as exc:
+ # except bt2._Error as exc:
# raise ValueError('oh noes') from exc
#
- # We are able to fetch the causes of the original bt2.Error in the
+ # We are able to fetch the causes of the original bt2._Error in the
# exception chain. Also, each exception in the chain should become one
# cause once caught.
exc = self._run_failing_graph(SourceWithFailingIter, SinkWithExceptionChaining)
self.assertIsInstance(exc[2], bt2.error._ComponentErrorCause)
self.assertEqual(exc[2].component_class_name, 'SinkWithExceptionChaining')
self.assertIn(
- 'bt2.error.Error: unexpected error: cannot advance the message iterator',
+ 'bt2.error._Error: unexpected error: cannot advance the message iterator',
exc[2].message,
)
def test_component_class_error_cause(self):
q = bt2.QueryExecutor()
- with self.assertRaises(bt2.Error) as ctx:
+ with self.assertRaises(bt2._Error) as ctx:
q.query(SinkWithFailingQuery, 'hello')
cause = ctx.exception[0]
src.output_ports['out'], sink.input_ports['in']
)
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
self._graph.run()
def test_listeners(self):
graph = bt2.Graph()
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
graph.add_component(MySink, 'comp')
def test_raise_in_port_added_listener(self):
graph = bt2.Graph()
graph.add_port_added_listener(port_added_listener)
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
graph.add_component(MySink, 'comp')
def test_raise_in_ports_connected_listener(self):
up = graph.add_component(MySource, 'down')
down = graph.add_component(MySink, 'up')
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
MySourceIter._seek_beginning = _seek_beginning_error
- with self.assertRaises(bt2.Error):
+ with self.assertRaises(bt2._Error):
it.seek_beginning()
# Try consuming many times from an iterator that always returns TryAgain.
def _query(cls, query_exec, obj, params, log_level):
raise ValueError
- with self.assertRaises(bt2.Error) as ctx:
+ with self.assertRaises(bt2._Error) as ctx:
res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
exc = ctx.exception