Tests: lttng_live_server: support trace creation timestamps
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Mon, 15 Jan 2024 23:37:59 +0000 (18:37 -0500)
committerPhilippe Proulx <eeppeliteloop@gmail.com>
Tue, 26 Mar 2024 14:57:50 +0000 (10:57 -0400)
Add support for a per-trace "creation time" as part of the live server
scenarios.

The objective of this change is to simulate situations where streams are
created and declared to the live client while other streams are already
being consumed.

The approach taken is to add a 'creation-timestamp' property to the
trace objects expressed in the scenario files. The creation timestamp,
when left unspecified is implicitly at the origin of the timeline.

Streams that exist when a client attaches to a live tracing session are
announced as part of the reply of
_LttngLiveViewerAttachToTracingSessionCommand.

When the creation timestamp is not at the origin, the live server starts
expressing the presence of new available streams once it delivers
index entries that start at, or after, the trace creation timestamp.

As part of the live protocol, newly-available streams are expressed by
setting the 'NEW_STREAM' flag as part of the 'flags' of a GetNextIndex
command's reply payload.

Hence, when handling the
_LttngLiveViewerGetNextDataStreamIndexEntryCommand, the server checks
for unannounced streams that have a creation timestamp lesser or equal
to the beginning timestamp of the index entry being delivered. If any
such streams are available, the 'NEW_STREAMS' flag is set as part of the
reply's flags.

When the 'NEW_STREAMS' flag is set, the client is expected to eventually
issue a _LttngLiveViewerGetNewStreamInfosCommand to list any previously
unannounced stream.

The _LttngLiveViewerGetNewStreamInfosCommand handler was previously
stubbed-out to only reply with the 'HUP' status (meaning the client
should hang-up) since the command was never used to announce new
streams. This doesn't mean that the command was unused.

When a live viewer session has reached a point where all of its streams
have been torn-down (no more data to consume), the live viewer
periodically polls the live server with the 'GetNewStreamInfos' command
to resume consumption when/if new streams appear.

Since the command's handler always returned with the "hang-up" status,
the live client would always gracefully exit at that point as it had
consumed all of the available data provided by the various tests.

The handler now implements the command to announce new streams, when
those are available. Thus, multiple statuses can now be returned:
  - OK: new streams are announced as part of the reply
  - NO_NEW: no new streams are available at this point, but there are
    still index entries (and data) left to consume as part of the
    current scenario's traces
  - HUP: all of the current scenario's streams no longer have index
    entries left to be delivered and all streams have been announced to
    the client.

In terms of the live protocol, the sequence of events leading to the use
of this command is:
  - The client issues LTTNG_VIEWER_GET_NEXT_INDEX to receive a new index
    entry
  - There is a new stream with a creation time <= to the next available
    index: announce it to the client by replying with an "inactivity
    beacon" (an empty index entry) + the LTTNG_VIEWER_FLAG_NEW_STREAM
    status flag
  - In response to the LTTNG_VIEWER_FLAG_NEW_STREAM flag's presence, the
    client asks for new streams by issueing the
    LTTNG_VIEWER_GET_NEW_STREAMS command
  - The server replies with a description of the new stream(s) containing:
    - channel name
    - relative path of the file
    - 'is_metadata' flag
    - stream id
    - trace id (which may not have existed up to this point)
  - The client then issues LTTNG_VIEWER_GET_METADATA to get any new
    metadata for the various traces it now tracks. In the case where a
    new trace was announced (a new trace id), this will be the moment
    when the client gets an initial fragment of metadata to initialize
    the trace class hierarchy (see lttng-live.cpp:481)

Notes
-----

The `is_sent` property of stream states is renamed to `all_data_is_sent`
as it felt ambiguous with regards to the new `is_announced` property
which expresses whether or not a stream has been made visible to the
live client.

Change-Id: Iecb86917b444f5068a445e8481749d34ad3b07c7
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Reviewed-on: https://review.lttng.org/c/babeltrace/+/11687
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py

index 3a3c565e094a53caa5956ab4173fff4f125968c9..e11011379c91fea4125c9c448c1465673c487463 100644 (file)
@@ -14,6 +14,7 @@ import logging
 import os.path
 import argparse
 import tempfile
