Tests babeltrace: adapt python tests to use unittest
[babeltrace.git] / tests / bindings / python / babeltrace / test_ctf_writer_empty_packet.py
1 #!/usr/bin/env python3
2 #
3 # The MIT License (MIT)
4 #
5 # Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 #
7 # Permission is hereby granted, free of charge, to any person obtaining a copy
8 # of this software and associated documentation files (the "Software"), to deal
9 # in the Software without restriction, including without limitation the rights
10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 # copies of the Software, and to permit persons to whom the Software is
12 # furnished to do so, subject to the following conditions:
13 #
14 # The above copyright notice and this permission notice shall be included in
15 # all copies or substantial portions of the Software.
16 #
17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 # SOFTWARE.
24
25 import tempfile
26 import babeltrace.writer as btw
27 import babeltrace.reader as btr
28 import shutil
29 import uuid
30 import unittest
31
class CtfWriterEmptyPacketTestCase(unittest.TestCase):
    """Check that a CTF trace ending with an empty packet can be read back.

    The trace is produced with the ``babeltrace.writer`` bindings inside a
    temporary directory, then opened with the ``babeltrace.reader`` bindings
    so that its events can be counted.
    """

    def setUp(self):
        """Create the scratch trace directory and set the event count."""
        self._trace_path = tempfile.mkdtemp()
        self._expected_event_count = 100

    def tearDown(self):
        """Delete the scratch directory and everything written into it."""
        shutil.rmtree(self._trace_path)

    def _create_trace(self):
        """Write the test trace under ``self._trace_path``.

        ``self._expected_event_count`` events are appended and flushed
        first. The stream is then flushed again with no event queued: once
        expecting a ``ValueError``, and once more after the packet context
        timestamps have been set by hand.
        """
        writer = btw.Writer(self._trace_path)
        test_clock = btw.Clock('test_clock')
        writer.add_clock(test_clock)

        int_field_decl = btw.IntegerFieldDeclaration(32)

        simple_event_class = btw.EventClass('simple_event')
        simple_event_class.add_field(int_field_decl, 'int_field')

        empty_packet_stream_class = btw.StreamClass('empty_packet_stream')
        empty_packet_stream_class.add_event_class(simple_event_class)
        empty_packet_stream_class.clock = test_clock

        stream = writer.create_stream(empty_packet_stream_class)

        # Every event stores its own index in 'int_field'.
        for value in range(self._expected_event_count):
            event = btw.Event(simple_event_class)
            payload = event.payload('int_field')
            payload.value = value
            stream.append_event(event)
        stream.flush()

        # With no event queued, the CTF writer has nothing from which to
        # fill in the packet context's timestamp_begin and timestamp_end
        # fields, so this flush is expected to be rejected.
        self.assertRaises(ValueError, stream.flush)

        # Provide both timestamps explicitly before flushing again; an
        # exception raised by that last flush would fail the test.
        context = stream.packet_context
        manual_timestamps = (
            ('timestamp_begin', 1),
            ('timestamp_end', 123456),
        )
        for field_name, timestamp in manual_timestamps:
            context.field(field_name).value = timestamp

        stream.flush()

    # Reading the trace back must yield exactly as many events as were
    # appended by _create_trace().
    def test_trace_empty_packet(self):
        self._create_trace()

        collection = btr.TraceCollection()
        handle = collection.add_trace(self._trace_path, 'ctf')
        self.assertIsNotNone(handle)

        seen_events = sum(1 for _ in collection.events)
        self.assertEqual(self._expected_event_count, seen_events)
This page took 0.029747 seconds and 4 git commands to generate.