1 # SPDX-License-Identifier: GPL-2.0-only
3 # Copyright (C) 2019 EfficiOS Inc.
11 bt2
.register_plugin(__name__
, "test_exit_status")
14 class StatusIter(bt2
._UserMessageIterator
):
15 def __init__(self
, config
, output_port
):
16 self
.case
= output_port
.user_data
['case']
19 if self
.case
== "STOP":
21 if self
.case
== "INTERRUPTED":
22 os
.kill(os
.getpid(), signal
.SIGINT
)
24 # Wait until the graph is in the interrupted state.
26 for _
in range(timeout_s
* 10):
27 if self
._is
_interrupted
:
33 '{} was not interrupted after {} seconds'.format(
34 self
.__class
__.__name
__, timeout_s
38 elif self
.case
== "ERROR":
39 raise TypeError("Raising type error")
41 raise ValueError("Invalid parameter")
44 @bt2.plugin_component_class
45 class StatusSrc(bt2
._UserSourceComponent
, message_iterator_class
=StatusIter
):
46 def __init__(self
, config
, params
, obj
):
47 self
._add
_output
_port
("out", {'case': params
['case']})
This page took 0.029824 seconds and 4 git commands to generate.