Tests: lttng_live_server: support trace creation timestamps
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
index 3a3c565e094a53caa5956ab4173fff4f125968c9..e11011379c91fea4125c9c448c1465673c487463 100644 (file)
@@ -14,6 +14,7 @@ import logging
 import os.path
 import argparse
 import tempfile
+from abc import ABC, abstractmethod
 from typing import Dict, Union, Iterable, Optional, Sequence, overload
 
 import tjson
@@ -799,9 +800,23 @@ class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
         return self._path
 
 
+# Any LTTng stream (metadata or data).
+class _LttngStream(ABC):
+    @abstractmethod
+    def __init__(self, creation_timestamp: int):
+        self._creation_timestamp: int = creation_timestamp
+
+    @property
+    def creation_timestamp(self):
+        return self._creation_timestamp
+
+
 # An LTTng data stream.
-class _LttngDataStream:
-    def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
+class _LttngDataStream(_LttngStream):
+    def __init__(
+        self, path: str, beacons_json: Optional[tjson.ArrayVal], creation_timestamp: int
+    ):
+        super().__init__(creation_timestamp)
         self._path = path
         filename = os.path.basename(path)
         match = re.match(r"(.*)_\d+", filename)
@@ -862,12 +877,14 @@ class _LttngMetadataStreamSection:
 
 
 # An LTTng metadata stream.
-class _LttngMetadataStream:
+class _LttngMetadataStream(_LttngStream):
     def __init__(
         self,
         metadata_file_path: str,
         config_sections: Sequence[_LttngMetadataStreamSection],
+        creation_timestamp: int,
     ):
+        super().__init__(creation_timestamp)
         self._path = metadata_file_path
         self._sections = config_sections
         logging.info(
@@ -913,8 +930,8 @@ def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
     for section in metadata_sections_json:
         if isinstance(section, tjson.StrVal):
             if section.val == "empty":
-                # Found a empty section marker. Actually append the section at the
-                # timestamp of the next concrete section.
+                # Found an empty section marker. Actually append the
+                # section at the timestamp of the next concrete section.
                 append_empty_section = True
             else:
                 raise ValueError("Invalid string value at {}.".format(section.path))
@@ -1018,8 +1035,10 @@ class LttngTrace(Sequence[_LttngDataStream]):
         trace_dir: str,
         metadata_sections_json: Optional[tjson.ArrayVal],
         beacons_json: Optional[tjson.ObjVal],
+        creation_timestamp: int,
     ):
         self._path = trace_dir
+        self._creation_timestamp = creation_timestamp
         self._create_metadata_stream(trace_dir, metadata_sections_json)
         self._create_data_streams(trace_dir, beacons_json)
         logging.info('Built trace: path="{}"'.format(trace_dir))
@@ -1053,7 +1072,9 @@ class LttngTrace(Sequence[_LttngDataStream]):
                 this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
 
             self._data_streams.append(
-                _LttngDataStream(data_stream_path, this_beacons_json)
+                _LttngDataStream(
+                    data_stream_path, this_beacons_json, self._creation_timestamp
+                )
             )
 
     def _create_metadata_stream(
@@ -1072,7 +1093,9 @@ class LttngTrace(Sequence[_LttngDataStream]):
                 metadata_path, metadata_sections_json
             )
 
-        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
+        self._metadata_stream = _LttngMetadataStream(
+            metadata_path, metadata_sections, self.creation_timestamp
+        )
 
     @property
     def path(self):
@@ -1082,6 +1105,10 @@ class LttngTrace(Sequence[_LttngDataStream]):
     def metadata_stream(self):
         return self._metadata_stream
 
+    @property
+    def creation_timestamp(self):
+        return self._creation_timestamp
+
     @overload
     def __getitem__(self, index: int) -> _LttngDataStream:
         ...
@@ -1099,8 +1126,31 @@ class LttngTrace(Sequence[_LttngDataStream]):
         return len(self._data_streams)
 
 
