Move to kernel style SPDX license identifiers
[babeltrace.git] / tests / data / cli / exit_status / bt_plugin_test_cli_exit_status.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # Copyright (C) 2019 EfficiOS Inc.
4 #
5
6 import bt2
7 import signal
8 import os
9 import time
10
11 bt2.register_plugin(__name__, "test_exit_status")
12
13
14 class StatusIter(bt2._UserMessageIterator):
15 def __init__(self, config, output_port):
16 self.case = output_port.user_data['case']
17
18 def __next__(self):
19 if self.case == "STOP":
20 raise bt2.Stop()
21 if self.case == "INTERRUPTED":
22 os.kill(os.getpid(), signal.SIGINT)
23
24 # Wait until the graph is in the interrupted state.
25 timeout_s = 10
26 for _ in range(timeout_s * 10):
27 if self._is_interrupted:
28 raise bt2.TryAgain()
29
30 time.sleep(0.1)
31
32 raise Exception(
33 '{} was not interrupted after {} seconds'.format(
34 self.__class__.__name__, timeout_s
35 )
36 )
37
38 elif self.case == "ERROR":
39 raise TypeError("Raising type error")
40 else:
41 raise ValueError("Invalid parameter")
42
43
44 @bt2.plugin_component_class
45 class StatusSrc(bt2._UserSourceComponent, message_iterator_class=StatusIter):
46 def __init__(self, config, params, obj):
47 self._add_output_port("out", {'case': params['case']})
This page took 0.029547 seconds and 4 git commands to generate.