class QueryExecutorTestCase(unittest.TestCase):
def test_query(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
@classmethod
- def _query(cls, query_exec, obj, params, log_level):
+ def _user_query(cls, query_exec, obj, params, log_level):
nonlocal query_params
query_params = params
return {'null': None, 'bt2': 'BT2'}
def test_query_params_none(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
@classmethod
- def _query(cls, query_exec, obj, params, log_level):
+ def _user_query(cls, query_exec, obj, params, log_level):
nonlocal query_params
query_params = params
def test_query_logging_level(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
@classmethod
- def _query(cls, query_exec, obj, params, log_level):
+ def _user_query(cls, query_exec, obj, params, log_level):
nonlocal query_log_level
query_log_level = log_level
def test_query_gen_error(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
@classmethod
- def _query(cls, query_exec, obj, params, log_level):
+ def _user_query(cls, query_exec, obj, params, log_level):
raise ValueError
with self.assertRaises(bt2._Error) as ctx:
exc = ctx.exception
self.assertEqual(len(exc), 2)
cause = exc[0]
- self.assertIsInstance(cause, bt2.error._ComponentClassErrorCause)
+ self.assertIsInstance(cause, bt2._ComponentClassErrorCause)
self.assertIn('raise ValueError', cause.message)
self.assertEqual(cause.component_class_type, bt2.ComponentClassType.SINK)
self.assertEqual(cause.component_class_name, 'MySink')
- def test_query_invalid_object(self):
+ def test_query_unknown_object(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
@classmethod
- def _query(cls, query_exec, obj, params, log_level):
- raise bt2.InvalidObject
+ def _user_query(cls, query_exec, obj, params, log_level):
+ raise bt2.UnknownObject
- with self.assertRaises(bt2.InvalidObject):
+ with self.assertRaises(bt2.UnknownObject):
res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
def test_query_logging_level_invalid_type(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
@classmethod
- def _query(cls, query_exec, obj, params, log_level):
+ def _user_query(cls, query_exec, obj, params, log_level):
pass
with self.assertRaises(TypeError):
def test_query_logging_level_invalid_value(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
@classmethod
- def _query(cls, query_exec, obj, params, log_level):
+ def _user_query(cls, query_exec, obj, params, log_level):
pass
with self.assertRaises(ValueError):
def test_query_try_again(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
@classmethod
- def _query(cls, query_exec, obj, params, log_level):
+ def _user_query(cls, query_exec, obj, params, log_level):
raise bt2.TryAgain
with self.assertRaises(bt2.TryAgain):
def test_query_add_interrupter(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
@classmethod
- def _query(cls, query_exec, obj, params, log_level):
+ def _user_query(cls, query_exec, obj, params, log_level):
nonlocal interrupter2
test_self.assertFalse(query_exec.is_interrupted)
interrupter2.set()
def test_query_interrupt(self):
class MySink(bt2._UserSinkComponent):
- def _consume(self):
- pass
-
- def _graph_is_configured(self):
+ def _user_consume(self):
pass
@classmethod
- def _query(cls, query_exec, obj, params, log_level):
+ def _user_query(cls, query_exec, obj, params, log_level):
test_self.assertFalse(query_exec.is_interrupted)
query_exec.interrupt()
test_self.assertTrue(query_exec.is_interrupted)