#
# Copyright (C) 2019 EfficiOS Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; only version 2
# of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
class QueryExecutorTestCase(unittest.TestCase):
    """Tests for ``bt2.QueryExecutor().query()`` and executor cancellation.

    Each test defines a throwaway sink component class whose
    ``_query(cls, ...)`` hook produces the condition under test, then runs
    a query against that class and checks the outcome.

    NOTE(review): this class was recovered from a lossy web scrape in which
    some source lines were dropped.  The ``test_query`` header, the
    ``_consume`` stubs, the ``pass`` bodies and the ``@classmethod``
    decorators were not visible and are reconstructed; every other
    reconstructed statement carries its own ``NOTE(review)``.  Confirm all
    of them against the upstream file before relying on this version.
    """

    def test_query(self):
        """``params`` reaches ``_query`` and its return value comes back."""

        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

            @classmethod
            def _query(cls, query_exec, obj, params, log_level):
                # NOTE(review): capture reconstructed by analogy with
                # test_query_logging_level; the assertion on
                # `query_params` below requires something like it.
                nonlocal query_params
                query_params = params
                return {'null': None, 'bt2': 'BT2'}

        # NOTE(review): initial value and the opening of the `params`
        # literal were not visible; only the two entries below were.
        query_params = None
        params = {
            'array': ['coucou', 23, None],
            'other_map': {'yes': 'yeah', '19': 19, 'minus 1.5': -1.5},
        }

        res = bt2.QueryExecutor().query(MySink, 'obj', params)
        self.assertEqual(query_params, params)
        self.assertEqual(res, {'null': None, 'bt2': 'BT2'})

    def test_query_params_none(self):
        """Passing ``None`` as query parameters hands ``None`` to ``_query``."""

        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

            @classmethod
            def _query(cls, query_exec, obj, params, log_level):
                # NOTE(review): capture reconstructed (see test_query).
                nonlocal query_params
                query_params = params

        # Start from a non-None sentinel so the assertion below can only
        # pass if `_query` actually ran and stored `None`.
        # NOTE(review): the original initial value was not visible.
        query_params = object()
        bt2.QueryExecutor().query(MySink, 'obj', None)
        self.assertEqual(query_params, None)

    def test_query_logging_level(self):
        """The logging level given to ``query()`` reaches ``_query``."""

        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

            @classmethod
            def _query(cls, query_exec, obj, params, log_level):
                nonlocal query_log_level
                query_log_level = log_level

        query_log_level = None
        bt2.QueryExecutor().query(MySink, 'obj', None, bt2.LoggingLevel.INFO)
        self.assertEqual(query_log_level, bt2.LoggingLevel.INFO)

    def test_query_gen_error(self):
        """A failing ``_query`` is expected to surface as ``bt2.Error``."""

        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

            @classmethod
            def _query(cls, query_exec, obj, params, log_level):
                # NOTE(review): body not visible in the scrape; presumably
                # any non-bt2 exception is reported as bt2.Error — confirm.
                raise ValueError

        with self.assertRaises(bt2.Error):
            bt2.QueryExecutor().query(MySink, 'obj', [17, 23])

    def test_query_invalid_object(self):
        """``bt2.InvalidObject`` raised by ``_query`` reaches the caller."""

        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

            @classmethod
            def _query(cls, query_exec, obj, params, log_level):
                raise bt2.InvalidObject

        with self.assertRaises(bt2.InvalidObject):
            bt2.QueryExecutor().query(MySink, 'obj', [17, 23])

    def test_query_logging_level_invalid_type(self):
        """A non-integer logging level is rejected with ``TypeError``."""

        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

            @classmethod
            def _query(cls, query_exec, obj, params, log_level):
                pass

        with self.assertRaises(TypeError):
            bt2.QueryExecutor().query(MySink, 'obj', [17, 23], 'yeah')

    def test_query_logging_level_invalid_value(self):
        """An out-of-range logging level is rejected with ``ValueError``."""

        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

            @classmethod
            def _query(cls, query_exec, obj, params, log_level):
                pass

        with self.assertRaises(ValueError):
            bt2.QueryExecutor().query(MySink, 'obj', [17, 23], 12345)

    def test_query_invalid_params(self):
        """``bt2.InvalidParams`` raised by ``_query`` reaches the caller."""

        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

            @classmethod
            def _query(cls, query_exec, obj, params, log_level):
                raise bt2.InvalidParams

        with self.assertRaises(bt2.InvalidParams):
            bt2.QueryExecutor().query(MySink, 'obj', [17, 23])

    def test_query_try_again(self):
        """``bt2.TryAgain`` from the query is expected to reach the caller."""

        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

            @classmethod
            def _query(cls, query_exec, obj, params, log_level):
                # NOTE(review): body not visible in the scrape;
                # reconstructed by analogy with test_query_invalid_object
                # and test_query_invalid_params — confirm.
                raise bt2.TryAgain

        with self.assertRaises(bt2.TryAgain):
            bt2.QueryExecutor().query(MySink, 'obj', [17, 23])

    def test_cancel_no_query(self):
        """``is_canceled`` goes from false to true without running a query."""
        query_exec = bt2.QueryExecutor()
        self.assertFalse(query_exec.is_canceled)
        # NOTE(review): the statement between the two assertions was not
        # visible; `cancel()` is the presumed call — confirm the API name.
        query_exec.cancel()
        self.assertTrue(query_exec.is_canceled)

    def test_query_canceled(self):
        """Querying through a canceled executor raises ``bt2.Canceled``."""

        class MySink(bt2._UserSinkComponent):
            def _consume(self):
                pass

            def _graph_is_configured(self):
                pass

            @classmethod
            def _query(cls, query_exec, obj, params, log_level):
                # NOTE(review): body not visible in the scrape; raising
                # TryAgain mirrors test_query_try_again — confirm.
                raise bt2.TryAgain

        query_exec = bt2.QueryExecutor()
        # NOTE(review): cancellation step not visible in the scrape;
        # presumed from the test name and the expected exception.
        query_exec.cancel()

        with self.assertRaises(bt2.Canceled):
            query_exec.query(MySink, 'obj', [17, 23])
# (web-viewer footer, not part of the module) This page took 0.035665 seconds and 4 git commands to generate.