+ def test_query_with_method_obj(self):
+ class MySink(bt2._UserSinkComponent):
+ def _user_consume(self):
+ pass
+
+ @classmethod
+ def _user_query(cls, priv_query_exec, obj, params, method_obj):
+ nonlocal query_method_obj
+ query_method_obj = method_obj
+
+ query_method_obj = None
+ method_obj = object()
+ bt2.QueryExecutor(MySink, 'obj', method_obj=method_obj).query()
+ self.assertIs(query_method_obj, method_obj)
+ del query_method_obj
+
+ def test_query_with_method_obj_del_ref(self):
+ class MySink(bt2._UserSinkComponent):
+ def _user_consume(self):
+ pass
+
+ @classmethod
+ def _user_query(cls, priv_query_exec, obj, params, method_obj):
+ nonlocal query_method_obj
+ query_method_obj = method_obj
+
+ class Custom:
+ pass
+
+ query_method_obj = None
+ method_obj = Custom()
+ method_obj.hola = 'hello'
+ query_exec = bt2.QueryExecutor(MySink, 'obj', method_obj=method_obj)
+ del method_obj
+ query_exec.query()
+ self.assertIsInstance(query_method_obj, Custom)
+ self.assertEqual(query_method_obj.hola, 'hello')
+ del query_method_obj
+
+ def test_query_with_none_method_obj(self):
+ class MySink(bt2._UserSinkComponent):
+ def _user_consume(self):
+ pass
+
+ @classmethod
+ def _user_query(cls, priv_query_exec, obj, params, method_obj):
+ nonlocal query_method_obj
+ query_method_obj = method_obj
+
+ query_method_obj = object()
+ bt2.QueryExecutor(MySink, 'obj').query()
+ self.assertIsNone(query_method_obj)
+ del query_method_obj
+
+ def test_query_with_method_obj_non_python_comp_cls(self):
+ plugin = bt2.find_plugin('text', find_in_user_dir=False, find_in_sys_dir=False)
+ assert plugin is not None
+ cc = plugin.source_component_classes['dmesg']
+ assert cc is not None
+
+ with self.assertRaisesRegex(
+ ValueError,
+ re.escape(r'cannot pass a Python object to a non-Python component class'),
+ ):
+ bt2.QueryExecutor(cc, 'obj', method_obj=object()).query()
+