+# Stream (metadata or data) state specific to the LTTng live protocol.
+class _LttngLiveViewerSessionStreamState:
+    @abstractmethod
+    def __init__(self):
+        # A stream is considered "announced" when it has been returned
+        # to the LTTng live client in response to a "get new stream
+        # infos" (`_LttngLiveViewerGetNewStreamInfosCommand`) command.
+        self._announced = False  # type: bool
+        pass
+
+    @property
+    def is_announced(self):
+        return self._announced
+
+    def mark_as_announced(self):
+        self._announced = True
+
+    @property
+    @abstractmethod
+    def stream(self) -> _LttngStream:
+        pass
+
+
 # The state of a single data stream.
-class _LttngLiveViewerSessionDataStreamState:
+class _LttngLiveViewerSessionDataStreamState(_LttngLiveViewerSessionStreamState):
     def __init__(
         self,
         ts_state: "_LttngLiveViewerSessionTracingSessionState",
@@ -1108,6 +1158,7 @@ class _LttngLiveViewerSessionDataStreamState:
         data_stream: _LttngDataStream,
         metadata_stream_id: int,
     ):
+        super().__init__()
         self._ts_state = ts_state
         self._info = info
         self._data_stream = data_stream
@@ -1132,7 +1183,7 @@ class _LttngLiveViewerSessionDataStreamState:
         return self._info
 
     @property
-    def data_stream(self):
+    def stream(self):
         return self._data_stream
 
     @property
@@ -1151,13 +1202,14 @@ class _LttngLiveViewerSessionDataStreamState:
 
 
 # The state of a single metadata stream.
-class _LttngLiveViewerSessionMetadataStreamState:
+class _LttngLiveViewerSessionMetadataStreamState(_LttngLiveViewerSessionStreamState):
     def __init__(
         self,
         ts_state: "_LttngLiveViewerSessionTracingSessionState",
         info: _LttngLiveViewerStreamInfo,
         metadata_stream: _LttngMetadataStream,
     ):
+        super().__init__()
         self._ts_state = ts_state
         self._info = info
         self._metadata_stream = metadata_stream
@@ -1169,7 +1221,7 @@ class _LttngLiveViewerSessionMetadataStreamState:
         else:
             self._next_metadata_stream_section_timestamp = None
 
