From 3c22c122c56f58b7055d913b3901e7e8f48dded7 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Mon, 15 Jan 2024 18:37:59 -0500 Subject: [PATCH] Tests: lttng_live_server: support trace creation timestamps MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Add support for a per-trace "creation time" as part of the live server scenarios. The objective of this change is to simulate situations where streams are created and declared to the live client while other streams are already being consumed. The approach taken is to add a 'creation-timestamp' property to the trace objects expressed in the scenario files. The creation timestamp, when left unspecified is implicitly at the origin of the timeline. Streams that exist when a client attaches to a live tracing session are announced as part of the reply of _LttngLiveViewerAttachToTracingSessionCommand. When the creation timestamp is not at the origin, the live server starts expressing the presence of new available streams once it delivers index entries that start at, or after, the trace creation timestamp. As part of the live protocol, newly-available streams are expressed by setting the 'NEW_STREAM' flag as part of the 'flags' of a GetNextIndex command's reply payload. Hence, when handling the _LttngLiveViewerGetNextDataStreamIndexEntryCommand, the server checks for unannounced streams that have a creation timestamp lesser or equal to the beginning timestamp of the index entry being delivered. If any such streams are available, the 'NEW_STREAMS' flag is set as part of the reply's flags. When the 'NEW_STREAMS' flag is set, the client is expected to eventually issue a _LttngLiveViewerGetNewStreamInfosCommand to list any previously unannounced stream. The _LttngLiveViewerGetNewStreamInfosCommand handler was previously stubbed-out to only reply with the 'HUP' status (meaning the client should hang-up) since the command was never used to announce new streams. This doesn't mean that the command was unused. When a live viewer session has reached a point where all of its streams have been torn-down (no more data to consume), the live viewer periodically polls the live server with the 'GetNewStreamInfos' command to resume consumption when/if new streams appear. Since the command's handler always returned with the "hang-up" status, the live client would always gracefully exit at that point as it had consumed all of the available data provided by the various tests. The handler now implements the command to announce new streams, when those are available. Thus, multiple statuses can now be returned: - OK: new streams are announced as part of the reply - NO_NEW: no new streams are available at this point, but there are still index entries (and data) left to consume as part of the current scenario's traces - HUP: all of the current scenario's streams no longer have index entries left to be delivered and all streams have been announced to the client. In terms of the live protocol, the sequence of events leading to the use of this command is: - The client issues LTTNG_VIEWER_GET_NEXT_INDEX to receive a new index entry - There is a new stream with a creation time <= to the next available index: announce it to the client by replying with an "inactivity beacon" (an empty index entry) + the LTTNG_VIEWER_FLAG_NEW_STREAM status flag - In response to the LTTNG_VIEWER_FLAG_NEW_STREAM flag's presence, the client asks for new streams by issueing the LTTNG_VIEWER_GET_NEW_STREAMS command - The server replies with a description of the new stream(s) containing: - channel name - relative path of the file - 'is_metadata' flag - stream id - trace id (which may not have existed up to this point) - The client then issues LTTNG_VIEWER_GET_METADATA to get any new metadata for the various traces it now tracks. In the case where a new trace was announced (a new trace id), this will be the moment when the client gets an initial fragment of metadata to initialize the trace class hierarchy (see lttng-live.cpp:481) Notes ----- The `is_sent` property of stream states is renamed to `all_data_is_sent` as it felt ambiguous with regards to the new `is_announced` property which expresses whether or not a stream has been made visible to the live client. Change-Id: Iecb86917b444f5068a445e8481749d34ad3b07c7 Signed-off-by: Jérémie Galarneau Reviewed-on: https://review.lttng.org/c/babeltrace/+/11687 Reviewed-by: Philippe Proulx --- .../src.ctf.lttng-live/lttng_live_server.py | 249 ++++++++++++++---- 1 file changed, 196 insertions(+), 53 deletions(-) diff --git a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py index 3a3c565e..e1101137 100644 --- a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py +++ b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py @@ -14,6 +14,7 @@ import logging import os.path import argparse import tempfile +from abc import ABC, abstractmethod from typing import Dict, Union, Iterable, Optional, Sequence, overload import tjson @@ -799,9 +800,23 @@ class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]): return self._path +# Any LTTng stream (metadata or data). +class _LttngStream(ABC): + @abstractmethod + def __init__(self, creation_timestamp: int): + self._creation_timestamp: int = creation_timestamp + + @property + def creation_timestamp(self): + return self._creation_timestamp + + # An LTTng data stream. -class _LttngDataStream: - def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]): +class _LttngDataStream(_LttngStream): + def __init__( + self, path: str, beacons_json: Optional[tjson.ArrayVal], creation_timestamp: int + ): + super().__init__(creation_timestamp) self._path = path filename = os.path.basename(path) match = re.match(r"(.*)_\d+", filename) @@ -862,12 +877,14 @@ class _LttngMetadataStreamSection: # An LTTng metadata stream. -class _LttngMetadataStream: +class _LttngMetadataStream(_LttngStream): def __init__( self, metadata_file_path: str, config_sections: Sequence[_LttngMetadataStreamSection], + creation_timestamp: int, ): + super().__init__(creation_timestamp) self._path = metadata_file_path self._sections = config_sections logging.info( @@ -913,8 +930,8 @@ def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal): for section in metadata_sections_json: if isinstance(section, tjson.StrVal): if section.val == "empty": - # Found a empty section marker. Actually append the section at the - # timestamp of the next concrete section. + # Found an empty section marker. Actually append the + # section at the timestamp of the next concrete section. append_empty_section = True else: raise ValueError("Invalid string value at {}.".format(section.path)) @@ -1018,8 +1035,10 @@ class LttngTrace(Sequence[_LttngDataStream]): trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal], beacons_json: Optional[tjson.ObjVal], + creation_timestamp: int, ): self._path = trace_dir + self._creation_timestamp = creation_timestamp self._create_metadata_stream(trace_dir, metadata_sections_json) self._create_data_streams(trace_dir, beacons_json) logging.info('Built trace: path="{}"'.format(trace_dir)) @@ -1053,7 +1072,9 @@ class LttngTrace(Sequence[_LttngDataStream]): this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal) self._data_streams.append( - _LttngDataStream(data_stream_path, this_beacons_json) + _LttngDataStream( + data_stream_path, this_beacons_json, self._creation_timestamp + ) ) def _create_metadata_stream( @@ -1072,7 +1093,9 @@ class LttngTrace(Sequence[_LttngDataStream]): metadata_path, metadata_sections_json ) - self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections) + self._metadata_stream = _LttngMetadataStream( + metadata_path, metadata_sections, self.creation_timestamp + ) @property def path(self): @@ -1082,6 +1105,10 @@ class LttngTrace(Sequence[_LttngDataStream]): def metadata_stream(self): return self._metadata_stream + @property + def creation_timestamp(self): + return self._creation_timestamp + @overload def __getitem__(self, index: int) -> _LttngDataStream: ... @@ -1099,8 +1126,31 @@ class LttngTrace(Sequence[_LttngDataStream]): return len(self._data_streams) +# Stream (metadata or data) state specific to the LTTng live protocol. +class _LttngLiveViewerSessionStreamState: + @abstractmethod + def __init__(self): + # A stream is considered "announced" when it has been returned + # to the LTTng live client in response to a "get new stream + # infos" (`_LttngLiveViewerGetNewStreamInfosCommand`) command. + self._announced = False # type: bool + pass + + @property + def is_announced(self): + return self._announced + + def mark_as_announced(self): + self._announced = True + + @property + @abstractmethod + def stream(self) -> _LttngStream: + pass + + # The state of a single data stream. -class _LttngLiveViewerSessionDataStreamState: +class _LttngLiveViewerSessionDataStreamState(_LttngLiveViewerSessionStreamState): def __init__( self, ts_state: "_LttngLiveViewerSessionTracingSessionState", @@ -1108,6 +1158,7 @@ class _LttngLiveViewerSessionDataStreamState: data_stream: _LttngDataStream, metadata_stream_id: int, ): + super().__init__() self._ts_state = ts_state self._info = info self._data_stream = data_stream @@ -1132,7 +1183,7 @@ class _LttngLiveViewerSessionDataStreamState: return self._info @property - def data_stream(self): + def stream(self): return self._data_stream @property @@ -1151,13 +1202,14 @@ class _LttngLiveViewerSessionDataStreamState: # The state of a single metadata stream. -class _LttngLiveViewerSessionMetadataStreamState: +class _LttngLiveViewerSessionMetadataStreamState(_LttngLiveViewerSessionStreamState): def __init__( self, ts_state: "_LttngLiveViewerSessionTracingSessionState", info: _LttngLiveViewerStreamInfo, metadata_stream: _LttngMetadataStream, ): + super().__init__() self._ts_state = ts_state self._info = info self._metadata_stream = metadata_stream @@ -1169,7 +1221,7 @@ class _LttngLiveViewerSessionMetadataStreamState: else: self._next_metadata_stream_section_timestamp = None - self._is_sent = False + self._all_data_is_sent = False fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"' logging.info( fmt.format( @@ -1185,16 +1237,16 @@ class _LttngLiveViewerSessionMetadataStreamState: return self._info @property - def metadata_stream(self): + def stream(self): return self._metadata_stream @property - def is_sent(self): - return self._is_sent + def all_data_is_sent(self): + return self._all_data_is_sent - @is_sent.setter - def is_sent(self, value: bool): - self._is_sent = value + @all_data_is_sent.setter + def all_data_is_sent(self, value: bool): + self._all_data_is_sent = value @property def cur_section(self): @@ -1262,41 +1314,61 @@ class LttngTracingSessionDescriptor: class _LttngLiveViewerSessionTracingSessionState: def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int): self._tc_descr = tc_descr - self._stream_infos = [] # type: list[_LttngLiveViewerStreamInfo] + self._client_visible_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo] self._ds_states = {} # type: dict[int, _LttngLiveViewerSessionDataStreamState] self._ms_states = ( {} ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState] - stream_id = base_stream_id + self._last_delivered_index_timestamp = 0 + self._last_allocated_stream_id = base_stream_id for trace in tc_descr.traces: - trace_id = stream_id * 1000 + trace_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo] + trace_id = self._last_allocated_stream_id * 1000 # Metadata stream -> stream info and metadata stream state info = _LttngLiveViewerStreamInfo( - stream_id, trace_id, True, trace.metadata_stream.path, "metadata" + self._last_allocated_stream_id, + trace_id, + True, + trace.metadata_stream.path, + "metadata", ) - self._stream_infos.append(info) - self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState( + + trace_stream_infos.append(info) + self._ms_states[ + self._last_allocated_stream_id + ] = _LttngLiveViewerSessionMetadataStreamState( self, info, trace.metadata_stream ) - metadata_stream_id = stream_id - stream_id += 1 + metadata_stream_id = self._last_allocated_stream_id + self._last_allocated_stream_id += 1 # Data streams -> stream infos and data stream states for data_stream in trace: info = _LttngLiveViewerStreamInfo( - stream_id, + self._last_allocated_stream_id, trace_id, False, data_stream.path, data_stream.channel_name, ) - self._stream_infos.append(info) - self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState( + trace_stream_infos.append(info) + self._ds_states[ + self._last_allocated_stream_id + ] = _LttngLiveViewerSessionDataStreamState( self, info, data_stream, metadata_stream_id ) - stream_id += 1 + self._last_allocated_stream_id += 1 + + if trace.creation_timestamp == 0: + # Only announce streams for traces that are created at + # the origin. + # + # The rest of the streams will be discovered by the + # client as indexes are received with a "has new data + # streams" flag set in the reply. + self._client_visible_stream_infos.extend(trace_stream_infos) self._is_attached = False fmt = 'Built tracing session state: id={}, name="{}"' @@ -1315,12 +1387,17 @@ class _LttngLiveViewerSessionTracingSessionState: return self._ms_states @property - def stream_infos(self): - return self._stream_infos + def client_visible_stream_infos(self): + return self._client_visible_stream_infos @property def has_new_metadata(self): - return any([not ms.is_sent for ms in self._ms_states.values()]) + return any( + [ + ms.is_announced and not ms.all_data_is_sent + for ms in self._ms_states.values() + ] + ) @property def is_attached(self): @@ -1369,7 +1446,7 @@ class _LttngLiveViewerSession: ) ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id self._ts_states[ts_id] = ts_state - total_stream_infos += len(ts_state.stream_infos) + total_stream_infos += len(ts_state.client_visible_stream_infos) # Update session's stream states to have the new states self._stream_states.update(ts_state.data_stream_states) @@ -1450,8 +1527,18 @@ class _LttngLiveViewerSession: ts_state.is_attached = True status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK + stream_infos_to_announce = ts_state.client_visible_stream_infos + + # Mark stream infos transmitted as part of the reply as + # announced. + for si in stream_infos_to_announce: + if si.is_metadata: + self._get_metadata_stream_state(si.id).mark_as_announced() + else: + self._get_data_stream_state(si.id).mark_as_announced() + return _LttngLiveViewerAttachToTracingSessionReply( - status, ts_state.stream_infos + status, stream_infos_to_announce ) def _handle_detach_from_tracing_session_command( @@ -1473,6 +1560,21 @@ class _LttngLiveViewerSession: status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK return _LttngLiveViewerDetachFromTracingSessionReply(status) + @staticmethod + def _stream_is_ready( + stream_state: _LttngLiveViewerSessionStreamState, creation_timestamp: int + ): + return ( + not stream_state.is_announced + and stream_state.stream.creation_timestamp <= creation_timestamp + ) + + def _needs_new_streams(self, current_timestamp: int): + return any( + self._stream_is_ready(ss, current_timestamp) + for ss in self._stream_states.values() + ) + def _handle_get_next_data_stream_index_entry_command( self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand ): @@ -1498,7 +1600,7 @@ class _LttngLiveViewerSession: timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry) if needs_new_metadata_section(metadata_stream_state, timestamp_begin): - metadata_stream_state.is_sent = False + metadata_stream_state.all_data_is_sent = False metadata_stream_state.goto_next_section() # The viewer only checks the `has_new_metadata` flag if the @@ -1510,8 +1612,12 @@ class _LttngLiveViewerSession: status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply( - status, stream_state.cur_index_entry, has_new_metadata, False + status, + stream_state.cur_index_entry, + has_new_metadata, + self._needs_new_streams(timestamp_begin), ) + self._last_delivered_index_timestamp_begin = timestamp_begin stream_state.goto_next_index_entry() return reply @@ -1539,7 +1645,7 @@ class _LttngLiveViewerSession: fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}' logging.info(fmt.format(cmd.req_length, data_response_length)) - data = stream_state.data_stream.get_data(cmd.offset, data_response_length) + data = stream_state.stream.get_data(cmd.offset, data_response_length) status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False) @@ -1550,18 +1656,18 @@ class _LttngLiveViewerSession: logging.info(fmt.format(cmd.stream_id)) metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id) - if metadata_stream_state.is_sent: + if metadata_stream_state.all_data_is_sent: status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes()) - metadata_stream_state.is_sent = True + metadata_stream_state.all_data_is_sent = True status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK metadata_section = metadata_stream_state.cur_section assert metadata_section is not None # If we are sending an empty section, ready the next one right away. if len(metadata_section.data) == 0: - metadata_stream_state.is_sent = False + metadata_stream_state.all_data_is_sent = False metadata_stream_state.goto_next_section() fmt = 'Replying to "get metadata stream data" command: metadata-size={}' @@ -1570,19 +1676,54 @@ class _LttngLiveViewerSession: status, metadata_section.data ) + def _get_stream_infos_ready_for_announcement(self): + ready_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo] + + for ss in self._stream_states.values(): + if self._stream_is_ready(ss, ss.stream.creation_timestamp): + ready_stream_infos.append(ss.info) + + return ready_stream_infos + + # A stream is considered finished if it has been announced and all + # of its index entries have been provided to the client. + def _all_streams_finished(self): + return all( + isinstance(stream_state, _LttngLiveViewerSessionMetadataStreamState) + or (stream_state.cur_index_entry is None and stream_state.is_announced) + for stream_state in self._stream_states.values() + ) + def _handle_get_new_stream_infos_command( self, cmd: _LttngLiveViewerGetNewStreamInfosCommand ): fmt = 'Handling "get new stream infos" command: ts-id={}' logging.info(fmt.format(cmd.tracing_session_id)) + newly_announced_stream_infos = self._get_stream_infos_ready_for_announcement() - # As of this version, all the tracing session's stream infos are - # always given to the viewer when sending the "attach to tracing - # session" reply, so there's nothing new here. Return the `HUP` - # status as, if we're handling this command, the viewer consumed - # all the existing data streams. - status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP - return _LttngLiveViewerGetNewStreamInfosReply(status, []) + # Mark stream infos transmitted as part of the reply as + # announced. + for si in newly_announced_stream_infos: + if si.is_metadata: + self._get_metadata_stream_state(si.id).mark_as_announced() + else: + self._get_data_stream_state(si.id).mark_as_announced() + + status = _LttngLiveViewerGetNewStreamInfosReply.Status.OK + + if len(newly_announced_stream_infos) == 0: + # If all streams have been transmitted and no new traces are + # scheduled for creation, hang up to signal that the tracing + # session is "done". + status = ( + _LttngLiveViewerGetNewStreamInfosReply.Status.HUP + if self._all_streams_finished() + else _LttngLiveViewerGetNewStreamInfosReply.Status.NO_NEW + ) + + return _LttngLiveViewerGetNewStreamInfosReply( + status, newly_announced_stream_infos + ) def _handle_get_tracing_session_infos_command( self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand @@ -1840,7 +1981,8 @@ def _session_descriptors_from_path( # "client-count": 23, # "traces": [ # { - # "path": "lol" + # "path": "lol", + # creation-timestamp: 12948 # }, # { # "path": "meow/mix", @@ -1884,16 +2026,17 @@ def _session_descriptors_from_path( else None ) path = trace_json.at("path", tjson.StrVal).val + creation_timestamp = ( + trace_json.at("creation-timestamp", tjson.IntVal).val + if "creation-timestamp" in trace_json + else 0 + ) # type: int if not os.path.isabs(path) and trace_path_prefix: path = os.path.join(trace_path_prefix, path) traces.append( - LttngTrace( - path, - metadata_sections, - beacons, - ) + LttngTrace(path, metadata_sections, beacons, creation_timestamp) ) sessions.append( -- 2.34.1