Fix: src.ctf.fs: initialize the other_entry variable
[babeltrace.git] / tests / data / cli / exit_status / bt_plugin_test_cli_exit_status.py
CommitLineData
b5cd7d65
FD
1import bt2
2import signal
3import os
4import time
5
6bt2.register_plugin(__name__, "test_exit_status")
7
8
9class StatusIter(bt2._UserMessageIterator):
10 def __init__(self, config, output_port):
11 self.case = output_port.user_data['case']
12
13 def __next__(self):
14 if self.case == "STOP":
15 raise bt2.Stop()
16 if self.case == "INTERRUPTED":
17 os.kill(os.getpid(), signal.SIGINT)
18
19 # Wait until the graph is in the interrupted state.
20 timeout_s = 10
21 for _ in range(timeout_s * 10):
22 if self._is_interrupted:
23 raise bt2.TryAgain()
24
25 time.sleep(0.1)
26
27 raise Exception(
28 '{} was not interrupted after {} seconds'.format(
29 self.__class__.__name__, timeout_s
30 )
31 )
32
33 elif self.case == "ERROR":
34 raise TypeError("Raising type error")
35 else:
36 raise ValueError("Invalid parameter")
37
38
39@bt2.plugin_component_class
40class StatusSrc(bt2._UserSourceComponent, message_iterator_class=StatusIter):
41 def __init__(self, config, params, obj):
42 self._add_output_port("out", {'case': params['case']})
This page took 0.029187 seconds and 4 git commands to generate.