From: Jérémie Galarneau Date: Fri, 5 Jun 2020 19:55:54 +0000 (-0400) Subject: Tests: src.ctf.lttng-live: add support for beacon injections X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=commitdiff_plain;h=71f56e5f59aa8f94f19b4311408ad04cf9519807 Tests: src.ctf.lttng-live: add support for beacon injections Add a 'beacons' field to the sessions configuration accepted by lttng_live_server.py. 'beacons' is expected to be a list of clock snapshots (in cycles) at which live inactivity beacons should be sent to a live client. From a protocol standpoint, an inactivity beacon is a "special" type of index entry. They are inserted in the index list (_entries) so that they can simply be sent in response to a LttngLiveViewerGetNextDataStreamIndexEntryCommand in the same way typical index entries are processed. Signed-off-by: Jérémie Galarneau Change-Id: I1f86cf369a5115af6787e9a9d52469d16f1ab3aa Reviewed-on: https://review.lttng.org/c/babeltrace/+/3618 Tested-by: jenkins Reviewed-by: Philippe Proulx --- diff --git a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py index 6b674666..9bff6086 100644 --- a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py +++ b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py @@ -515,23 +515,39 @@ class _LttngLiveViewerProtocolCodec: for info in reply.stream_infos: data += self._encode_stream_info(info) elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply: + index_format = 'QQQQQQQII' entry = reply.index_entry flags = self._get_has_new_stuff_flags( reply.has_new_metadata, reply.has_new_data_stream ) - data = self._pack( - 'QQQQQQQII', - entry.offset_bytes, - entry.total_size_bits, - entry.content_size_bits, - entry.timestamp_begin, - entry.timestamp_end, - entry.events_discarded, - entry.stream_class_id, - reply.status, - flags, - ) + if type(entry) is _LttngDataStreamIndexEntry: + data = self._pack( + index_format, + entry.offset_bytes, + entry.total_size_bits, + entry.content_size_bits, + entry.timestamp_begin, + entry.timestamp_end, + entry.events_discarded, + entry.stream_class_id, + reply.status, + flags, + ) + else: + assert type(entry) is _LttngDataStreamBeaconEntry + data = self._pack( + index_format, + 0, + 0, + 0, + 0, + entry.timestamp, + 0, + entry.stream_class_id, + reply.status, + flags, + ) elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply: flags = self._get_has_new_stuff_flags( reply.has_new_metadata, reply.has_new_data_stream @@ -615,11 +631,36 @@ class _LttngDataStreamIndexEntry: return self._stream_class_id +# An entry within the index of an LTTng data stream. While a stream beacon entry +# is conceptually unrelated to an index, it is sent as a reply to a +# LttngLiveViewerGetNextDataStreamIndexEntryCommand +class _LttngDataStreamBeaconEntry: + def __init__(self, stream_class_id, timestamp): + self._stream_class_id = stream_class_id + self._timestamp = timestamp + + @property + def timestamp(self): + return self._timestamp + + @property + def stream_class_id(self): + return self._stream_class_id + + # The index of an LTTng data stream, a sequence of index entries. class _LttngDataStreamIndex(collections.abc.Sequence): - def __init__(self, path): + def __init__(self, path, beacons): self._path = path self._build() + + if beacons: + stream_class_id = self._entries[0].stream_class_id + beacons = [ + _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons + ] + self._add_beacons(beacons) + logging.info( 'Built data stream index entries: path="{}", count={}'.format( path, len(self._entries) @@ -683,6 +724,17 @@ class _LttngDataStreamIndex(collections.abc.Sequence): # Skip anything else before the next entry f.seek(index_entry_length - size, os.SEEK_CUR) + def _add_beacons(self, beacons): + # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin + def sort_key(entry): + if type(entry) is _LttngDataStreamBeaconEntry: + return entry.timestamp + else: + return entry.timestamp_end + + self._entries += beacons + self._entries.sort(key=sort_key) + def __getitem__(self, index): return self._entries[index] @@ -696,14 +748,14 @@ class _LttngDataStreamIndex(collections.abc.Sequence): # An LTTng data stream. class _LttngDataStream: - def __init__(self, path): + def __init__(self, path, beacons): self._path = path filename = os.path.basename(path) match = re.match(r'(.*)_\d+', filename) self._channel_name = match.group(1) trace_dir = os.path.dirname(path) index_path = os.path.join(trace_dir, 'index', filename + '.idx') - self._index = _LttngDataStreamIndex(index_path) + self._index = _LttngDataStreamIndex(index_path, beacons) assert os.path.isfile(path) self._file = open(path, 'rb') logging.info( @@ -749,16 +801,16 @@ class _LttngMetadataStream: # An LTTng trace, a sequence of LTTng data streams. class LttngTrace(collections.abc.Sequence): - def __init__(self, trace_dir): + def __init__(self, trace_dir, beacons): assert os.path.isdir(trace_dir) self._path = trace_dir self._metadata_stream = _LttngMetadataStream( os.path.join(trace_dir, 'metadata') ) - self._create_data_streams(trace_dir) + self._create_data_streams(trace_dir, beacons) logging.info('Built trace: path="{}"'.format(trace_dir)) - def _create_data_streams(self, trace_dir): + def _create_data_streams(self, trace_dir, beacons): data_stream_paths = [] for filename in os.listdir(trace_dir): @@ -779,7 +831,15 @@ class LttngTrace(collections.abc.Sequence): self._data_streams = [] for data_stream_path in data_stream_paths: - self._data_streams.append(_LttngDataStream(data_stream_path)) + stream_name = os.path.basename(data_stream_path) + this_stream_beacons = None + + if beacons is not None and stream_name in beacons: + this_stream_beacons = beacons[stream_name] + + self._data_streams.append( + _LttngDataStream(data_stream_path, this_stream_beacons) + ) @property def path(self): @@ -1076,7 +1136,12 @@ class _LttngLiveViewerSession: # The viewer only checks the `has_new_metadata` flag if the # reply's status is `OK`, so we need to provide an index here has_new_metadata = stream_state.tracing_session_state.has_new_metadata - status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK + if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry: + status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK + else: + assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry + status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE + reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply( status, stream_state.cur_index_entry, has_new_metadata, False ) @@ -1385,7 +1450,10 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix): # "path": "lol" # }, # { - # "path": "meow/mix" + # "path": "meow/mix", + # "beacons": { + # "my_stream": [ 5235787, 728375283 ] + # } # } # ] # } @@ -1404,12 +1472,13 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix): traces = [] for trace in session['traces']: + beacons = trace.get('beacons') path = trace['path'] if not os.path.isabs(path): path = os.path.join(trace_path_prefix, path) - traces.append(LttngTrace(path)) + traces.append(LttngTrace(path, beacons)) sessions.append( LttngTracingSessionDescriptor(