+from abc import ABC, abstractmethod
 from typing import Dict, Union, Iterable, Optional, Sequence, overload
 
 import tjson
@@ -799,9 +800,23 @@ class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
         return self._path
 
 
+# Any LTTng stream (metadata or data).
+class _LttngStream(ABC):
+    @abstractmethod
+    def __init__(self, creation_timestamp: int):
+        self._creation_timestamp: int = creation_timestamp
+
+    @property
+    def creation_timestamp(self):
+        return self._creation_timestamp
+
+
 # An LTTng data stream.
-class _LttngDataStream:
-    def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
+class _LttngDataStream(_LttngStream):
+    def __init__(
+        self, path: str, beacons_json: Optional[tjson.ArrayVal], creation_timestamp: int
+    ):
+        super().__init__(creation_timestamp)
         self._path = path
         filename = os.path.basename(path)
         match = re.match(r"(.*)_\d+", filename)
@@ -862,12 +877,14 @@ class _LttngMetadataStreamSection:
 
 
 # An LTTng metadata stream.
-class _LttngMetadataStream:
+class _LttngMetadataStream(_LttngStream):
     def __init__(
         self,
         metadata_file_path: str,
         config_sections: Sequence[_LttngMetadataStreamSection],
+        creation_timestamp: int,
     ):
+        super().__init__(creation_timestamp)
         self._path = metadata_file_path
         self._sections = config_sections
         logging.info(
@@ -913,8 +930,8 @@ def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
     for section in metadata_sections_json:
         if isinstance(section, tjson.StrVal):
             if section.val == "empty":
-                # Found a empty section marker. Actually append the section at the
-                # timestamp of the next concrete section.
+                # Found an empty section marker. Actually append the
+                # section at the timestamp of the next concrete section.
                 append_empty_section = True
             else:
                 raise ValueError("Invalid string value at {}.".format(section.path))
@@ -1018,8 +1035,10 @@ class LttngTrace(Sequence[_LttngDataStream]):
         trace_dir: str,
         metadata_sections_json: Optional[tjson.ArrayVal],
         beacons_json: Optional[tjson.ObjVal],
+        creation_timestamp: int,
     ):
         self._path = trace_dir
+        self._creation_timestamp = creation_timestamp
         self._create_metadata_stream(trace_dir, metadata_sections_json)
         self._create_data_streams(trace_dir, beacons_json)
         logging.info('Built trace: path="{}"'.format(trace_dir))
@@ -1053,7 +1072,9 @@ class LttngTrace(Sequence[_LttngDataStream]):
                 this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
 
             self._data_streams.append(
-                _LttngDataStream(data_stream_path, this_beacons_json)
+                _LttngDataStream(
+                    data_stream_path, this_beacons_json, self._creation_timestamp
+                )
             )
 
     def _create_metadata_stream(
@@ -1072,7 +1093,9 @@ class LttngTrace(Sequence[_LttngDataStream]):
                 metadata_path, metadata_sections_json
             )
 
-        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
+        self._metadata_stream = _LttngMetadataStream(
+            metadata_path, metadata_sections, self.creation_timestamp
+        )
 
     @property
     def path(self):
@@ -1082,6 +1105,10 @@ class LttngTrace(Sequence[_LttngDataStream]):
     def metadata_stream(self):
         return self._metadata_stream
 
+    @property
+    def creation_timestamp(self):
+        return self._creation_timestamp
+
     @overload
     def __getitem__(self, index: int) -> _LttngDataStream:
         ...
@@ -1099,8 +1126,31 @@ class LttngTrace(Sequence[_LttngDataStream]):
         return len(self._data_streams)
 
 
+# Stream (metadata or data) state specific to the LTTng live protocol.
+class _LttngLiveViewerSessionStreamState:
+    @abstractmethod
+    def __init__(self):
+        # A stream is considered "announced" when it has been returned
+        # to the LTTng live client in response to a "get new stream
+        # infos" (`_LttngLiveViewerGetNewStreamInfosCommand`) command.
+        self._announced = False  # type: bool
+        pass
+
+    @property
+    def is_announced(self):
+        return self._announced
+
+    def mark_as_announced(self):
+        self._announced = True
+
+    @property
+    @abstractmethod
+    def stream(self) -> _LttngStream:
+        pass
+
+
 # The state of a single data stream.
