Remove `skip-string-normalization` in Python formatter config
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
index 91e1cc8a6f446b4df30e78520a7cfebffb26c45c..d0eb26d25c5a343b37a50574d223fab0a9a7c369 100644 (file)
@@ -1,24 +1,7 @@
-# The MIT License (MIT)
+# SPDX-License-Identifier: MIT
 #
-# Copyright (c) 2019 Philippe Proulx <pproulx@efficios.com>
+# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
 #
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
 
 import argparse
 import collections.abc
@@ -30,6 +13,8 @@ import socket
 import struct
 import sys
 import tempfile
+import json
+from collections import namedtuple
 
 
 class UnexpectedInput(RuntimeError):
@@ -409,14 +394,14 @@ class _LttngLiveViewerDetachFromTracingSessionReply:
 # An LTTng live protocol codec can convert bytes to command objects and
 # reply objects to bytes.
 class _LttngLiveViewerProtocolCodec:
-    _COMMAND_HEADER_STRUCT_FMT = 'QII'
+    _COMMAND_HEADER_STRUCT_FMT = "QII"
     _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
 
     def __init__(self):
         pass
 
     def _unpack(self, fmt, data, offset=0):
-        fmt = '!' + fmt
+        fmt = "!" + fmt
         return struct.unpack_from(fmt, data, offset)
 
     def _unpack_payload(self, fmt, data):
@@ -433,7 +418,7 @@ class _LttngLiveViewerProtocolCodec:
             self._COMMAND_HEADER_STRUCT_FMT, data
         )
         logging.info(
-            'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
+            "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
                 payload_size, cmd_type, version
             )
         )
@@ -444,7 +429,7 @@ class _LttngLiveViewerProtocolCodec:
 
         if cmd_type == 1:
             viewer_session_id, major, minor, conn_type = self._unpack_payload(
-                'QIII', data
+                "QIII", data
             )
             return _LttngLiveViewerConnectCommand(
                 version, viewer_session_id, major, minor
@@ -452,46 +437,46 @@ class _LttngLiveViewerProtocolCodec:
         elif cmd_type == 2:
             return _LttngLiveViewerGetTracingSessionInfosCommand(version)
         elif cmd_type == 3:
-            tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
+            tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
             return _LttngLiveViewerAttachToTracingSessionCommand(
                 version, tracing_session_id, offset, seek_type
             )
         elif cmd_type == 4:
-            (stream_id,) = self._unpack_payload('Q', data)
+            (stream_id,) = self._unpack_payload("Q", data)
             return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                 version, stream_id
             )
         elif cmd_type == 5:
-            stream_id, offset, req_length = self._unpack_payload('QQI', data)
+            stream_id, offset, req_length = self._unpack_payload("QQI", data)
             return _LttngLiveViewerGetDataStreamPacketDataCommand(
                 version, stream_id, offset, req_length
             )
         elif cmd_type == 6:
-            (stream_id,) = self._unpack_payload('Q', data)
+            (stream_id,) = self._unpack_payload("Q", data)
             return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
         elif cmd_type == 7:
-            (tracing_session_id,) = self._unpack_payload('Q', data)
+            (tracing_session_id,) = self._unpack_payload("Q", data)
             return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
         elif cmd_type == 8:
             return _LttngLiveViewerCreateViewerSessionCommand(version)
         elif cmd_type == 9:
-            (tracing_session_id,) = self._unpack_payload('Q', data)
+            (tracing_session_id,) = self._unpack_payload("Q", data)
             return _LttngLiveViewerDetachFromTracingSessionCommand(
                 version, tracing_session_id
             )
         else:
-            raise UnexpectedInput('Unknown command type {}'.format(cmd_type))
+            raise UnexpectedInput("Unknown command type {}".format(cmd_type))
 
     def _pack(self, fmt, *args):
         # Force network byte order
-        return struct.pack('!' + fmt, *args)
+        return struct.pack("!" + fmt, *args)
 
     def _encode_zero_padded_str(self, string, length):
         data = string.encode()
-        return data.ljust(length, b'\x00')
+        return data.ljust(length, b"\x00")
 
     def _encode_stream_info(self, info):
-        data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
+        data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
         data += self._encode_zero_padded_str(info.path, 4096)
         data += self._encode_zero_padded_str(info.channel_name, 255)
         return data
@@ -510,14 +495,14 @@ class _LttngLiveViewerProtocolCodec:
     def encode(self, reply):
         if type(reply) is _LttngLiveViewerConnectReply:
             data = self._pack(
-                'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
+                "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
             )
         elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
