Explicitly mention `black` in CodingStyle guidelines
[babeltrace.git] / tests / bindings / python / bt2 / test_query_executor.py
CommitLineData
d2d857a8
MJ
1#
2# Copyright (C) 2019 EfficiOS Inc.
3#
4# This program is free software; you can redistribute it and/or
5# modify it under the terms of the GNU General Public License
6# as published by the Free Software Foundation; only version 2
7# of the License.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program; if not, write to the Free Software
16# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17#
18
c4239792 19from bt2 import value
c7eee084
PP
20import unittest
21import copy
22import bt2
23
24
25class QueryExecutorTestCase(unittest.TestCase):
26 def test_query(self):
27 class MySink(bt2._UserSinkComponent):
28 def _consume(self):
29 pass
30
a01b452b
SM
31 def _graph_is_configured(self):
32 pass
33
c7eee084 34 @classmethod
f4e38e70 35 def _query(cls, query_exec, obj, params, log_level):
c7eee084
PP
36 nonlocal query_params
37 query_params = params
38 return {
39 'null': None,
40 'bt2': 'BT2',
41 }
42
43 query_params = None
44 params = {
45 'array': ['coucou', 23, None],
46 'other_map': {
47 'yes': 'yeah',
48 '19': 19,
49 'minus 1.5': -1.5,
50 },
51 'null': None,
52 }
53
54 res = bt2.QueryExecutor().query(MySink, 'obj', params)
55 self.assertEqual(query_params, params)
56 self.assertEqual(res, {
57 'null': None,
58 'bt2': 'BT2',
59 })
60 del query_params
61
62 def test_query_params_none(self):
63 class MySink(bt2._UserSinkComponent):
64 def _consume(self):
65 pass
66
a01b452b
SM
67 def _graph_is_configured(self):
68 pass
69
c7eee084 70 @classmethod
f4e38e70 71 def _query(cls, query_exec, obj, params, log_level):
c7eee084
PP
72 nonlocal query_params
73 query_params = params
74
75 query_params = 23
76 res = bt2.QueryExecutor().query(MySink, 'obj', None)
77 self.assertEqual(query_params, None)
78 del query_params
79
761e1890
PP
80 def test_query_logging_level(self):
81 class MySink(bt2._UserSinkComponent):
82 def _consume(self):
83 pass
84
a01b452b
SM
85 def _graph_is_configured(self):
86 pass
87
761e1890
PP
88 @classmethod
89 def _query(cls, query_exec, obj, params, log_level):
90 nonlocal query_log_level
91 query_log_level = log_level
92
93 query_log_level = None
94 res = bt2.QueryExecutor().query(MySink, 'obj', None,
95 bt2.LoggingLevel.INFO)
96 self.assertEqual(query_log_level, bt2.LoggingLevel.INFO)
97 del query_log_level
98
c7eee084
PP
99 def test_query_gen_error(self):
100 class MySink(bt2._UserSinkComponent):
101 def _consume(self):
102 pass
103
a01b452b
SM
104 def _graph_is_configured(self):
105 pass
106
c7eee084 107 @classmethod
f4e38e70 108 def _query(cls, query_exec, obj, params, log_level):
c7eee084
PP
109 raise ValueError
110
111 with self.assertRaises(bt2.Error):
112 res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
113
114 def test_query_invalid_object(self):
115 class MySink(bt2._UserSinkComponent):
116 def _consume(self):
117 pass
118
a01b452b
SM
119 def _graph_is_configured(self):
120 pass
121
c7eee084 122 @classmethod
f4e38e70 123 def _query(cls, query_exec, obj, params, log_level):
d24d5663 124 raise bt2.InvalidObject
c7eee084 125
d24d5663 126 with self.assertRaises(bt2.InvalidObject):
c7eee084
PP
127 res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
128
761e1890
PP
129 def test_query_logging_level_invalid_type(self):
130 class MySink(bt2._UserSinkComponent):
131 def _consume(self):
132 pass
133
a01b452b
SM
134 def _graph_is_configured(self):
135 pass
136
761e1890
PP
137 @classmethod
138 def _query(cls, query_exec, obj, params, log_level):
139 pass
140
141 with self.assertRaises(TypeError):
142 res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23], 'yeah')
143
144 def test_query_logging_level_invalid_value(self):
145 class MySink(bt2._UserSinkComponent):
146 def _consume(self):
147 pass
148
a01b452b
SM
149 def _graph_is_configured(self):
150 pass
151
761e1890
PP
152 @classmethod
153 def _query(cls, query_exec, obj, params, log_level):
154 pass
155
156 with self.assertRaises(ValueError):
157 res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23], 12345)
158
c7eee084
PP
159 def test_query_invalid_params(self):
160 class MySink(bt2._UserSinkComponent):
161 def _consume(self):
162 pass
163
a01b452b
SM
164 def _graph_is_configured(self):
165 pass
166
c7eee084 167 @classmethod
f4e38e70 168 def _query(cls, query_exec, obj, params, log_level):
d24d5663 169 raise bt2.InvalidParams
c7eee084 170
d24d5663 171 with self.assertRaises(bt2.InvalidParams):
c7eee084
PP
172 res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
173
174 def test_query_try_again(self):
175 class MySink(bt2._UserSinkComponent):
176 def _consume(self):
177 pass
178
a01b452b
SM
179 def _graph_is_configured(self):
180 pass
181
c7eee084 182 @classmethod
f4e38e70 183 def _query(cls, query_exec, obj, params, log_level):
c7eee084
PP
184 raise bt2.TryAgain
185
186 with self.assertRaises(bt2.TryAgain):
187 res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
188
189 def test_cancel_no_query(self):
190 query_exec = bt2.QueryExecutor()
191 self.assertFalse(query_exec.is_canceled)
192 query_exec.cancel()
193 self.assertTrue(query_exec.is_canceled)
194
195 def test_query_canceled(self):
196 class MySink(bt2._UserSinkComponent):
197 def _consume(self):
198 pass
199
a01b452b
SM
200 def _graph_is_configured(self):
201 pass
202
c7eee084 203 @classmethod
f4e38e70 204 def _query(cls, query_exec, obj, params, log_level):
c7eee084
PP
205 raise bt2.TryAgain
206
207 query_exec = bt2.QueryExecutor()
208 query_exec.cancel()
209
d24d5663 210 with self.assertRaises(bt2.Canceled):
c7eee084 211 res = query_exec.query(MySink, 'obj', [17, 23])
This page took 0.04356 seconds and 4 git commands to generate.