-class _LttngLiveViewerSessionDataStreamState:
+class _LttngLiveViewerSessionDataStreamState(_LttngLiveViewerSessionStreamState):
     def __init__(
         self,
         ts_state: "_LttngLiveViewerSessionTracingSessionState",
@@ -1108,6 +1158,7 @@ class _LttngLiveViewerSessionDataStreamState:
         data_stream: _LttngDataStream,
         metadata_stream_id: int,
     ):
+        super().__init__()
         self._ts_state = ts_state
         self._info = info
         self._data_stream = data_stream
@@ -1132,7 +1183,7 @@ class _LttngLiveViewerSessionDataStreamState:
         return self._info
 
     @property
-    def data_stream(self):
+    def stream(self):
         return self._data_stream
 
     @property
@@ -1151,13 +1202,14 @@ class _LttngLiveViewerSessionDataStreamState:
 
 
 # The state of a single metadata stream.
-class _LttngLiveViewerSessionMetadataStreamState:
+class _LttngLiveViewerSessionMetadataStreamState(_LttngLiveViewerSessionStreamState):
     def __init__(
         self,
         ts_state: "_LttngLiveViewerSessionTracingSessionState",
         info: _LttngLiveViewerStreamInfo,
         metadata_stream: _LttngMetadataStream,
     ):
+        super().__init__()
         self._ts_state = ts_state
         self._info = info
         self._metadata_stream = metadata_stream
@@ -1169,7 +1221,7 @@ class _LttngLiveViewerSessionMetadataStreamState:
         else:
             self._next_metadata_stream_section_timestamp = None
 