-            data = self._pack('I', len(reply.tracing_session_infos))
+            data = self._pack("I", len(reply.tracing_session_infos))
 
             for info in reply.tracing_session_infos:
                 data += self._pack(
-                    'QIII',
+                    "QIII",
                     info.tracing_session_id,
                     info.live_timer_freq,
                     info.client_count,
@@ -526,49 +511,65 @@ class _LttngLiveViewerProtocolCodec:
                 data += self._encode_zero_padded_str(info.hostname, 64)
                 data += self._encode_zero_padded_str(info.name, 255)
         elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
-            data = self._pack('II', reply.status, len(reply.stream_infos))
+            data = self._pack("II", reply.status, len(reply.stream_infos))
 
             for info in reply.stream_infos:
                 data += self._encode_stream_info(info)
         elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
+            index_format = "QQQQQQQII"
             entry = reply.index_entry
             flags = self._get_has_new_stuff_flags(
                 reply.has_new_metadata, reply.has_new_data_stream
             )
 
-            data = self._pack(
-                'QQQQQQQII',
-                entry.offset_bytes,
-                entry.total_size_bits,
-                entry.content_size_bits,
-                entry.timestamp_begin,
-                entry.timestamp_end,
-                entry.events_discarded,
-                entry.stream_class_id,
-                reply.status,
-                flags,
-            )
+            if type(entry) is _LttngDataStreamIndexEntry:
+                data = self._pack(
+                    index_format,
+                    entry.offset_bytes,
+                    entry.total_size_bits,
+                    entry.content_size_bits,
+                    entry.timestamp_begin,
+                    entry.timestamp_end,
+                    entry.events_discarded,
+                    entry.stream_class_id,
+                    reply.status,
+                    flags,
+                )
+            else:
+                assert type(entry) is _LttngDataStreamBeaconEntry
+                data = self._pack(
+                    index_format,
+                    0,
+                    0,
+                    0,
+                    0,
+                    entry.timestamp,
+                    0,
+                    entry.stream_class_id,
+                    reply.status,
+                    flags,
+                )
         elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
             flags = self._get_has_new_stuff_flags(
                 reply.has_new_metadata, reply.has_new_data_stream
             )
-            data = self._pack('III', reply.status, len(reply.data), flags)
+            data = self._pack("III", reply.status, len(reply.data), flags)
             data += reply.data
         elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
-            data = self._pack('QI', len(reply.data), reply.status)
+            data = self._pack("QI", len(reply.data), reply.status)
             data += reply.data
         elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
-            data = self._pack('II', reply.status, len(reply.stream_infos))
+            data = self._pack("II", reply.status, len(reply.stream_infos))
 
             for info in reply.stream_infos:
                 data += self._encode_stream_info(info)
         elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
-            data = self._pack('I', reply.status)
+            data = self._pack("I", reply.status)
         elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
-            data = self._pack('I', reply.status)
+            data = self._pack("I", reply.status)
         else:
             raise ValueError(
-                'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
+                "Unknown reply object with class `{}`".format(reply.__class__.__name__)
             )
 
         return data
@@ -631,11 +632,44 @@ class _LttngDataStreamIndexEntry:
         return self._stream_class_id
 
 
+# An entry within the index of an LTTng data stream. While a stream beacon entry
+# is conceptually unrelated to an index, it is sent as a reply to a
+# LttngLiveViewerGetNextDataStreamIndexEntryCommand
+class _LttngDataStreamBeaconEntry:
+    def __init__(self, stream_class_id, timestamp):
+        self._stream_class_id = stream_class_id
+        self._timestamp = timestamp
+
+    @property
+    def timestamp(self):
+        return self._timestamp
+
+    @property
+    def stream_class_id(self):
+        return self._stream_class_id
+
+
+def _get_entry_timestamp_begin(entry):
+    if type(entry) is _LttngDataStreamBeaconEntry:
+        return entry.timestamp
+    else:
+        assert type(entry) is _LttngDataStreamIndexEntry
+        return entry.timestamp_begin
+
+
 # The index of an LTTng data stream, a sequence of index entries.
 class _LttngDataStreamIndex(collections.abc.Sequence):
-    def __init__(self, path):
+    def __init__(self, path, beacons):
         self._path = path
         self._build()
+
+        if beacons:
+            stream_class_id = self._entries[0].stream_class_id
+            beacons = [
+                _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
+            ]
+            self._add_beacons(beacons)
+
         logging.info(
             'Built data stream index entries: path="{}", count={}'.format(
                 path, len(self._entries)
@@ -646,9 +680,9 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
         self._entries = []
         assert os.path.isfile(self._path)
 
-        with open(self._path, 'rb') as f:
+        with open(self._path, "rb") as f:
             # Read header first
-            fmt = '>IIII'
+            fmt = ">IIII"
             size = struct.calcsize(fmt)
             data = f.read(size)
             assert len(data) == size
@@ -658,7 +692,7 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
             assert magic == 0xC1F1DCC1
 
             # Read index entries
-            fmt = '>QQQQQQQ'
+            fmt = ">QQQQQQQ"
             size = struct.calcsize(fmt)
 
             while True:
@@ -699,6 +733,17 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
                 # Skip anything else before the next entry
                 f.seek(index_entry_length - size, os.SEEK_CUR)
 
+    def _add_beacons(self, beacons):
+        # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
+        def sort_key(entry):
+            if type(entry) is _LttngDataStreamBeaconEntry:
+                return entry.timestamp
+            else:
+                return entry.timestamp_end
+
+        self._entries += beacons
+        self._entries.sort(key=sort_key)
+
     def __getitem__(self, index):
         return self._entries[index]
 
@@ -712,16 +757,16 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
 
 # An LTTng data stream.
 class _LttngDataStream:
-    def __init__(self, path):
+    def __init__(self, path, beacons):
         self._path = path
         filename = os.path.basename(path)
-        match = re.match(r'(.*)_\d+', filename)
+        match = re.match(r"(.*)_\d+", filename)
         self._channel_name = match.group(1)
         trace_dir = os.path.dirname(path)
-        index_path = os.path.join(trace_dir, 'index', filename + '.idx')
-        self._index = _LttngDataStreamIndex(index_path)
+        index_path = os.path.join(trace_dir, "index", filename + ".idx")
+        self._index = _LttngDataStreamIndex(index_path, beacons)
         assert os.path.isfile(path)
-        self._file = open(path, 'rb')
+        self._file = open(path, "rb")
         logging.info(
             'Built data stream: path="{}", channel-name="{}"'.format(
                 path, self._channel_name
@@ -745,36 +790,166 @@ class _LttngDataStream:
         return self._file.read(len_bytes)
 
 
+class _LttngMetadataStreamSection:
+    def __init__(self, timestamp, data):
+        self._timestamp = timestamp
+        if data is None:
+            self._data = bytes()
+        else:
+            self._data = data
+        logging.info(
+            "Built metadata stream section: ts={}, data-len={}".format(
+                self._timestamp, len(self._data)
+            )
+        )
+
+    @property
+    def timestamp(self):
+        return self._timestamp
+
+    @property
+    def data(self):
+        return self._data
+
+
 # An LTTng metadata stream.
 class _LttngMetadataStream:
-    def __init__(self, path):
-        self._path = path
-        logging.info('Built metadata stream: path="{}"'.format(path))
+    def __init__(self, metadata_file_path, config_sections):
+        self._path = metadata_file_path
+        self._sections = config_sections
+        logging.info(
+            "Built metadata stream: path={}, section-len={}".format(
+                self._path, len(self._sections)
+            )
+        )
 
     @property
     def path(self):
         return self._path
 
     @property
-    def data(self):
-        assert os.path.isfile(self._path)
+    def sections(self):
+        return self._sections
+
+
+LttngMetadataConfigSection = namedtuple(
+    "LttngMetadataConfigSection", ["line", "timestamp", "is_empty"]
+)
+
+
+def _parse_metadata_sections_config(config_sections):
+    assert config_sections is not None
+    config_metadata_sections = []
+    append_empty_section = False
+    last_timestamp = 0
+    last_line = 0
+
+    for config_section in config_sections:
+        if config_section == "empty":
+            # Found a empty section marker. Actually append the section at the
+            # timestamp of the next concrete section.
+            append_empty_section = True
+        else:
+            assert type(config_section) is dict
+            line = config_section.get("line")
+            ts = config_section.get("timestamp")
+
+            # Sections' timestamps and lines must both be increasing.
+            assert ts > last_timestamp
+            last_timestamp = ts
+            assert line > last_line
+            last_line = line
+
+            if append_empty_section:
+                config_metadata_sections.append(
+                    LttngMetadataConfigSection(line, ts, True)
+                )
+                append_empty_section = False
+
+            config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
+
+    return config_metadata_sections
+
+
+def _split_metadata_sections(metadata_file_path, raw_config_sections):
+    assert isinstance(raw_config_sections, collections.abc.Sequence)
+
+    parsed_sections = _parse_metadata_sections_config(raw_config_sections)
+
+    sections = []
+    with open(metadata_file_path, "r") as metadata_file:
+        metadata_lines = [line for line in metadata_file]
+
+    config_metadata_sections_idx = 0
+    curr_metadata_section = bytearray()
+
+    for idx, line_content in enumerate(metadata_lines):
+        # Add one to the index to convert from the zero-indexing of the
+        # enumerate() function to the one-indexing used by humans when
+        # viewing a text file.
+        curr_line_number = idx + 1
+
+        # If there are no more sections, simply append the line.
+        if config_metadata_sections_idx + 1 >= len(parsed_sections):
+            curr_metadata_section += bytearray(line_content, "utf8")
+            continue
+
+        next_section_line_number = parsed_sections[
+            config_metadata_sections_idx + 1
+        ].line
+
+        # If the next section begins at the current line, create a
+        # section with the metadata we gathered so far.
+        if curr_line_number >= next_section_line_number:
+
+            # Flushing the metadata of the current section.
+            sections.append(
+                _LttngMetadataStreamSection(
+                    parsed_sections[config_metadata_sections_idx].timestamp,
+                    bytes(curr_metadata_section),
+                )
+            )
+
+            # Move to the next section.
+            config_metadata_sections_idx += 1
 
-        with open(self._path, 'rb') as f:
-            return f.read()
+            # Clear old content and append current line for the next section.
+            curr_metadata_section.clear()
+            curr_metadata_section += bytearray(line_content, "utf8")
+
+            # Append any empty sections.
+            while parsed_sections[config_metadata_sections_idx].is_empty:
+                sections.append(
+                    _LttngMetadataStreamSection(
+                        parsed_sections[config_metadata_sections_idx].timestamp, None
+                    )
+                )
+                config_metadata_sections_idx += 1
+        else:
+            # Append line_content to the current metadata section.
+            curr_metadata_section += bytearray(line_content, "utf8")
+
+    # We iterated over all the lines of the metadata file. Close the current section.
+    sections.append(
+        _LttngMetadataStreamSection(
+            parsed_sections[config_metadata_sections_idx].timestamp,
+            bytes(curr_metadata_section),
+        )
+    )
+
+    return sections
 
 
 # An LTTng trace, a sequence of LTTng data streams.
 class LttngTrace(collections.abc.Sequence):
-    def __init__(self, trace_dir):
+    def __init__(self, trace_dir, metadata_sections, beacons):
         assert os.path.isdir(trace_dir)
         self._path = trace_dir
-        self._metadata_stream = _LttngMetadataStream(
-            os.path.join(trace_dir, 'metadata')
-        )
-        self._create_data_streams(trace_dir)
+        self._create_metadata_stream(trace_dir, metadata_sections)
+        self._create_data_streams(trace_dir, beacons)
         logging.info('Built trace: path="{}"'.format(trace_dir))
 
-    def _create_data_streams(self, trace_dir):
+    def _create_data_streams(self, trace_dir, beacons):
         data_stream_paths = []
 
         for filename in os.listdir(trace_dir):
@@ -783,10 +958,10 @@ class LttngTrace(collections.abc.Sequence):
             if not os.path.isfile(path):
                 continue
 
-            if filename.startswith('.'):
+            if filename.startswith("."):
                 continue
 
-            if filename == 'metadata':
+            if filename == "metadata":
                 continue
 
             data_stream_paths.append(path)
@@ -795,7 +970,31 @@ class LttngTrace(collections.abc.Sequence):
         self._data_streams = []
 
         for data_stream_path in data_stream_paths:
-            self._data_streams.append(_LttngDataStream(data_stream_path))
+            stream_name = os.path.basename(data_stream_path)
+            this_stream_beacons = None
+
+            if beacons is not None and stream_name in beacons:
+                this_stream_beacons = beacons[stream_name]
+
+            self._data_streams.append(
+                _LttngDataStream(data_stream_path, this_stream_beacons)
+            )
+
+    def _create_metadata_stream(self, trace_dir, config_metadata_sections):
+        metadata_path = os.path.join(trace_dir, "metadata")
+        metadata_sections = []
+
+        if config_metadata_sections is None:
+            with open(metadata_path, "rb") as metadata_file:
+                metadata_sections.append(
+                    _LttngMetadataStreamSection(0, metadata_file.read())
+                )
+        else:
+            metadata_sections = _split_metadata_sections(
+                metadata_path, config_metadata_sections
+            )
+
+        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
 
     @property
     def path(self):
@@ -814,10 +1013,11 @@ class LttngTrace(collections.abc.Sequence):
 
 # The state of a single data stream.
 class _LttngLiveViewerSessionDataStreamState:
-    def __init__(self, ts_state, info, data_stream):
+    def __init__(self, ts_state, info, data_stream, metadata_stream_id):
         self._ts_state = ts_state
         self._info = info
         self._data_stream = data_stream
+        self._metadata_stream_id = metadata_stream_id
         self._cur_index_entry_index = 0
         fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
         logging.info(
@@ -858,6 +1058,14 @@ class _LttngLiveViewerSessionMetadataStreamState:
         self._ts_state = ts_state
         self._info = info
         self._metadata_stream = metadata_stream
+        self._cur_metadata_stream_section_index = 0
+        if len(metadata_stream.sections) > 1:
+            self._next_metadata_stream_section_timestamp = metadata_stream.sections[
+                1
+            ].timestamp
+        else:
+            self._next_metadata_stream_section_timestamp = None
+
         self._is_sent = False
         fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
         logging.info(
@@ -889,6 +1097,28 @@ class _LttngLiveViewerSessionMetadataStreamState:
     def is_sent(self, value):
         self._is_sent = value
 
+    @property
+    def cur_section(self):
+        fmt = "Get current metadata section: section-idx={}"
+        logging.info(fmt.format(self._cur_metadata_stream_section_index))
+        if self._cur_metadata_stream_section_index == len(
+            self._metadata_stream.sections
+        ):
+            return
+
+        return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
+
+    def goto_next_section(self):
+        self._cur_metadata_stream_section_index += 1
+        if self.cur_section:
+            self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
+        else:
+            self._next_metadata_stream_section_timestamp = None
+
+    @property
+    def next_section_timestamp(self):
+        return self._next_metadata_stream_section_timestamp
+
 
 # The state of a tracing session.
 class _LttngLiveViewerSessionTracingSessionState:
@@ -902,6 +1132,17 @@ class _LttngLiveViewerSessionTracingSessionState:
         for trace in tc_descr.traces:
             trace_id = stream_id * 1000
 
+            # Metadata stream -> stream info and metadata stream state
+            info = _LttngLiveViewerStreamInfo(
+                stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
+            )
+            self._stream_infos.append(info)
+            self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
+                self, info, trace.metadata_stream
+            )
+            metadata_stream_id = stream_id
+            stream_id += 1
+
             # Data streams -> stream infos and data stream states
             for data_stream in trace:
                 info = _LttngLiveViewerStreamInfo(
@@ -913,20 +1154,10 @@ class _LttngLiveViewerSessionTracingSessionState:
                 )
                 self._stream_infos.append(info)
                 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
-                    self, info, data_stream
+                    self, info, data_stream, metadata_stream_id
                 )
                 stream_id += 1
 
-            # Metadata stream -> stream info and metadata stream state
-            info = _LttngLiveViewerStreamInfo(
-                stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
-            )
-            self._stream_infos.append(info)
-            self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
-                self, info, trace.metadata_stream
-            )
-            stream_id += 1
-
         self._is_attached = False
         fmt = 'Built tracing session state: id={}, name="{}"'
         logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
@@ -960,6 +1191,16 @@ class _LttngLiveViewerSessionTracingSessionState:
         self._is_attached = value
 
 
+def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
+    if metadata_stream_state.next_section_timestamp is None:
+        return False
+
+    if latest_timestamp >= metadata_stream_state.next_section_timestamp:
+        return True
+    else:
+        return False
+
+
 # An LTTng live viewer session manages a view on tracing sessions
 # and replies to commands accordingly.
 class _LttngLiveViewerSession:
@@ -1005,20 +1246,20 @@ class _LttngLiveViewerSession:
     def _get_tracing_session_state(self, tracing_session_id):
         if tracing_session_id not in self._ts_states:
             raise UnexpectedInput(
-                'Unknown tracing session ID {}'.format(tracing_session_id)
+                "Unknown tracing session ID {}".format(tracing_session_id)
             )
 
         return self._ts_states[tracing_session_id]
 
     def _get_stream_state(self, stream_id):
         if stream_id not in self._stream_states:
-            UnexpectedInput('Unknown stream ID {}'.format(stream_id))
+            UnexpectedInput("Unknown stream ID {}".format(stream_id))
 
         return self._stream_states[stream_id]
 
     def handle_command(self, cmd):
         logging.info(
-            'Handling command in viewer session: cmd-cls-name={}'.format(
+            "Handling command in viewer session: cmd-cls-name={}".format(
                 cmd.__class__.__name__
             )
         )
@@ -1026,7 +1267,7 @@ class _LttngLiveViewerSession:
 
         if cmd_type not in self._command_handlers:
             raise UnexpectedInput(
-                'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
+                "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
             )
 
         return self._command_handlers[cmd_type](cmd)
@@ -1039,7 +1280,7 @@ class _LttngLiveViewerSession:
 
         if ts_state.is_attached:
             raise UnexpectedInput(
-                'Cannot attach to tracing session `{}`: viewer is already attached'.format(
+                "Cannot attach to tracing session `{}`: viewer is already attached".format(
                     info.name
                 )
             )
@@ -1058,7 +1299,7 @@ class _LttngLiveViewerSession:
 
         if not ts_state.is_attached:
             raise UnexpectedInput(
-                'Cannot detach to tracing session `{}`: viewer is not attached'.format(
+                "Cannot detach to tracing session `{}`: viewer is not attached".format(
                     info.name
                 )
             )
@@ -1071,10 +1312,11 @@ class _LttngLiveViewerSession:
         fmt = 'Handling "get next data stream index entry" command: stream-id={}'
         logging.info(fmt.format(cmd.stream_id))
         stream_state = self._get_stream_state(cmd.stream_id)
+        metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
 
         if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
             raise UnexpectedInput(
-                'Stream with ID {} is not a data stream'.format(cmd.stream_id)
+                "Stream with ID {} is not a data stream".format(cmd.stream_id)
             )
 
         if stream_state.cur_index_entry is None:
@@ -1089,10 +1331,21 @@ class _LttngLiveViewerSession:
                 status, index_entry, False, False
             )
 
+        timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
+
+        if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
+            metadata_stream_state.is_sent = False
+            metadata_stream_state.goto_next_section()
+
         # The viewer only checks the `has_new_metadata` flag if the
         # reply's status is `OK`, so we need to provide an index here
         has_new_metadata = stream_state.tracing_session_state.has_new_metadata
-        status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
+        if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
+            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
+        else:
+            assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
+            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
+
         reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
             status, stream_state.cur_index_entry, has_new_metadata, False
         )
@@ -1107,7 +1360,7 @@ class _LttngLiveViewerSession:
 
         if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
             raise UnexpectedInput(
-                'Stream with ID {} is not a data stream'.format(cmd.stream_id)
+                "Stream with ID {} is not a data stream".format(cmd.stream_id)
             )
 
         if stream_state.tracing_session_state.has_new_metadata:
@@ -1133,21 +1386,33 @@ class _LttngLiveViewerSession:
     def _handle_get_metadata_stream_data_command(self, cmd):
         fmt = 'Handling "get metadata stream data" command: stream-id={}'
         logging.info(fmt.format(cmd.stream_id))
-        stream_state = self._get_stream_state(cmd.stream_id)
+        metadata_stream_state = self._get_stream_state(cmd.stream_id)
 
-        if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
+        if (
+            type(metadata_stream_state)
+            is not _LttngLiveViewerSessionMetadataStreamState
+        ):
             raise UnexpectedInput(
-                'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
+                "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
             )
 
-        if stream_state.is_sent:
+        if metadata_stream_state.is_sent:
             status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
             return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
 
-        stream_state.is_sent = True
+        metadata_stream_state.is_sent = True
         status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
+        metadata_section = metadata_stream_state.cur_section
+
+        # If we are sending an empty section, ready the next one right away.
+        if len(metadata_section.data) == 0:
+            metadata_stream_state.is_sent = False
+            metadata_stream_state.goto_next_section()
+
+        fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
+        logging.info(fmt.format(len(metadata_section.data)))
         return _LttngLiveViewerGetMetadataStreamDataContentReply(
-            status, stream_state.metadata_stream.data
+            status, metadata_section.data
         )
 
     def _handle_get_new_stream_infos_command(self, cmd):
@@ -1196,13 +1461,13 @@ class LttngLiveServer:
     def __init__(
         self, port_filename, tracing_session_descriptors, max_query_data_response_size
     ):
-        logging.info('Server configuration:')
+        logging.info("Server configuration:")
 
-        logging.info('  Port file name: `{}`'.format(port_filename))
+        logging.info("  Port file name: `{}`".format(port_filename))
 
         if max_query_data_response_size is not None:
             logging.info(
-                '  Maximum response data query size: `{}`'.format(
+                "  Maximum response data query size: `{}`".format(
                     max_query_data_response_size
                 )
             )
@@ -1230,7 +1495,7 @@ class LttngLiveServer:
         self._codec = _LttngLiveViewerProtocolCodec()
 
         # Port 0: OS assigns an unused port
-        serv_addr = ('localhost', 0)
+        serv_addr = ("localhost", 0)
         self._sock.bind(serv_addr)
         self._write_port_to_file(port_filename)
 
@@ -1238,7 +1503,7 @@ class LttngLiveServer:
             self._listen()
         finally:
             self._sock.close()
-            logging.info('Closed connection and socket.')
+            logging.info("Closed connection and socket.")
 
     @property
     def _server_port(self):
@@ -1248,33 +1513,33 @@ class LttngLiveServer:
         data = bytes()
 
         while True:
-            logging.info('Waiting for viewer command.')
+            logging.info("Waiting for viewer command.")
             buf = self._conn.recv(128)
 
             if not buf:
-                logging.info('Client closed connection.')
+                logging.info("Client closed connection.")
 
                 if data:
                     raise UnexpectedInput(
-                        'Client closed connection after having sent {} command bytes.'.format(
+                        "Client closed connection after having sent {} command bytes.".format(
                             len(data)
                         )
                     )
 
                 return
 
-            logging.info('Received data from viewer: length={}'.format(len(buf)))
+            logging.info("Received data from viewer: length={}".format(len(buf)))
 
             data += buf
 
             try:
                 cmd = self._codec.decode(data)
             except struct.error as exc:
-                raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc
+                raise UnexpectedInput("Malformed command: {}".format(exc)) from exc
 
             if cmd is not None:
                 logging.info(
-                    'Received command from viewer: cmd-cls-name={}'.format(
+                    "Received command from viewer: cmd-cls-name={}".format(
                         cmd.__class__.__name__
                     )
                 )
@@ -1283,7 +1548,7 @@ class LttngLiveServer:
     def _send_reply(self, reply):
         data = self._codec.encode(reply)
         logging.info(
-            'Sending reply to viewer: reply-cls-name={}, length={}'.format(
+            "Sending reply to viewer: reply-cls-name={}, length={}".format(
                 reply.__class__.__name__, len(data)
             )
         )
@@ -1302,7 +1567,7 @@ class LttngLiveServer:
 
         # Create viewer session (arbitrary ID 23)
         logging.info(
-            'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
+            "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
         )
         viewer_session = _LttngLiveViewerSession(
             23, self._ts_descriptors, self._max_query_data_response_size
@@ -1325,13 +1590,13 @@ class LttngLiveServer:
             self._send_reply(viewer_session.handle_command(cmd))
 
     def _listen(self):
-        logging.info('Listening: port={}'.format(self._server_port))
+        logging.info("Listening: port={}".format(self._server_port))
         # Backlog must be present for Python version < 3.5.
         # 128 is an arbitrary number since we expect only 1 connection anyway.
         self._sock.listen(128)
         self._conn, viewer_addr = self._sock.accept()
         logging.info(
-            'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
+            "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
         )
 
         try:
@@ -1341,8 +1606,8 @@ class LttngLiveServer:
 
     def _write_port_to_file(self, port_filename):
         # Write the port number to a temporary file.
-        with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_port_file:
-            print(self._server_port, end='', file=tmp_port_file)
+        with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
+            print(self._server_port, end="", file=tmp_port_file)
 
         # Rename temporary file to real file
         os.replace(tmp_port_file.name, port_filename)
@@ -1363,7 +1628,7 @@ class LttngTracingSessionDescriptor:
     ):
         for trace in traces:
             if name not in trace.path:
-                fmt = 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
+                fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
                 raise ValueError(fmt.format(name, trace.path))
 
         self._traces = traces
@@ -1386,74 +1651,130 @@ class LttngTracingSessionDescriptor:
         return self._info
 
 
-def _tracing_session_descriptors_from_arg(string):
-    # Format is:
-    #     NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
-    parts = string.split(',')
-    name = parts[0]
-    tracing_session_id = int(parts[1])
-    hostname = parts[2]
-    live_timer_freq = int(parts[3])
-    client_count = int(parts[4])
-    traces = [LttngTrace(path) for path in parts[5:]]
-    return LttngTracingSessionDescriptor(
-        name, tracing_session_id, hostname, live_timer_freq, client_count, traces
-    )
+def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
+    # File format is:
+    #
+    #     [
+    #         {
+    #             "name": "my-session",
+    #             "id": 17,
+    #             "hostname": "myhost",
+    #             "live-timer-freq": 1000000,
+    #             "client-count": 23,
+    #             "traces": [
+    #                 {
+    #                     "path": "lol"
+    #                 },
+    #                 {
+    #                     "path": "meow/mix",
+    #                     "beacons": {
+    #                         "my_stream": [ 5235787, 728375283 ]
+    #                     },
+    #                     "metadata-sections": [
+    #                           {
+    #                                "line": 1,
+    #                                "timestamp": 0
+    #                           }
+    #                      ]
+    #                 }
+    #             ]
+    #         }
+    #     ]
+    with open(sessions_filename, "r") as sessions_file:
+        params = json.load(sessions_file)
+
+    sessions = []
+
+    for session in params:
+        name = session["name"]
+        tracing_session_id = session["id"]
+        hostname = session["hostname"]
+        live_timer_freq = session["live-timer-freq"]
+        client_count = session["client-count"]
+        traces = []
+
+        for trace in session["traces"]:
+            metadata_sections = trace.get("metadata-sections")
+            beacons = trace.get("beacons")
+            path = trace["path"]
+
+            if not os.path.isabs(path):
+                path = os.path.join(trace_path_prefix, path)
+
+            traces.append(LttngTrace(path, metadata_sections, beacons))
+
+        sessions.append(
+            LttngTracingSessionDescriptor(
+                name,
+                tracing_session_id,
+                hostname,
+                live_timer_freq,
+                client_count,
+                traces,
+            )
+        )
+
+    return sessions
 
 
 def _loglevel_parser(string):
-    loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
+    loglevels = {"info": logging.INFO, "warning": logging.WARNING}
     if string not in loglevels:
         msg = "{} is not a valid loglevel".format(string)
         raise argparse.ArgumentTypeError(msg)
     return loglevels[string]
 
 
-if __name__ == '__main__':
-    logging.basicConfig(format='# %(asctime)-25s%(message)s')
+if __name__ == "__main__":
+    logging.basicConfig(format="# %(asctime)-25s%(message)s")
     parser = argparse.ArgumentParser(
-        description='LTTng-live protocol mocker', add_help=False
+        description="LTTng-live protocol mocker", add_help=False
     )
     parser.add_argument(
-        '--log-level',
-        default='warning',
-        choices=['info', 'warning'],
-        help='The loglevel to be used.',
+        "--log-level",
+        default="warning",
+        choices=["info", "warning"],
+        help="The loglevel to be used.",
     )
 
     loglevel_namespace, remaining_args = parser.parse_known_args()
     logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
 
     parser.add_argument(
-        '--port-filename',
-        help='The final port file. This file is present when the server is ready to receive connection.',
+        "--port-filename",
+        help="The final port file. This file is present when the server is ready to receive connection.",
         required=True,
     )
     parser.add_argument(
-        '--max-query-data-response-size',
+        "--max-query-data-response-size",
         type=int,
-        help='The maximum size of control data response in bytes',
+        help="The maximum size of control data response in bytes",
+    )
+    parser.add_argument(
+        "--trace-path-prefix",
+        type=str,
+        help="Prefix to prepend to the trace paths of session configurations",
     )
     parser.add_argument(
-        'sessions',
-        nargs="+",
-        metavar="SESSION",
-        type=_tracing_session_descriptors_from_arg,
-        help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
+        "--sessions-filename",
+        type=str,
+        help="Path to a session configuration file",
     )
     parser.add_argument(
-        '-h',
-        '--help',
-        action='help',
+        "-h",
+        "--help",
+        action="help",
         default=argparse.SUPPRESS,
-        help='Show this help message and exit.',
+        help="Show this help message and exit.",
     )
 
     args = parser.parse_args(args=remaining_args)
     try:
-        LttngLiveServer(
-            args.port_filename, args.sessions, args.max_query_data_response_size
+        sessions = _session_descriptors_from_path(
+            args.sessions_filename,
+            args.trace_path_prefix,
         )
+        LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size)
     except UnexpectedInput as exc:
         logging.error(str(exc))
         print(exc, file=sys.stderr)
This page took 0.037399 seconds and 4 git commands to generate.