-        self._is_sent = False
+        self._all_data_is_sent = False
         fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
         logging.info(
             fmt.format(
@@ -1185,16 +1237,16 @@ class _LttngLiveViewerSessionMetadataStreamState:
         return self._info
 
     @property
-    def metadata_stream(self):
+    def stream(self):
         return self._metadata_stream
 
     @property
-    def is_sent(self):
-        return self._is_sent
+    def all_data_is_sent(self):
+        return self._all_data_is_sent
 
-    @is_sent.setter
-    def is_sent(self, value: bool):
-        self._is_sent = value
+    @all_data_is_sent.setter
+    def all_data_is_sent(self, value: bool):
+        self._all_data_is_sent = value
 
     @property
     def cur_section(self):
@@ -1262,41 +1314,61 @@ class LttngTracingSessionDescriptor:
 class _LttngLiveViewerSessionTracingSessionState:
     def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
         self._tc_descr = tc_descr
-        self._stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
+        self._client_visible_stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
         self._ds_states = {}  # type: dict[int, _LttngLiveViewerSessionDataStreamState]
         self._ms_states = (
             {}
         )  # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
-        stream_id = base_stream_id
+        self._last_delivered_index_timestamp = 0
+        self._last_allocated_stream_id = base_stream_id
 
         for trace in tc_descr.traces:
-            trace_id = stream_id * 1000
+            trace_stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
+            trace_id = self._last_allocated_stream_id * 1000
 
             # Metadata stream -> stream info and metadata stream state
             info = _LttngLiveViewerStreamInfo(
-                stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
+                self._last_allocated_stream_id,
+                trace_id,
+                True,
+                trace.metadata_stream.path,
+                "metadata",
             )
-            self._stream_infos.append(info)
-            self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
+
+            trace_stream_infos.append(info)
+            self._ms_states[
+                self._last_allocated_stream_id
+            ] = _LttngLiveViewerSessionMetadataStreamState(
                 self, info, trace.metadata_stream
             )
-            metadata_stream_id = stream_id
-            stream_id += 1
+            metadata_stream_id = self._last_allocated_stream_id
+            self._last_allocated_stream_id += 1
 
             # Data streams -> stream infos and data stream states
             for data_stream in trace:
                 info = _LttngLiveViewerStreamInfo(
-                    stream_id,
+                    self._last_allocated_stream_id,
                     trace_id,
                     False,
                     data_stream.path,
                     data_stream.channel_name,
                 )
-                self._stream_infos.append(info)
-                self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
+                trace_stream_infos.append(info)
+                self._ds_states[
+                    self._last_allocated_stream_id
+                ] = _LttngLiveViewerSessionDataStreamState(
                     self, info, data_stream, metadata_stream_id
                 )
-                stream_id += 1
+                self._last_allocated_stream_id += 1
+
+            if trace.creation_timestamp == 0:
+                # Only announce streams for traces that are created at
+                # the origin.
+                #
+                # The rest of the streams will be discovered by the
+                # client as indexes are received with a "has new data
+                # streams" flag set in the reply.
+                self._client_visible_stream_infos.extend(trace_stream_infos)
 
         self._is_attached = False
         fmt = 'Built tracing session state: id={}, name="{}"'
@@ -1315,12 +1387,17 @@ class _LttngLiveViewerSessionTracingSessionState:
         return self._ms_states
 
     @property
-    def stream_infos(self):
-        return self._stream_infos
+    def client_visible_stream_infos(self):
+        return self._client_visible_stream_infos
 
     @property
     def has_new_metadata(self):
-        return any([not ms.is_sent for ms in self._ms_states.values()])
+        return any(
+            [
+                ms.is_announced and not ms.all_data_is_sent
+                for ms in self._ms_states.values()
+            ]
+        )
 
     @property
     def is_attached(self):
@@ -1369,7 +1446,7 @@ class _LttngLiveViewerSession:
             )
             ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
             self._ts_states[ts_id] = ts_state
-            total_stream_infos += len(ts_state.stream_infos)
+            total_stream_infos += len(ts_state.client_visible_stream_infos)
 
             # Update session's stream states to have the new states
             self._stream_states.update(ts_state.data_stream_states)
@@ -1450,8 +1527,18 @@ class _LttngLiveViewerSession:
 
         ts_state.is_attached = True
         status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
+        stream_infos_to_announce = ts_state.client_visible_stream_infos
+
+        # Mark stream infos transmitted as part of the reply as
+        # announced.
+        for si in stream_infos_to_announce:
+            if si.is_metadata:
+                self._get_metadata_stream_state(si.id).mark_as_announced()
+            else:
+                self._get_data_stream_state(si.id).mark_as_announced()
+
         return _LttngLiveViewerAttachToTracingSessionReply(
-            status, ts_state.stream_infos
+            status, stream_infos_to_announce
         )
 
     def _handle_detach_from_tracing_session_command(
@@ -1473,6 +1560,21 @@ class _LttngLiveViewerSession:
         status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
         return _LttngLiveViewerDetachFromTracingSessionReply(status)
 
+    @staticmethod
+    def _stream_is_ready(
+        stream_state: _LttngLiveViewerSessionStreamState, creation_timestamp: int
+    ):
+        return (
+            not stream_state.is_announced
+            and stream_state.stream.creation_timestamp <= creation_timestamp
+        )
+
+    def _needs_new_streams(self, current_timestamp: int):
+        return any(
+            self._stream_is_ready(ss, current_timestamp)
+            for ss in self._stream_states.values()
+        )
+
     def _handle_get_next_data_stream_index_entry_command(
         self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
     ):
@@ -1498,7 +1600,7 @@ class _LttngLiveViewerSession:
         timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
 
         if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
-            metadata_stream_state.is_sent = False
+            metadata_stream_state.all_data_is_sent = False
             metadata_stream_state.goto_next_section()
 
         # The viewer only checks the `has_new_metadata` flag if the
@@ -1510,8 +1612,12 @@ class _LttngLiveViewerSession:
             status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
 
         reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
-            status, stream_state.cur_index_entry, has_new_metadata, False
+            status,
+            stream_state.cur_index_entry,
+            has_new_metadata,
+            self._needs_new_streams(timestamp_begin),
         )
+        self._last_delivered_index_timestamp_begin = timestamp_begin
         stream_state.goto_next_index_entry()
         return reply
 
@@ -1539,7 +1645,7 @@ class _LttngLiveViewerSession:
             fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
             logging.info(fmt.format(cmd.req_length, data_response_length))
 
-        data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
+        data = stream_state.stream.get_data(cmd.offset, data_response_length)
         status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
         return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
 
@@ -1550,18 +1656,18 @@ class _LttngLiveViewerSession:
         logging.info(fmt.format(cmd.stream_id))
         metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
 
-        if metadata_stream_state.is_sent:
+        if metadata_stream_state.all_data_is_sent:
             status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
             return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
 
-        metadata_stream_state.is_sent = True
+        metadata_stream_state.all_data_is_sent = True
         status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
         metadata_section = metadata_stream_state.cur_section
         assert metadata_section is not None
 
         # If we are sending an empty section, ready the next one right away.
         if len(metadata_section.data) == 0:
-            metadata_stream_state.is_sent = False
+            metadata_stream_state.all_data_is_sent = False
             metadata_stream_state.goto_next_section()
 
         fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
@@ -1570,19 +1676,54 @@ class _LttngLiveViewerSession:
             status, metadata_section.data
         )
 
