res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
exc = ctx.exception
- self.assertEqual(len(exc), 1)
+ self.assertEqual(len(exc), 2)
cause = exc[0]
- self.assertIsInstance(cause, bt2.error._ComponentClassErrorCause)
+ self.assertIsInstance(cause, bt2._ComponentClassErrorCause)
self.assertIn('raise ValueError', cause.message)
self.assertEqual(cause.component_class_type, bt2.ComponentClassType.SINK)
self.assertEqual(cause.component_class_name, 'MySink')
with self.assertRaises(ValueError):
res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23], 12345)
- def test_query_invalid_params(self):
+ def test_query_try_again(self):
class MySink(bt2._UserSinkComponent):
def _consume(self):
pass
@classmethod
def _query(cls, query_exec, obj, params, log_level):
- raise bt2.InvalidParams
+ raise bt2.TryAgain
- with self.assertRaises(bt2.InvalidParams):
+ with self.assertRaises(bt2.TryAgain):
res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
- def test_query_try_again(self):
+ def test_query_add_interrupter(self):
class MySink(bt2._UserSinkComponent):
def _consume(self):
pass
@classmethod
def _query(cls, query_exec, obj, params, log_level):
- raise bt2.TryAgain
-
- with self.assertRaises(bt2.TryAgain):
- res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
-
- def test_cancel_no_query(self):
+ nonlocal interrupter2
+ test_self.assertFalse(query_exec.is_interrupted)
+ interrupter2.set()
+ test_self.assertTrue(query_exec.is_interrupted)
+ interrupter2.reset()
+ test_self.assertFalse(query_exec.is_interrupted)
+
+ interrupter1 = bt2.Interrupter()
+ interrupter2 = bt2.Interrupter()
+ test_self = self
query_exec = bt2.QueryExecutor()
- self.assertFalse(query_exec.is_canceled)
- query_exec.cancel()
- self.assertTrue(query_exec.is_canceled)
+ query_exec.add_interrupter(interrupter1)
+ query_exec.add_interrupter(interrupter2)
+ query_exec.query(MySink, 'obj', [17, 23])
- def test_query_canceled(self):
+ def test_query_interrupt(self):
class MySink(bt2._UserSinkComponent):
def _consume(self):
pass
@classmethod
def _query(cls, query_exec, obj, params, log_level):
- raise bt2.TryAgain
+ test_self.assertFalse(query_exec.is_interrupted)
+ query_exec.interrupt()
+ test_self.assertTrue(query_exec.is_interrupted)
+ test_self = self
query_exec = bt2.QueryExecutor()
- query_exec.cancel()
-
- with self.assertRaises(bt2.Canceled):
- res = query_exec.query(MySink, 'obj', [17, 23])
+ query_exec.query(MySink, 'obj', [17, 23])