5367fc9ce25b1a43338a1a8a55681e50c6d8a37c
6 bt2
.register_plugin(__name__
, "test_exit_status")
9 class StatusIter(bt2
._UserMessageIterator
):
10 def __init__(self
, config
, output_port
):
11 self
.case
= output_port
.user_data
['case']
14 if self
.case
== "STOP":
16 if self
.case
== "INTERRUPTED":
17 os
.kill(os
.getpid(), signal
.SIGINT
)
19 # Wait until the graph is in the interrupted state.
21 for _
in range(timeout_s
* 10):
22 if self
._is
_interrupted
:
28 '{} was not interrupted after {} seconds'.format(
29 self
.__class
__.__name
__, timeout_s
33 elif self
.case
== "ERROR":
34 raise TypeError("Raising type error")
36 raise ValueError("Invalid parameter")
39 @bt2.plugin_component_class
40 class StatusSrc(bt2
._UserSourceComponent
, message_iterator_class
=StatusIter
):
41 def __init__(self
, config
, params
, obj
):
42 self
._add
_output
_port
("out", {'case': params
['case']})
This page took 0.029848 seconds and 3 git commands to generate.