res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
exc = ctx.exception
- self.assertEqual(len(exc), 1)
+ self.assertEqual(len(exc), 2)
cause = exc[0]
self.assertIsInstance(cause, bt2.error._ComponentClassErrorCause)
self.assertIn('raise ValueError', cause.message)
with self.assertRaises(ValueError):
res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23], 12345)
- def test_query_invalid_params(self):
- class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
- pass
-
- @classmethod
- def _query(cls, query_exec, obj, params, log_level):
- raise bt2.InvalidParams
-
- with self.assertRaises(bt2.InvalidParams):
- res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
-
def test_query_try_again(self):
class MySink(bt2._UserSinkComponent):
def _consume(self):