Fix: sink.ctf.fs: fix logic of make_unique_stream_file_name
[babeltrace.git] / tests / data / plugins / sink.ctf.fs / stream-names / bt_plugin_foo.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # Copyright (C) 2020 EfficiOS Inc.
4 #
5
6 import bt2
7
8
9 class TheSourceIterator(bt2._UserMessageIterator):
10 def __init__(self, config, port):
11 tc, sc, ec = port.user_data
12
13 trace = tc()
14
15 # Make two streams with the same name, to verify the stream filenames
16 # are de-duplicated properly. Make one with the name "metadata" to
17 # verify the resulting data file is not named "metadata".
18 stream1 = trace.create_stream(sc, name='the-stream')
19 stream2 = trace.create_stream(sc, name='the-stream')
20 stream3 = trace.create_stream(sc, name='metadata')
21
22 self._msgs = [
23 self._create_stream_beginning_message(stream1),
24 self._create_stream_beginning_message(stream2),
25 self._create_stream_beginning_message(stream3),
26 self._create_event_message(ec, stream1),
27 self._create_event_message(ec, stream2),
28 self._create_event_message(ec, stream3),
29 self._create_stream_end_message(stream1),
30 self._create_stream_end_message(stream2),
31 self._create_stream_end_message(stream3),
32 ]
33
34 def __next__(self):
35 if len(self._msgs) == 0:
36 raise StopIteration
37
38 return self._msgs.pop(0)
39
40
41 @bt2.plugin_component_class
42 class TheSource(bt2._UserSourceComponent, message_iterator_class=TheSourceIterator):
43 def __init__(self, config, params, obj):
44 tc = self._create_trace_class()
45 sc = tc.create_stream_class()
46 ec = sc.create_event_class(name='the-event')
47 self._add_output_port('out', user_data=(tc, sc, ec))
48
49
50 bt2.register_plugin(__name__, "foo")
This page took 0.030171 seconds and 4 git commands to generate.