- def setUp(self):
- self._ec = self._create_ec()
-
- def tearDown(self):
- del self._ec
-
- def _create_ec(self, with_eh=True, with_sec=True, with_ec=True, with_ep=True):
- # event header
- if with_eh:
- eh = bt2.StructureFieldType()
- eh += OrderedDict((
- ('id', bt2.IntegerFieldType(8)),
- ('ts', bt2.IntegerFieldType(32)),
- ))
- else:
- eh = None
-
- # stream event context
- if with_sec:
- sec = bt2.StructureFieldType()
- sec += OrderedDict((
- ('cpu_id', bt2.IntegerFieldType(8)),
- ('stuff', bt2.FloatingPointNumberFieldType()),
- ))
- else:
- sec = None
-
- # packet context
- pc = bt2.StructureFieldType()
- pc += OrderedDict((
- ('something', bt2.IntegerFieldType(8)),
- ('something_else', bt2.FloatingPointNumberFieldType()),
- ))
-
- # stream class
- sc = bt2.StreamClass()
- sc.packet_context_field_type = pc
- sc.event_header_field_type = eh
- sc.event_context_field_type = sec
-
- # event context
- if with_ec:
- ec = bt2.StructureFieldType()
- ec += OrderedDict((
- ('ant', bt2.IntegerFieldType(16, is_signed=True)),
- ('msg', bt2.StringFieldType()),
- ))
- else:
- ec = None
-
- # event payload
- if with_ep:
- ep = bt2.StructureFieldType()
- ep += OrderedDict((
- ('giraffe', bt2.IntegerFieldType(32)),
- ('gnu', bt2.IntegerFieldType(8)),
- ('mosquito', bt2.IntegerFieldType(8)),
- ))
- else:
- ep = None
-
- # event class
- event_class = bt2.EventClass('ec')
- event_class.context_field_type = ec
- event_class.payload_field_type = ep
- sc.add_event_class(event_class)
- return event_class
+ def _create_test_event_message(
+ self,
+ packet_fields_config=None,
+ event_fields_config=None,
+ with_clockclass=False,
+ with_cc=False,
+ with_sc=False,
+ with_ep=False,
+ with_packet=False,
+ ):
+ class MyIter(bt2._UserMessageIterator):
+ def __init__(self, self_output_port):
+ self._at = 0
+ self._msgs = [self._create_stream_beginning_message(test_obj.stream)]
+
+ if with_packet:
+ assert test_obj.packet
+ self._msgs.append(
+ self._create_packet_beginning_message(test_obj.packet)
+ )
+
+ default_clock_snapshot = 789 if with_clockclass else None
+
+ if with_packet:
+ assert test_obj.packet
+ ev_parent = test_obj.packet
+ else:
+ assert test_obj.stream
+ ev_parent = test_obj.stream
+
+ msg = self._create_event_message(
+ test_obj.event_class, ev_parent, default_clock_snapshot
+ )
+
+ if event_fields_config is not None:
+ event_fields_config(msg.event)
+
+ self._msgs.append(msg)
+
+ if with_packet:
+ self._msgs.append(self._create_packet_end_message(test_obj.packet))
+
+ self._msgs.append(self._create_stream_end_message(test_obj.stream))
+
+ def __next__(self):
+ if self._at == len(self._msgs):
+ raise bt2.Stop
+
+ msg = self._msgs[self._at]
+ self._at += 1
+ return msg
+
+ class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, params, obj):
+ self._add_output_port('out')
+ tc = self._create_trace_class()
+
+ clock_class = None
+ if with_clockclass:
+ clock_class = self._create_clock_class(frequency=1000)
+
+ # event common context (stream-class-defined)
+ cc = None
+ if with_cc:
+ cc = tc.create_structure_field_class()
+ cc += [
+ ('cpu_id', tc.create_signed_integer_field_class(8)),
+ ('stuff', tc.create_real_field_class()),
+ ]
+
+ # packet context (stream-class-defined)
+ pc = None
+
+ if with_packet:
+ pc = tc.create_structure_field_class()
+ pc += [
+ ('something', tc.create_unsigned_integer_field_class(8)),
+ ('something_else', tc.create_real_field_class()),
+ ]
+
+ stream_class = tc.create_stream_class(
+ default_clock_class=clock_class,
+ event_common_context_field_class=cc,
+ packet_context_field_class=pc,
+ supports_packets=with_packet,
+ )
+
+ # specific context (event-class-defined)
+ sc = None
+ if with_sc:
+ sc = tc.create_structure_field_class()
+ sc += [
+ ('ant', tc.create_signed_integer_field_class(16)),
+ ('msg', tc.create_string_field_class()),
+ ]
+
+ # event payload
+ ep = None
+ if with_ep:
+ ep = tc.create_structure_field_class()
+ ep += [
+ ('giraffe', tc.create_signed_integer_field_class(32)),
+ ('gnu', tc.create_signed_integer_field_class(8)),
+ ('mosquito', tc.create_signed_integer_field_class(8)),
+ ]
+
+ event_class = stream_class.create_event_class(
+ name='garou',
+ specific_context_field_class=sc,
+ payload_field_class=ep,
+ )
+
+ trace = tc()
+ stream = trace.create_stream(stream_class)
+
+ if with_packet:
+ packet = stream.create_packet()
+
+ if packet_fields_config is not None:
+ assert packet
+ packet_fields_config(packet)
+
+ if with_packet:
+ test_obj.packet = packet
+
+ test_obj.stream = stream
+ test_obj.event_class = event_class
+
+ test_obj = self
+ self._graph = bt2.Graph()
+ self._src_comp = self._graph.add_component(MySrc, 'my_source')
+ self._msg_iter = TestOutputPortMessageIterator(
+ self._graph, self._src_comp.output_ports['out']
+ )
+
+ for msg in self._msg_iter:
+ if type(msg) is bt2._EventMessage:
+ return msg