-from bt2 import values
+from bt2 import value
import collections
import unittest
import copy
self._trace = bt2.Trace()
self._sc = bt2.StreamClass()
self._ec = bt2.EventClass('salut')
- self._my_int_ft = bt2.IntegerFieldType(32)
- self._ec.payload_field_type = bt2.StructureFieldType()
- self._ec.payload_field_type += collections.OrderedDict([
- ('my_int', self._my_int_ft),
+ self._my_int_fc = bt2.IntegerFieldClass(32)
+ self._ec.payload_field_class = bt2.StructureFieldClass()
+ self._ec.payload_field_class += collections.OrderedDict([
+ ('my_int', self._my_int_fc),
])
self._sc.add_event_class(self._ec)
self._trace.add_stream_class(self._sc)
self._trace = bt2.Trace()
self._sc = bt2.StreamClass()
self._ec = bt2.EventClass('salut')
- self._my_int_ft = bt2.IntegerFieldType(32)
- self._ec.payload_field_type = bt2.StructureFieldType()
- self._ec.payload_field_type += collections.OrderedDict([
- ('my_int', self._my_int_ft),
+ self._my_int_fc = bt2.IntegerFieldClass(32)
+ self._ec.payload_field_class = bt2.StructureFieldClass()
+ self._ec.payload_field_class += collections.OrderedDict([
+ ('my_int', self._my_int_fc),
])
self._sc.add_event_class(self._ec)
self._trace.add_stream_class(self._sc)
with self.assertRaises(bt2.TryAgain):
self._graph.run()
- def test_run_no_sink(self):
- class MyIter(bt2._UserMessageIterator):
- pass
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MyFilter(bt2._UserFilterComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
- self._add_input_port('in')
-
- src = self._graph.add_component(MySource, 'src')
- flt = self._graph.add_component(MyFilter, 'flt')
- conn = self._graph.connect_ports(src.output_ports['out'],
- flt.input_ports['in'])
-
- with self.assertRaises(bt2.NoSinkComponent):
- self._graph.run()
-
def test_run_error(self):
class MyIter(bt2._UserMessageIterator):
def __init__(self):
self._trace = bt2.Trace()
self._sc = bt2.StreamClass()
self._ec = bt2.EventClass('salut')
- self._my_int_ft = bt2.IntegerFieldType(32)
- self._ec.payload_field_type = bt2.StructureFieldType()
- self._ec.payload_field_type += collections.OrderedDict([
- ('my_int', self._my_int_ft),
+ self._my_int_fc = bt2.IntegerFieldClass(32)
+ self._ec.payload_field_class = bt2.StructureFieldClass()
+ self._ec.payload_field_class += collections.OrderedDict([
+ ('my_int', self._my_int_fc),
])
self._sc.add_event_class(self._ec)
self._trace.add_stream_class(self._sc)