- raise bt2.TryAgain
-
- with self.assertRaises(bt2.TryAgain):
- res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
-
- def test_cancel_no_query(self):
+ nonlocal interrupter2
+ test_self.assertFalse(query_exec.is_interrupted)
+ interrupter2.set()
+ test_self.assertTrue(query_exec.is_interrupted)
+ interrupter2.reset()
+ test_self.assertFalse(query_exec.is_interrupted)
+
+ interrupter1 = bt2.Interrupter()
+ interrupter2 = bt2.Interrupter()
+ test_self = self