7 class QueryExecutorTestCase(unittest
.TestCase
):
9 class MySink(bt2
._UserSinkComponent
):
14 def _query(cls
, query_exec
, obj
, params
, log_level
):
24 'array': ['coucou', 23, None],
33 res
= bt2
.QueryExecutor().query(MySink
, 'obj', params
)
34 self
.assertEqual(query_params
, params
)
35 self
.assertEqual(res
, {
41 def test_query_params_none(self
):
42 class MySink(bt2
._UserSinkComponent
):
47 def _query(cls
, query_exec
, obj
, params
, log_level
):
52 res
= bt2
.QueryExecutor().query(MySink
, 'obj', None)
53 self
.assertEqual(query_params
, None)
56 def test_query_logging_level(self
):
57 class MySink(bt2
._UserSinkComponent
):
62 def _query(cls
, query_exec
, obj
, params
, log_level
):
63 nonlocal query_log_level
64 query_log_level
= log_level
66 query_log_level
= None
67 res
= bt2
.QueryExecutor().query(MySink
, 'obj', None,
68 bt2
.LoggingLevel
.INFO
)
69 self
.assertEqual(query_log_level
, bt2
.LoggingLevel
.INFO
)
72 def test_query_gen_error(self
):
73 class MySink(bt2
._UserSinkComponent
):
78 def _query(cls
, query_exec
, obj
, params
, log_level
):
81 with self
.assertRaises(bt2
.Error
):
82 res
= bt2
.QueryExecutor().query(MySink
, 'obj', [17, 23])
84 def test_query_invalid_object(self
):
85 class MySink(bt2
._UserSinkComponent
):
90 def _query(cls
, query_exec
, obj
, params
, log_level
):
91 raise bt2
.InvalidQueryObject
93 with self
.assertRaises(bt2
.InvalidQueryObject
):
94 res
= bt2
.QueryExecutor().query(MySink
, 'obj', [17, 23])
96 def test_query_logging_level_invalid_type(self
):
97 class MySink(bt2
._UserSinkComponent
):
102 def _query(cls
, query_exec
, obj
, params
, log_level
):
105 with self
.assertRaises(TypeError):
106 res
= bt2
.QueryExecutor().query(MySink
, 'obj', [17, 23], 'yeah')
108 def test_query_logging_level_invalid_value(self
):
109 class MySink(bt2
._UserSinkComponent
):
114 def _query(cls
, query_exec
, obj
, params
, log_level
):
117 with self
.assertRaises(ValueError):
118 res
= bt2
.QueryExecutor().query(MySink
, 'obj', [17, 23], 12345)
120 def test_query_invalid_params(self
):
121 class MySink(bt2
._UserSinkComponent
):
126 def _query(cls
, query_exec
, obj
, params
, log_level
):
127 raise bt2
.InvalidQueryParams
129 with self
.assertRaises(bt2
.InvalidQueryParams
):
130 res
= bt2
.QueryExecutor().query(MySink
, 'obj', [17, 23])
132 def test_query_try_again(self
):
133 class MySink(bt2
._UserSinkComponent
):
138 def _query(cls
, query_exec
, obj
, params
, log_level
):
141 with self
.assertRaises(bt2
.TryAgain
):
142 res
= bt2
.QueryExecutor().query(MySink
, 'obj', [17, 23])
144 def test_cancel_no_query(self
):
145 query_exec
= bt2
.QueryExecutor()
146 self
.assertFalse(query_exec
.is_canceled
)
148 self
.assertTrue(query_exec
.is_canceled
)
150 def test_query_canceled(self
):
151 class MySink(bt2
._UserSinkComponent
):
156 def _query(cls
, query_exec
, obj
, params
, log_level
):
159 query_exec
= bt2
.QueryExecutor()
162 with self
.assertRaises(bt2
.QueryExecutorCanceled
):
163 res
= query_exec
.query(MySink
, 'obj', [17, 23])
This page took 0.034702 seconds and 5 git commands to generate.