def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
self._test_no_init(MySink)
def test_incomplete_source_no_msg_iter_cls(self):
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
def test_default_name(self):
class MySink(bt2._UserSinkComponent):
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
self.assertEqual(MySink.name, 'MySink')
def test_custom_name(self):
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
self.assertEqual(MySink.name, 'salut')
def test_invalid_custom_name(self):
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
def test_description(self):
class MySink(bt2._UserSinkComponent):
"""
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
self.assertEqual(MySink.description, 'The description.')
def test_empty_description(self):
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
self.assertIsNone(MySink.description)
def test_help(self):
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
self.assertEqual(MySink.help, 'The help\ntext is\nhere.')
def test_addr(self):
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
self.assertIsInstance(MySink.addr, int)
self.assertNotEqual(MySink.addr, 0)
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
with self.assertRaises(bt2.Error):
bt2.QueryExecutor().query(MySink, 'obj', 23)
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
@classmethod
def _query(cls, query_exec, obj, params, log_level):
raise ValueError
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
@classmethod
def _query(cls, query_exec, obj, params, log_level):
return ...
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
@classmethod
def _query(cls, query_exec, obj, params, log_level):
nonlocal query_params
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
@classmethod
def _query(cls, query_exec, obj, params, log_level):
nonlocal query_log_level
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
@staticmethod
def _query(query_exec, obj, params, log_level):
return
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
@classmethod
def _query(cls, query_exec, obj, params, log_level):
nonlocal query_params
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
@classmethod
def _query(cls, query_exec, obj, params, log_level):
nonlocal query_params
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
self.assertEqual(MySink, MySink)
def _consume(self):
pass
+ def _graph_is_configured(self):
+ pass
+
@classmethod
def _query(cls, query_exec, obj, params, log_level):
return [obj, params, 23]