-from bt2 import values
+
+#
+# Copyright (C) 2019 EfficiOS Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; only version 2
+# of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+
+from bt2 import value
import collections
import unittest
import copy
import bt2
-@unittest.skip("this is broken")
class UserMessageIteratorTestCase(unittest.TestCase):
@staticmethod
def _create_graph(src_comp_cls):
def _consume(self):
next(self._msg_iter)
- def _port_connected(self, port, other_port):
- self._msg_iter = port.connection.create_message_iterator()
+ def _graph_is_configured(self):
+ self._msg_iter = self._input_ports['in'].create_message_iterator()
graph = bt2.Graph()
src_comp = graph.add_component(src_comp_cls, 'src')
return graph
def test_init(self):
+ the_output_port_from_source = None
+ the_output_port_from_iter = None
+
class MyIter(bt2._UserMessageIterator):
- def __init__(self):
+ def __init__(self, self_port_output):
nonlocal initialized
+ nonlocal the_output_port_from_iter
initialized = True
+ the_output_port_from_iter = self_port_output
class MySource(bt2._UserSourceComponent,
message_iterator_class=MyIter):
def __init__(self, params):
- self._add_output_port('out')
+ nonlocal the_output_port_from_source
+ the_output_port_from_source = self._add_output_port('out', 'user data')
initialized = False
graph = self._create_graph(MySource)
+ graph.run()
self.assertTrue(initialized)
+ self.assertEqual(the_output_port_from_source.addr, the_output_port_from_iter.addr)
+ self.assertEqual(the_output_port_from_iter.user_data, 'user data')
def test_finalize(self):
class MyIter(bt2._UserMessageIterator):
finalized = False
graph = self._create_graph(MySource)
+ graph.run()
del graph
self.assertTrue(finalized)
def test_component(self):
class MyIter(bt2._UserMessageIterator):
- def __init__(self):
+ def __init__(self, self_port_output):
nonlocal salut
salut = self._component._salut
salut = None
graph = self._create_graph(MySource)
+ graph.run()
self.assertEqual(salut, 23)
def test_addr(self):
class MyIter(bt2._UserMessageIterator):
- def __init__(self):
+ def __init__(self, self_port_output):
nonlocal addr
addr = self.addr
addr = None
graph = self._create_graph(MySource)
+ graph.run()
self.assertIsNotNone(addr)
self.assertNotEqual(addr, 0)
-@unittest.skip("this is broken")
-class PrivateConnectionMessageIteratorTestCase(unittest.TestCase):
- def test_component(self):
- class MyIter(bt2._UserMessageIterator):
- pass
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(self):
- next(self._msg_iter)
-
- def _port_connected(self, port, other_port):
- nonlocal upstream_comp
- self._msg_iter = port.connection.create_message_iterator()
- upstream_comp = self._msg_iter.component
-
- upstream_comp = None
- graph = bt2.Graph()
- src_comp = graph.add_component(MySource, 'src')
- sink_comp = graph.add_component(MySink, 'sink')
- graph.connect_ports(src_comp.output_ports['out'],
- sink_comp.input_ports['in'])
- self.assertEqual(src_comp, upstream_comp)
- del upstream_comp
-
-
-@unittest.skip("this is broken")
class OutputPortMessageIteratorTestCase(unittest.TestCase):
def test_component(self):
class MyIter(bt2._UserMessageIterator):
- def __init__(self):
- self._build_meta()
+ def __init__(self, self_port_output):
self._at = 0
- def _build_meta(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._my_int_fc = bt2.IntegerFieldClass(32)
- self._ec.payload_field_class = bt2.StructureFieldClass()
- self._ec.payload_field_class += collections.OrderedDict([
- ('my_int', self._my_int_fc),
- ])
- self._sc.add_event_class(self._ec)
- self._trace.add_stream_class(self._sc)
- self._stream = self._sc()
- self._packet = self._stream.create_packet()
-
- def _create_event(self, value):
- ev = self._ec()
- ev.payload_field['my_int'] = value
- ev.packet = self._packet
- return ev
-
def __next__(self):
- if self._at == 5:
+ if self._at == 7:
raise bt2.Stop
- msg = bt2.EventMessage(self._create_event(self._at * 3))
+ if self._at == 0:
+ msg = self._create_stream_beginning_message(test_obj._stream)
+ elif self._at == 1:
+ msg = self._create_packet_beginning_message(test_obj._packet)
+ elif self._at == 5:
+ msg = self._create_packet_end_message(test_obj._packet)
+ elif self._at == 6:
+ msg = self._create_stream_end_message(test_obj._stream)
+ else:
+ msg = self._create_event_message(test_obj._event_class, test_obj._packet)
+ msg.event.payload_field['my_int'] = self._at * 3
+
self._at += 1
return msg
def __init__(self, params):
self._add_output_port('out')
+ trace_class = self._create_trace_class()
+ stream_class = trace_class.create_stream_class()
+
+ # Create payload field class
+ my_int_ft = trace_class.create_signed_integer_field_class(32)
+ payload_ft = trace_class.create_structure_field_class()
+ payload_ft += collections.OrderedDict([
+ ('my_int', my_int_ft),
+ ])
+
+ event_class = stream_class.create_event_class(name='salut', payload_field_class=payload_ft)
+
+ trace = trace_class()
+ stream = trace.create_stream(stream_class)
+ packet = stream.create_packet()
+
+ test_obj._event_class = event_class
+ test_obj._stream = stream
+ test_obj._packet = packet
+
+ test_obj = self
graph = bt2.Graph()
src = graph.add_component(MySource, 'src')
- types = [bt2.EventMessage]
- msg_iter = src.output_ports['out'].create_message_iterator(types)
+ msg_iter = graph.create_output_port_message_iterator(src.output_ports['out'])
for at, msg in enumerate(msg_iter):
- self.assertIsInstance(msg, bt2.EventMessage)
- self.assertEqual(msg.event.event_class.name, 'salut')
- field = msg.event.payload_field['my_int']
- self.assertEqual(field, at * 3)
+ if at == 0:
+ self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
+ elif at == 1:
+ self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+ elif at == 5:
+ self.assertIsInstance(msg, bt2.message._PacketEndMessage)
+ elif at == 6:
+ self.assertIsInstance(msg, bt2.message._StreamEndMessage)
+ else:
+ self.assertIsInstance(msg, bt2.message._EventMessage)
+ self.assertEqual(msg.event.cls.name, 'salut')
+ field = msg.event.payload_field['my_int']
+ self.assertEqual(field, at * 3)