Tests babeltrace: adapt python tests to use unittest
[babeltrace.git] / tests / bindings / python / babeltrace / test_ctf_writer_empty_packet.py
CommitLineData
2fafa4c1
JG
#!/usr/bin/env python3
#
# The MIT License (MIT)
#
# Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
24
2fafa4c1
JG
25import tempfile
26import babeltrace.writer as btw
27import babeltrace.reader as btr
28import shutil
29import uuid
295f4581 30import unittest
2fafa4c1 31
295f4581
JG
class CtfWriterEmptyPacketTestCase(unittest.TestCase):
    """Write a CTF trace that ends with an empty packet and read it back.

    The trace is produced with ``babeltrace.writer`` in a temporary
    directory and then opened with ``babeltrace.reader``; the number of
    events read must equal the number of events written.
    """

    def setUp(self):
        """Set the expected event count and create a scratch directory."""
        self._expected_event_count = 100
        self._trace_path = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory created by ``setUp``."""
        shutil.rmtree(self._trace_path)

    def _create_trace(self):
        """Write the test trace into ``self._trace_path``.

        Appends ``self._expected_event_count`` events carrying a 32-bit
        integer field, asserts that flushing with no queued event raises
        ``ValueError``, then sets the packet context timestamps by hand
        and flushes once more.
        """
        trace = btw.Writer(self._trace_path)
        clock = btw.Clock('test_clock')
        trace.add_clock(clock)

        integer_field_type = btw.IntegerFieldDeclaration(32)

        event_class = btw.EventClass('simple_event')
        event_class.add_field(integer_field_type, 'int_field')

        stream_class = btw.StreamClass('empty_packet_stream')
        stream_class.add_event_class(event_class)
        stream_class.clock = clock

        stream = trace.create_stream(stream_class)

        for i in range(self._expected_event_count):
            event = btw.Event(event_class)
            event.payload('int_field').value = i
            stream.append_event(event)
            # NOTE(review): the nesting of this flush could not be recovered
            # from the original text; it is assumed to run once per event.
            # Confirm against upstream.
            stream.flush()

        # The CTF writer will not be able to populate the packet context's
        # timestamp_begin and timestamp_end fields if it is asked to flush
        # without any queued events.
        with self.assertRaises(ValueError):
            stream.flush()

        # Provide the timestamps explicitly so that the next flush, which
        # still has no queued event, is not expected to raise.
        packet_context = stream.packet_context
        packet_context.field('timestamp_begin').value = 1
        packet_context.field('timestamp_end').value = 123456

        stream.flush()

    def test_trace_empty_packet(self):
        """Read the written trace and check the number of events."""
        self._create_trace()

        traces = btr.TraceCollection()
        trace_handle = traces.add_trace(self._trace_path, 'ctf')
        self.assertIsNotNone(trace_handle)

        # Only the number of events matters here, not their content.
        event_count = sum(1 for _ in traces.events)
        self.assertEqual(self._expected_event_count, event_count)
This page took 0.034117 seconds and 4 git commands to generate.