# SPDX-License-Identifier: GPL-2.0-only
# Copyright (C) 2019 EfficiOS Inc.
# Register this module as a bt2 plugin under the name "test_exit_status".
bt2.register_plugin(__name__, "test_exit_status")
15 class StatusIter(bt2
._UserMessageIterator
):
def __init__(self, config, output_port):
    """Remember which test case this iterator must act out.

    The case name is read from the "case" entry of the output port's
    user data and stored on ``self.case``. ``config`` is accepted but
    not used here.

    Raises:
        KeyError: if the port's user data has no "case" entry.
    """
    self.case = output_port.user_data["case"]
20 if self
.case
== "STOP":
22 if self
.case
== "INTERRUPTED":
23 os
.kill(os
.getpid(), signal
.SIGINT
)
25 # Wait until the graph is in the interrupted state.
27 for _
in range(timeout_s
* 10):
28 if self
._is
_interrupted
:
34 "{} was not interrupted after {} seconds".format(
35 self
.__class
__.__name
__, timeout_s
39 elif self
.case
== "ERROR":
40 raise TypeError("Raising type error")
42 raise ValueError("Invalid parameter")
@bt2.plugin_component_class
class StatusSrc(bt2._UserSourceComponent, message_iterator_class=StatusIter):
    """Source component whose single output port, "out", carries the test case.

    Message iterators for this component are ``StatusIter`` instances.
    """

    def __init__(self, config, params, obj):
        """Create the "out" port, attaching ``params["case"]`` to it.

        ``config`` and ``obj`` are accepted but not used here.

        Raises:
            KeyError: if ``params`` has no "case" entry.
        """
        # The dict is handed to the port at creation time; StatusIter reads
        # the same "case" key back from its output port's user data.
        self._add_output_port("out", {"case": params["case"]})
This page took 0.035814 seconds and 4 git commands to generate.