lib: make packets and packet messages optional, disabled by default
[babeltrace.git] / tests / data / plugins / flt.utils.trimmer / bt_plugin_trimmer_test.py
1 import bt2
2
3
4 class TheIteratorOfAllEvil(bt2._UserMessageIterator):
5 def __init__(self, port):
6 tc, sc, ec1, ec2, params = port.user_data
7 trace = tc()
8 stream = trace.create_stream(sc)
9
10 # Test with and without packets, once packets are optional.
11 packet = stream.create_packet()
12
13 if params['with-stream-msgs-cs']:
14 sb_msg = self._create_stream_beginning_message(stream, 100)
15 else:
16 sb_msg = self._create_stream_beginning_message(stream)
17
18 ev_msg1 = self._create_event_message(ec1, packet, 300)
19 ev_msg2 = self._create_event_message(ec2, packet, 400)
20
21 if params['with-stream-msgs-cs']:
22 se_msg = self._create_stream_end_message(stream, 1000)
23 else:
24 se_msg = self._create_stream_end_message(stream)
25
26 self._msgs = [
27 sb_msg,
28 self._create_packet_beginning_message(packet, 200),
29 ev_msg1,
30 ev_msg2,
31 self._create_packet_end_message(packet, 900),
32 se_msg,
33 ]
34 self._at = 0
35
36 def _seek_beginning(self):
37 self._at = 0
38
39 def __next__(self):
40 if self._at < len(self._msgs):
41 msg = self._msgs[self._at]
42 self._at += 1
43 return msg
44 else:
45 raise StopIteration
46
47 @bt2.plugin_component_class
48 class TheSourceOfAllEvil(bt2._UserSourceComponent,
49 message_iterator_class=TheIteratorOfAllEvil):
50 def __init__(self, params):
51 tc = self._create_trace_class()
52
53 # Use a clock class with an offset, so we can test with --begin or --end
54 # smaller than this offset (in other words, a time that it's not
55 # possible to represent with this clock class).
56 cc = self._create_clock_class(frequency=1, offset=bt2.ClockClassOffset(10000))
57 sc = tc.create_stream_class(default_clock_class=cc,
58 supports_packets=True,
59 packets_have_beginning_default_clock_snapshot=True,
60 packets_have_end_default_clock_snapshot=True)
61 ec1 = sc.create_event_class(name='event 1')
62 ec2 = sc.create_event_class(name='event 2')
63 self._add_output_port('out', (tc, sc, ec1, ec2, params))
64
65
66 bt2.register_plugin(__name__, 'test-trimmer')
This page took 0.02986 seconds and 4 git commands to generate.