-        self._is_sent = False
+        self._all_data_is_sent = False
         fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
         logging.info(
             fmt.format(
@@ -1185,16 +1237,16 @@ class _LttngLiveViewerSessionMetadataStreamState:
         return self._info
 
     @property
-    def metadata_stream(self):
+    def stream(self):
         return self._metadata_stream
 
     @property
-    def is_sent(self):
-        return self._is_sent
+    def all_data_is_sent(self):
+        return self._all_data_is_sent
 
-    @is_sent.setter
-    def is_sent(self, value: bool):
-        self._is_sent = value
+    @all_data_is_sent.setter
+    def all_data_is_sent(self, value: bool):
+        self._all_data_is_sent = value
 
     @property
     def cur_section(self):
@@ -1262,41 +1314,61 @@ class LttngTracingSessionDescriptor:
 class _LttngLiveViewerSessionTracingSessionState:
     def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
         self._tc_descr = tc_descr
-        self._stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
+        self._client_visible_stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
         self._ds_states = {}  # type: dict[int, _LttngLiveViewerSessionDataStreamState]
         self._ms_states = (
             {}
         )  # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
-        stream_id = base_stream_id
+        self._last_delivered_index_timestamp = 0
+        self._last_allocated_stream_id = base_stream_id
 
         for trace in tc_descr.traces:
-            trace_id = stream_id * 1000
+            trace_stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
+            trace_id = self._last_allocated_stream_id * 1000
 
             # Metadata stream -> stream info and metadata stream state
             info = _LttngLiveViewerStreamInfo(
-                stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
+                self._last_allocated_stream_id,
+                trace_id,
+                True,
+                trace.metadata_stream.path,
+                "metadata",
             )
-            self._stream_infos.append(info)
-            self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
+
+            trace_stream_infos.append(info)
+            self._ms_states[
+                self._last_allocated_stream_id
+            ] = _LttngLiveViewerSessionMetadataStreamState(
                 self, info, trace.metadata_stream
             )
-            metadata_stream_id = stream_id
-            stream_id += 1
+            metadata_stream_id = self._last_allocated_stream_id
+            self._last_allocated_stream_id += 1
 
             # Data streams -> stream infos and data stream states
             for data_stream in trace:
                 info = _LttngLiveViewerStreamInfo(
-                    stream_id,
+                    self._last_allocated_stream_id,
                     trace_id,
                     False,
                     data_stream.path,
                     data_stream.channel_name,
                 )
-                self._stream_infos.append(info)
-                self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
+                trace_stream_infos.append(info)
+                self._ds_states[
+                    self._last_allocated_stream_id
+                ] = _LttngLiveViewerSessionDataStreamState(
                     self, info, data_stream, metadata_stream_id
                 )
-                stream_id += 1
+                self._last_allocated_stream_id += 1
+
+            if trace.creation_timestamp == 0:
+                # Only announce streams for traces that are created at
+                # the origin.
+                #
+                # The rest of the streams will be discovered by the
+                # client as indexes are received with a "has new data
+                # streams" flag set in the reply.
+                self._client_visible_stream_infos.extend(trace_stream_infos)
 
         self._is_attached = False
         fmt = 'Built tracing session state: id={}, name="{}"'
@@ -1315,12 +1387,17 @@ class _LttngLiveViewerSessionTracingSessionState:
         return self._ms_states
 
     @property
-    def stream_infos(self):
-        return self._stream_infos
+    def client_visible_stream_infos(self):
+        return self._client_visible_stream_infos
 
     @property
     def has_new_metadata(self):
-        return any([not ms.is_sent for ms in self._ms_states.values()])
+        return any(
+            [
+                ms.is_announced and not ms.all_data_is_sent
+                for ms in self._ms_states.values()
+            ]
+        )
 
     @property
     def is_attached(self):
@@ -1369,7 +1446,7 @@ class _LttngLiveViewerSession:
             )
             ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
             self._ts_states[ts_id] = ts_state
-            total_stream_infos += len(ts_state.stream_infos)
+            total_stream_infos += len(ts_state.client_visible_stream_infos)
 
             # Update session's stream states to have the new states
             self._stream_states.update(ts_state.data_stream_states)
@@ -1450,8 +1527,18 @@ class _LttngLiveViewerSession:
 
         ts_state.is_attached = True
         status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
+        stream_infos_to_announce = ts_state.client_visible_stream_infos
+
+        # Mark stream infos transmitted as part of the reply as
+        # announced.
+        for si in stream_infos_to_announce:
+            if si.is_metadata:
+                self._get_metadata_stream_state(si.id).mark_as_announced()
+            else:
+                self._get_data_stream_state(si.id).mark_as_announced()
+
         return _LttngLiveViewerAttachToTracingSessionReply(
-            status, ts_state.stream_infos
+            status, stream_infos_to_announce
         )
 
     def _handle_detach_from_tracing_session_command(
@@ -1473,6 +1560,21 @@ class _LttngLiveViewerSession:
         status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
         return _LttngLiveViewerDetachFromTracingSessionReply(status)
 
+    @staticmethod
+    def _stream_is_ready(
+        stream_state: _LttngLiveViewerSessionStreamState, creation_timestamp: int
+    ):
+        return (
+            not stream_state.is_announced
+            and stream_state.stream.creation_timestamp <= creation_timestamp
+        )
+
+    def _needs_new_streams(self, current_timestamp: int):
+        return any(
+            self._stream_is_ready(ss, current_timestamp)
+            for ss in self._stream_states.values()
+        )
+
     def _handle_get_next_data_stream_index_entry_command(
         self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
     ):
@@ -1498,7 +1600,7 @@ class _LttngLiveViewerSession:
         timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
 
         if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
-            metadata_stream_state.is_sent = False
+            metadata_stream_state.all_data_is_sent = False
             metadata_stream_state.goto_next_section()
 
         # The viewer only checks the `has_new_metadata` flag if the
@@ -1510,8 +1612,12 @@ class _LttngLiveViewerSession:
             status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
 
         reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
-            status, stream_state.cur_index_entry, has_new_metadata, False
+            status,
+            stream_state.cur_index_entry,
+            has_new_metadata,
+            self._needs_new_streams(timestamp_begin),
         )
+        self._last_delivered_index_timestamp_begin = timestamp_begin
         stream_state.goto_next_index_entry()
         return reply
 
@@ -1539,7 +1645,7 @@ class _LttngLiveViewerSession:
             fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
             logging.info(fmt.format(cmd.req_length, data_response_length))
 
-        data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
+        data = stream_state.stream.get_data(cmd.offset, data_response_length)
         status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
         return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
 
@@ -1550,18 +1656,18 @@ class _LttngLiveViewerSession:
         logging.info(fmt.format(cmd.stream_id))
         metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
 
-        if metadata_stream_state.is_sent:
+        if metadata_stream_state.all_data_is_sent:
             status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
             return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
 
-        metadata_stream_state.is_sent = True
+        metadata_stream_state.all_data_is_sent = True
         status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
         metadata_section = metadata_stream_state.cur_section
         assert metadata_section is not None
 
         # If we are sending an empty section, ready the next one right away.
         if len(metadata_section.data) == 0:
-            metadata_stream_state.is_sent = False
+            metadata_stream_state.all_data_is_sent = False
             metadata_stream_state.goto_next_section()
 
         fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
@@ -1570,19 +1676,54 @@ class _LttngLiveViewerSession:
             status, metadata_section.data
         )
 
