X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=tests%2Fdata%2Fplugins%2Fsrc.ctf.lttng-live%2Flttng_live_server.py;fp=tests%2Fdata%2Fplugins%2Fsrc.ctf.lttng-live%2Flttng_live_server.py;h=e11011379c91fea4125c9c448c1465673c487463;hp=3a3c565e094a53caa5956ab4173fff4f125968c9;hb=3c22c122c56f58b7055d913b3901e7e8f48dded7;hpb=ca6225465edba76718064ccd4b6fba6253cf02f4 diff --git a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py index 3a3c565e..e1101137 100644 --- a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py +++ b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py @@ -14,6 +14,7 @@ import logging import os.path import argparse import tempfile +from abc import ABC, abstractmethod from typing import Dict, Union, Iterable, Optional, Sequence, overload import tjson @@ -799,9 +800,23 @@ class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]): return self._path +# Any LTTng stream (metadata or data). +class _LttngStream(ABC): + @abstractmethod + def __init__(self, creation_timestamp: int): + self._creation_timestamp: int = creation_timestamp + + @property + def creation_timestamp(self): + return self._creation_timestamp + + # An LTTng data stream. -class _LttngDataStream: - def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]): +class _LttngDataStream(_LttngStream): + def __init__( + self, path: str, beacons_json: Optional[tjson.ArrayVal], creation_timestamp: int + ): + super().__init__(creation_timestamp) self._path = path filename = os.path.basename(path) match = re.match(r"(.*)_\d+", filename) @@ -862,12 +877,14 @@ class _LttngMetadataStreamSection: # An LTTng metadata stream. -class _LttngMetadataStream: +class _LttngMetadataStream(_LttngStream): def __init__( self, metadata_file_path: str, config_sections: Sequence[_LttngMetadataStreamSection], + creation_timestamp: int, ): + super().__init__(creation_timestamp) self._path = metadata_file_path self._sections = config_sections logging.info( @@ -913,8 +930,8 @@ def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal): for section in metadata_sections_json: if isinstance(section, tjson.StrVal): if section.val == "empty": - # Found a empty section marker. Actually append the section at the - # timestamp of the next concrete section. + # Found an empty section marker. Actually append the + # section at the timestamp of the next concrete section. append_empty_section = True else: raise ValueError("Invalid string value at {}.".format(section.path)) @@ -1018,8 +1035,10 @@ class LttngTrace(Sequence[_LttngDataStream]): trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal], beacons_json: Optional[tjson.ObjVal], + creation_timestamp: int, ): self._path = trace_dir + self._creation_timestamp = creation_timestamp self._create_metadata_stream(trace_dir, metadata_sections_json) self._create_data_streams(trace_dir, beacons_json) logging.info('Built trace: path="{}"'.format(trace_dir)) @@ -1053,7 +1072,9 @@ class LttngTrace(Sequence[_LttngDataStream]): this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal) self._data_streams.append( - _LttngDataStream(data_stream_path, this_beacons_json) + _LttngDataStream( + data_stream_path, this_beacons_json, self._creation_timestamp + ) ) def _create_metadata_stream( @@ -1072,7 +1093,9 @@ class LttngTrace(Sequence[_LttngDataStream]): metadata_path, metadata_sections_json ) - self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections) + self._metadata_stream = _LttngMetadataStream( + metadata_path, metadata_sections, self.creation_timestamp + ) @property def path(self): @@ -1082,6 +1105,10 @@ class LttngTrace(Sequence[_LttngDataStream]): def metadata_stream(self): return self._metadata_stream + @property + def creation_timestamp(self): + return self._creation_timestamp + @overload def __getitem__(self, index: int) -> _LttngDataStream: ... @@ -1099,8 +1126,31 @@ class LttngTrace(Sequence[_LttngDataStream]): return len(self._data_streams) +# Stream (metadata or data) state specific to the LTTng live protocol. +class _LttngLiveViewerSessionStreamState: + @abstractmethod + def __init__(self): + # A stream is considered "announced" when it has been returned + # to the LTTng live client in response to a "get new stream + # infos" (`_LttngLiveViewerGetNewStreamInfosCommand`) command. + self._announced = False # type: bool + pass + + @property + def is_announced(self): + return self._announced + + def mark_as_announced(self): + self._announced = True + + @property + @abstractmethod + def stream(self) -> _LttngStream: + pass + + # The state of a single data stream. -class _LttngLiveViewerSessionDataStreamState: +class _LttngLiveViewerSessionDataStreamState(_LttngLiveViewerSessionStreamState): def __init__( self, ts_state: "_LttngLiveViewerSessionTracingSessionState", @@ -1108,6 +1158,7 @@ class _LttngLiveViewerSessionDataStreamState: data_stream: _LttngDataStream, metadata_stream_id: int, ): + super().__init__() self._ts_state = ts_state self._info = info self._data_stream = data_stream @@ -1132,7 +1183,7 @@ class _LttngLiveViewerSessionDataStreamState: return self._info @property - def data_stream(self): + def stream(self): return self._data_stream @property @@ -1151,13 +1202,14 @@ class _LttngLiveViewerSessionDataStreamState: # The state of a single metadata stream. -class _LttngLiveViewerSessionMetadataStreamState: +class _LttngLiveViewerSessionMetadataStreamState(_LttngLiveViewerSessionStreamState): def __init__( self, ts_state: "_LttngLiveViewerSessionTracingSessionState", info: _LttngLiveViewerStreamInfo, metadata_stream: _LttngMetadataStream, ): + super().__init__() self._ts_state = ts_state self._info = info self._metadata_stream = metadata_stream @@ -1169,7 +1221,7 @@ class _LttngLiveViewerSessionMetadataStreamState: else: self._next_metadata_stream_section_timestamp = None - self._is_sent = False + self._all_data_is_sent = False fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"' logging.info( fmt.format( @@ -1185,16 +1237,16 @@ class _LttngLiveViewerSessionMetadataStreamState: return self._info @property - def metadata_stream(self): + def stream(self): return self._metadata_stream @property - def is_sent(self): - return self._is_sent + def all_data_is_sent(self): + return self._all_data_is_sent - @is_sent.setter - def is_sent(self, value: bool): - self._is_sent = value + @all_data_is_sent.setter + def all_data_is_sent(self, value: bool): + self._all_data_is_sent = value @property def cur_section(self): @@ -1262,41 +1314,61 @@ class LttngTracingSessionDescriptor: class _LttngLiveViewerSessionTracingSessionState: def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int): self._tc_descr = tc_descr - self._stream_infos = [] # type: list[_LttngLiveViewerStreamInfo] + self._client_visible_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo] self._ds_states = {} # type: dict[int, _LttngLiveViewerSessionDataStreamState] self._ms_states = ( {} ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState] - stream_id = base_stream_id + self._last_delivered_index_timestamp = 0 + self._last_allocated_stream_id = base_stream_id for trace in tc_descr.traces: - trace_id = stream_id * 1000 + trace_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo] + trace_id = self._last_allocated_stream_id * 1000 # Metadata stream -> stream info and metadata stream state info = _LttngLiveViewerStreamInfo( - stream_id, trace_id, True, trace.metadata_stream.path, "metadata" + self._last_allocated_stream_id, + trace_id, + True, + trace.metadata_stream.path, + "metadata", ) - self._stream_infos.append(info) - self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState( + + trace_stream_infos.append(info) + self._ms_states[ + self._last_allocated_stream_id + ] = _LttngLiveViewerSessionMetadataStreamState( self, info, trace.metadata_stream ) - metadata_stream_id = stream_id - stream_id += 1 + metadata_stream_id = self._last_allocated_stream_id + self._last_allocated_stream_id += 1 # Data streams -> stream infos and data stream states for data_stream in trace: info = _LttngLiveViewerStreamInfo( - stream_id, + self._last_allocated_stream_id, trace_id, False, data_stream.path, data_stream.channel_name, ) - self._stream_infos.append(info) - self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState( + trace_stream_infos.append(info) + self._ds_states[ + self._last_allocated_stream_id + ] = _LttngLiveViewerSessionDataStreamState( self, info, data_stream, metadata_stream_id ) - stream_id += 1 + self._last_allocated_stream_id += 1 + + if trace.creation_timestamp == 0: + # Only announce streams for traces that are created at + # the origin. + # + # The rest of the streams will be discovered by the + # client as indexes are received with a "has new data + # streams" flag set in the reply. + self._client_visible_stream_infos.extend(trace_stream_infos) self._is_attached = False fmt = 'Built tracing session state: id={}, name="{}"' @@ -1315,12 +1387,17 @@ class _LttngLiveViewerSessionTracingSessionState: return self._ms_states @property - def stream_infos(self): - return self._stream_infos + def client_visible_stream_infos(self): + return self._client_visible_stream_infos @property def has_new_metadata(self): - return any([not ms.is_sent for ms in self._ms_states.values()]) + return any( + [ + ms.is_announced and not ms.all_data_is_sent + for ms in self._ms_states.values() + ] + ) @property def is_attached(self): @@ -1369,7 +1446,7 @@ class _LttngLiveViewerSession: ) ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id self._ts_states[ts_id] = ts_state - total_stream_infos += len(ts_state.stream_infos) + total_stream_infos += len(ts_state.client_visible_stream_infos) # Update session's stream states to have the new states self._stream_states.update(ts_state.data_stream_states) @@ -1450,8 +1527,18 @@ class _LttngLiveViewerSession: ts_state.is_attached = True status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK + stream_infos_to_announce = ts_state.client_visible_stream_infos + + # Mark stream infos transmitted as part of the reply as + # announced. + for si in stream_infos_to_announce: + if si.is_metadata: + self._get_metadata_stream_state(si.id).mark_as_announced() + else: + self._get_data_stream_state(si.id).mark_as_announced() + return _LttngLiveViewerAttachToTracingSessionReply( - status, ts_state.stream_infos + status, stream_infos_to_announce ) def _handle_detach_from_tracing_session_command( @@ -1473,6 +1560,21 @@ class _LttngLiveViewerSession: status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK return _LttngLiveViewerDetachFromTracingSessionReply(status) + @staticmethod + def _stream_is_ready( + stream_state: _LttngLiveViewerSessionStreamState, creation_timestamp: int + ): + return ( + not stream_state.is_announced + and stream_state.stream.creation_timestamp <= creation_timestamp + ) + + def _needs_new_streams(self, current_timestamp: int): + return any( + self._stream_is_ready(ss, current_timestamp) + for ss in self._stream_states.values() + ) + def _handle_get_next_data_stream_index_entry_command( self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand ): @@ -1498,7 +1600,7 @@ class _LttngLiveViewerSession: timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry) if needs_new_metadata_section(metadata_stream_state, timestamp_begin): - metadata_stream_state.is_sent = False + metadata_stream_state.all_data_is_sent = False metadata_stream_state.goto_next_section() # The viewer only checks the `has_new_metadata` flag if the @@ -1510,8 +1612,12 @@ class _LttngLiveViewerSession: status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply( - status, stream_state.cur_index_entry, has_new_metadata, False + status, + stream_state.cur_index_entry, + has_new_metadata, + self._needs_new_streams(timestamp_begin), ) + self._last_delivered_index_timestamp_begin = timestamp_begin stream_state.goto_next_index_entry() return reply @@ -1539,7 +1645,7 @@ class _LttngLiveViewerSession: fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}' logging.info(fmt.format(cmd.req_length, data_response_length)) - data = stream_state.data_stream.get_data(cmd.offset, data_response_length) + data = stream_state.stream.get_data(cmd.offset, data_response_length) status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False) @@ -1550,18 +1656,18 @@ class _LttngLiveViewerSession: logging.info(fmt.format(cmd.stream_id)) metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id) - if metadata_stream_state.is_sent: + if metadata_stream_state.all_data_is_sent: status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes()) - metadata_stream_state.is_sent = True + metadata_stream_state.all_data_is_sent = True status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK metadata_section = metadata_stream_state.cur_section assert metadata_section is not None # If we are sending an empty section, ready the next one right away. if len(metadata_section.data) == 0: - metadata_stream_state.is_sent = False + metadata_stream_state.all_data_is_sent = False metadata_stream_state.goto_next_section() fmt = 'Replying to "get metadata stream data" command: metadata-size={}' @@ -1570,19 +1676,54 @@ class _LttngLiveViewerSession: status, metadata_section.data ) + def _get_stream_infos_ready_for_announcement(self): + ready_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo] + + for ss in self._stream_states.values(): + if self._stream_is_ready(ss, ss.stream.creation_timestamp): + ready_stream_infos.append(ss.info) + + return ready_stream_infos + + # A stream is considered finished if it has been announced and all + # of its index entries have been provided to the client. + def _all_streams_finished(self): + return all( + isinstance(stream_state, _LttngLiveViewerSessionMetadataStreamState) + or (stream_state.cur_index_entry is None and stream_state.is_announced) + for stream_state in self._stream_states.values() + ) + def _handle_get_new_stream_infos_command( self, cmd: _LttngLiveViewerGetNewStreamInfosCommand ): fmt = 'Handling "get new stream infos" command: ts-id={}' logging.info(fmt.format(cmd.tracing_session_id)) + newly_announced_stream_infos = self._get_stream_infos_ready_for_announcement() - # As of this version, all the tracing session's stream infos are - # always given to the viewer when sending the "attach to tracing - # session" reply, so there's nothing new here. Return the `HUP` - # status as, if we're handling this command, the viewer consumed - # all the existing data streams. - status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP - return _LttngLiveViewerGetNewStreamInfosReply(status, []) + # Mark stream infos transmitted as part of the reply as + # announced. + for si in newly_announced_stream_infos: + if si.is_metadata: + self._get_metadata_stream_state(si.id).mark_as_announced() + else: + self._get_data_stream_state(si.id).mark_as_announced() + + status = _LttngLiveViewerGetNewStreamInfosReply.Status.OK + + if len(newly_announced_stream_infos) == 0: + # If all streams have been transmitted and no new traces are + # scheduled for creation, hang up to signal that the tracing + # session is "done". + status = ( + _LttngLiveViewerGetNewStreamInfosReply.Status.HUP + if self._all_streams_finished() + else _LttngLiveViewerGetNewStreamInfosReply.Status.NO_NEW + ) + + return _LttngLiveViewerGetNewStreamInfosReply( + status, newly_announced_stream_infos + ) def _handle_get_tracing_session_infos_command( self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand @@ -1840,7 +1981,8 @@ def _session_descriptors_from_path( # "client-count": 23, # "traces": [ # { - # "path": "lol" + # "path": "lol", + # creation-timestamp: 12948 # }, # { # "path": "meow/mix", @@ -1884,16 +2026,17 @@ def _session_descriptors_from_path( else None ) path = trace_json.at("path", tjson.StrVal).val + creation_timestamp = ( + trace_json.at("creation-timestamp", tjson.IntVal).val + if "creation-timestamp" in trace_json + else 0 + ) # type: int if not os.path.isabs(path) and trace_path_prefix: path = os.path.join(trace_path_prefix, path) traces.append( - LttngTrace( - path, - metadata_sections, - beacons, - ) + LttngTrace(path, metadata_sections, beacons, creation_timestamp) ) sessions.append(