+    def _get_stream_infos_ready_for_announcement(self):
+        ready_stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
+
+        for ss in self._stream_states.values():
+            if self._stream_is_ready(ss, ss.stream.creation_timestamp):
+                ready_stream_infos.append(ss.info)
+
+        return ready_stream_infos
+
+    # A stream is considered finished if it has been announced and all
+    # of its index entries have been provided to the client.
+    def _all_streams_finished(self):
+        return all(
+            isinstance(stream_state, _LttngLiveViewerSessionMetadataStreamState)
+            or (stream_state.cur_index_entry is None and stream_state.is_announced)
+            for stream_state in self._stream_states.values()
+        )
+
     def _handle_get_new_stream_infos_command(
         self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
     ):
         fmt = 'Handling "get new stream infos" command: ts-id={}'
         logging.info(fmt.format(cmd.tracing_session_id))
+        newly_announced_stream_infos = self._get_stream_infos_ready_for_announcement()
 
-        # As of this version, all the tracing session's stream infos are
-        # always given to the viewer when sending the "attach to tracing
-        # session" reply, so there's nothing new here. Return the `HUP`
-        # status as, if we're handling this command, the viewer consumed
-        # all the existing data streams.
-        status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
-        return _LttngLiveViewerGetNewStreamInfosReply(status, [])
+        # Mark stream infos transmitted as part of the reply as
+        # announced.
+        for si in newly_announced_stream_infos:
+            if si.is_metadata:
+                self._get_metadata_stream_state(si.id).mark_as_announced()
+            else:
+                self._get_data_stream_state(si.id).mark_as_announced()
+
+        status = _LttngLiveViewerGetNewStreamInfosReply.Status.OK
+
+        if len(newly_announced_stream_infos) == 0:
+            # If all streams have been transmitted and no new traces are
+            # scheduled for creation, hang up to signal that the tracing
+            # session is "done".
+            status = (
+                _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
+                if self._all_streams_finished()
+                else _LttngLiveViewerGetNewStreamInfosReply.Status.NO_NEW
+            )
+
+        return _LttngLiveViewerGetNewStreamInfosReply(
+            status, newly_announced_stream_infos
+        )
 
     def _handle_get_tracing_session_infos_command(
         self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
@@ -1840,7 +1981,8 @@ def _session_descriptors_from_path(
     #             "client-count": 23,
     #             "traces": [
     #                 {
-    #                     "path": "lol"
+    #                     "path": "lol",
+    #                     creation-timestamp: 12948
     #                 },
     #                 {
     #                     "path": "meow/mix",
@@ -1884,16 +2026,17 @@ def _session_descriptors_from_path(
                 else None
             )
             path = trace_json.at("path", tjson.StrVal).val
+            creation_timestamp = (
+                trace_json.at("creation-timestamp", tjson.IntVal).val
+                if "creation-timestamp" in trace_json
+                else 0
+            )  # type: int
 
             if not os.path.isabs(path) and trace_path_prefix:
                 path = os.path.join(trace_path_prefix, path)
 
             traces.append(
-                LttngTrace(
-                    path,
-                    metadata_sections,
-                    beacons,
-                )
+                LttngTrace(path, metadata_sections, beacons, creation_timestamp)
             )
 
         sessions.append(
This page took 0.029207 seconds and 4 git commands to generate.