+    def _get_stream_infos_ready_for_announcement(self):
+        ready_stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
+
+        for ss in self._stream_states.values():
+            if self._stream_is_ready(ss, ss.stream.creation_timestamp):
+                ready_stream_infos.append(ss.info)
+
+        return ready_stream_infos
+
+    # A stream is considered finished if it has been announced and all
+    # of its index entries have been provided to the client.
+    def _all_streams_finished(self):
+        return all(
+            isinstance(stream_state, _LttngLiveViewerSessionMetadataStreamState)
+            or (stream_state.cur_index_entry is None and stream_state.is_announced)
+            for stream_state in self._stream_states.values()
+        )
+
     def _handle_get_new_stream_infos_command(
         self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
     ):
         fmt = 'Handling "get new stream infos" command: ts-id={}'
         logging.info(fmt.format(cmd.tracing_session_id))
+        newly_announced_stream_infos = self._get_stream_infos_ready_for_announcement()
 
-        # As of this version, all the tracing session's stream infos are
-        # always given to the viewer when sending the "attach to tracing
-        # session" reply, so there's nothing new here. Return the `HUP`
-        # status as, if we're handling this command, the viewer consumed
-        # all the existing data streams.
-        status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
-        return _LttngLiveViewerGetNewStreamInfosReply(status, [])
+        # Mark stream infos transmitted as part of the reply as
+        # announced.
+        for si in newly_announced_stream_infos:
+            if si.is_metadata:
+                self._get_metadata_stream_state(si.id).mark_as_announced()
+            else:
+                self._get_data_stream_state(si.id).mark_as_announced()
+
+        status = _LttngLiveViewerGetNewStreamInfosReply.Status.OK
+
+        if len(newly_announced_stream_infos) == 0:
+            # If all streams have been transmitted and no new traces are
+            # scheduled for creation, hang up to signal that the tracing
+            # session is "done".
+            status = (
+                _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
+                if self._all_streams_finished()
+                else _LttngLiveViewerGetNewStreamInfosReply.Status.NO_NEW
+            )
+
+        return _LttngLiveViewerGetNewStreamInfosReply(
+            status, newly_announced_stream_infos
+        )
 
     def _handle_get_tracing_session_infos_command(
         self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
@@ -1840,7 +1981,8 @@ def _session_descriptors_from_path(
     #             "client-count": 23,
     #             "traces": [
     #                 {
-    #                     "path": "lol"
+    #                     "path": "lol",
+    #                     creation-timestamp: 12948
     #                 },
     #                 {
     #                     "path": "meow/mix",
@@ -1884,16 +2026,17 @@ def _session_descriptors_from_path(
                 else None
             )
             path = trace_json.at("path", tjson.StrVal).val
+            creation_timestamp = (
+                trace_json.at("creation-timestamp", tjson.IntVal).val
+                if "creation-timestamp" in trace_json
+                else 0
+            )  # type: int
 
             if not os.path.isabs(path) and trace_path_prefix:
                 path = os.path.join(trace_path_prefix, path)
 
             traces.append(
-                LttngTrace(
-                    path,
-                    metadata_sections,
-                    beacons,
-                )
+                LttngTrace(path, metadata_sections, beacons, creation_timestamp)
             )
 
         sessions.append(
This page took 0.032164 seconds and 4 git commands to generate.