tests: change lttng_live_server.py's --sessions-filename to be a positional argument
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
index 0ffb909968a7f8a3a9f497c29be0fdb90880d599..032b0349848d9c8914d457b5fdb539ca8a4ec6cd 100644 (file)
 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
 #
 
-import argparse
-import collections.abc
-import logging
+# pyright: strict, reportTypeCommentUsage=false,  reportMissingTypeStubs=false
+
 import os
-import os.path
 import re
 import socket
 import struct
-import sys
+import logging
+import os.path
+import argparse
 import tempfile
-import json
-from collections import namedtuple
+from typing import Dict, Union, Iterable, Optional, Sequence, overload
 
+import tjson
 
-class UnexpectedInput(RuntimeError):
-    pass
+# isort: off
+from typing import Any, Callable  # noqa: F401
+
+# isort: on
+
+
+# An entry within the index of an LTTng data stream.
+class _LttngDataStreamIndexEntry:
+    def __init__(
+        self,
+        offset_bytes: int,
+        total_size_bits: int,
+        content_size_bits: int,
+        timestamp_begin: int,
+        timestamp_end: int,
+        events_discarded: int,
+        stream_class_id: int,
+    ):
+        self._offset_bytes = offset_bytes
+        self._total_size_bits = total_size_bits
+        self._content_size_bits = content_size_bits
+        self._timestamp_begin = timestamp_begin
+        self._timestamp_end = timestamp_end
+        self._events_discarded = events_discarded
+        self._stream_class_id = stream_class_id
+
+    @property
+    def offset_bytes(self):
+        return self._offset_bytes
+
+    @property
+    def total_size_bits(self):
+        return self._total_size_bits
+
+    @property
+    def total_size_bytes(self):
+        return self._total_size_bits // 8
+
+    @property
+    def content_size_bits(self):
+        return self._content_size_bits
+
+    @property
+    def content_size_bytes(self):
+        return self._content_size_bits // 8
+
+    @property
+    def timestamp_begin(self):
+        return self._timestamp_begin
+
+    @property
+    def timestamp_end(self):
+        return self._timestamp_end
+
+    @property
+    def events_discarded(self):
+        return self._events_discarded
+
+    @property
+    def stream_class_id(self):
+        return self._stream_class_id
+
+
+# An entry within the index of an LTTng data stream. While a stream beacon entry
+# is conceptually unrelated to an index, it is sent as a reply to a
+# LttngLiveViewerGetNextDataStreamIndexEntryCommand
+class _LttngDataStreamBeaconIndexEntry:
+    def __init__(self, stream_class_id: int, timestamp: int):
+        self._stream_class_id = stream_class_id
+        self._timestamp = timestamp
+
+    @property
+    def timestamp(self):
+        return self._timestamp
+
+    @property
+    def stream_class_id(self):
+        return self._stream_class_id
+
+
+_LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
 
 
 class _LttngLiveViewerCommand:
-    def __init__(self, version):
+    def __init__(self, version: int):
         self._version = version
 
     @property
@@ -31,7 +110,7 @@ class _LttngLiveViewerCommand:
 
 
 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
-    def __init__(self, version, viewer_session_id, major, minor):
+    def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
         super().__init__(version)
         self._viewer_session_id = viewer_session_id
         self._major = major
@@ -50,8 +129,12 @@ class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
         return self._minor
 
 
-class _LttngLiveViewerConnectReply:
-    def __init__(self, viewer_session_id, major, minor):
+class _LttngLiveViewerReply:
+    pass
+
+
+class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
+    def __init__(self, viewer_session_id: int, major: int, minor: int):
         self._viewer_session_id = viewer_session_id
         self._major = major
         self._minor = minor
@@ -76,12 +159,12 @@ class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
 class _LttngLiveViewerTracingSessionInfo:
     def __init__(
         self,
-        tracing_session_id,
-        live_timer_freq,
-        client_count,
-        stream_count,
-        hostname,
-        name,
+        tracing_session_id: int,
+        live_timer_freq: int,
+        client_count: int,
+        stream_count: int,
+        hostname: str,
+        name: str,
     ):
         self._tracing_session_id = tracing_session_id
         self._live_timer_freq = live_timer_freq
@@ -115,8 +198,10 @@ class _LttngLiveViewerTracingSessionInfo:
         return self._name
 
 
-class _LttngLiveViewerGetTracingSessionInfosReply:
-    def __init__(self, tracing_session_infos):
+class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
+    def __init__(
+        self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
+    ):
         self._tracing_session_infos = tracing_session_infos
 
     @property
@@ -129,7 +214,9 @@ class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
         BEGINNING = 1
         LAST = 2
 
-    def __init__(self, version, tracing_session_id, offset, seek_type):
+    def __init__(
+        self, version: int, tracing_session_id: int, offset: int, seek_type: int
+    ):
         super().__init__(version)
         self._tracing_session_id = tracing_session_id
         self._offset = offset
@@ -149,7 +236,9 @@ class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
 
 
 class _LttngLiveViewerStreamInfo:
-    def __init__(self, id, trace_id, is_metadata, path, channel_name):
+    def __init__(
+        self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
+    ):
         self._id = id
         self._trace_id = trace_id
         self._is_metadata = is_metadata
@@ -177,7 +266,7 @@ class _LttngLiveViewerStreamInfo:
         return self._channel_name
 
 
-class _LttngLiveViewerAttachToTracingSessionReply:
+class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
     class Status:
         OK = 1
         ALREADY = 2
@@ -186,7 +275,7 @@ class _LttngLiveViewerAttachToTracingSessionReply:
         SEEK_ERROR = 5
         NO_SESSION = 6
 
-    def __init__(self, status, stream_infos):
+    def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
         self._status = status
         self._stream_infos = stream_infos
 
@@ -200,7 +289,7 @@ class _LttngLiveViewerAttachToTracingSessionReply:
 
 
 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
-    def __init__(self, version, stream_id):
+    def __init__(self, version: int, stream_id: int):
         super().__init__(version)
         self._stream_id = stream_id
 
@@ -209,7 +298,7 @@ class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand
         return self._stream_id
 
 
-class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
+class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
     class Status:
         OK = 1
         RETRY = 2
@@ -218,7 +307,13 @@ class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
         INACTIVE = 5
         EOF = 6
 
-    def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
+    def __init__(
+        self,
+        status: int,
+        index_entry: _LttngIndexEntryT,
+        has_new_metadata: bool,
+        has_new_data_stream: bool,
+    ):
         self._status = status
         self._index_entry = index_entry
         self._has_new_metadata = has_new_metadata
@@ -242,7 +337,7 @@ class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
 
 
 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
-    def __init__(self, version, stream_id, offset, req_length):
+    def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
         super().__init__(version)
         self._stream_id = stream_id
         self._offset = offset
@@ -261,14 +356,20 @@ class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
         return self._req_length
 
 
-class _LttngLiveViewerGetDataStreamPacketDataReply:
+class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
     class Status:
         OK = 1
         RETRY = 2
         ERROR = 3
         EOF = 4
 
-    def __init__(self, status, data, has_new_metadata, has_new_data_stream):
+    def __init__(
+        self,
+        status: int,
+        data: bytes,
+        has_new_metadata: bool,
+        has_new_data_stream: bool,
+    ):
         self._status = status
         self._data = data
         self._has_new_metadata = has_new_metadata
@@ -292,7 +393,7 @@ class _LttngLiveViewerGetDataStreamPacketDataReply:
 
 
 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
-    def __init__(self, version, stream_id):
+    def __init__(self, version: int, stream_id: int):
         super().__init__(version)
         self._stream_id = stream_id
 
@@ -301,13 +402,13 @@ class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
         return self._stream_id
 
 
-class _LttngLiveViewerGetMetadataStreamDataContentReply:
+class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
     class Status:
         OK = 1
         NO_NEW = 2
         ERROR = 3
 
-    def __init__(self, status, data):
+    def __init__(self, status: int, data: bytes):
         self._status = status
         self._data = data
 
@@ -321,7 +422,7 @@ class _LttngLiveViewerGetMetadataStreamDataContentReply:
 
 
 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
-    def __init__(self, version, tracing_session_id):
+    def __init__(self, version: int, tracing_session_id: int):
         super().__init__(version)
         self._tracing_session_id = tracing_session_id
 
@@ -330,14 +431,14 @@ class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
         return self._tracing_session_id
 
 
-class _LttngLiveViewerGetNewStreamInfosReply:
+class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
     class Status:
         OK = 1
         NO_NEW = 2
         ERROR = 3
         HUP = 4
 
-    def __init__(self, status, stream_infos):
+    def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
         self._status = status
         self._stream_infos = stream_infos
 
@@ -354,12 +455,12 @@ class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
     pass
 
 
-class _LttngLiveViewerCreateViewerSessionReply:
+class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
     class Status:
         OK = 1
         ERROR = 2
 
-    def __init__(self, status):
+    def __init__(self, status: int):
         self._status = status
 
     @property
@@ -368,7 +469,7 @@ class _LttngLiveViewerCreateViewerSessionReply:
 
 
 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
-    def __init__(self, version, tracing_session_id):
+    def __init__(self, version: int, tracing_session_id: int):
         super().__init__(version)
         self._tracing_session_id = tracing_session_id
 
@@ -377,13 +478,13 @@ class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
         return self._tracing_session_id
 
 
-class _LttngLiveViewerDetachFromTracingSessionReply:
+class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
     class Status:
         OK = 1
         UNKNOWN = 2
         ERROR = 3
 
-    def __init__(self, status):
+    def __init__(self, status: int):
         self._status = status
 
     @property
@@ -394,22 +495,22 @@ class _LttngLiveViewerDetachFromTracingSessionReply:
 # An LTTng live protocol codec can convert bytes to command objects and
 # reply objects to bytes.
 class _LttngLiveViewerProtocolCodec:
-    _COMMAND_HEADER_STRUCT_FMT = 'QII'
+    _COMMAND_HEADER_STRUCT_FMT = "QII"
     _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
 
     def __init__(self):
         pass
 
-    def _unpack(self, fmt, data, offset=0):
-        fmt = '!' + fmt
+    def _unpack(self, fmt: str, data: bytes, offset: int = 0):
+        fmt = "!" + fmt
         return struct.unpack_from(fmt, data, offset)
 
-    def _unpack_payload(self, fmt, data):
+    def _unpack_payload(self, fmt: str, data: bytes):
         return self._unpack(
             fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
         )
 
-    def decode(self, data):
+    def decode(self, data: bytes):
         if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
             # Not enough data to read the command header
             return
@@ -418,7 +519,7 @@ class _LttngLiveViewerProtocolCodec:
             self._COMMAND_HEADER_STRUCT_FMT, data
         )
         logging.info(
-            'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
+            "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
                 payload_size, cmd_type, version
             )
         )
@@ -428,60 +529,60 @@ class _LttngLiveViewerProtocolCodec:
             return
 
         if cmd_type == 1:
-            viewer_session_id, major, minor, conn_type = self._unpack_payload(
-                'QIII', data
-            )
+            viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
             return _LttngLiveViewerConnectCommand(
                 version, viewer_session_id, major, minor
             )
         elif cmd_type == 2:
             return _LttngLiveViewerGetTracingSessionInfosCommand(version)
         elif cmd_type == 3:
-            tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
+            tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
             return _LttngLiveViewerAttachToTracingSessionCommand(
                 version, tracing_session_id, offset, seek_type
             )
         elif cmd_type == 4:
-            (stream_id,) = self._unpack_payload('Q', data)
+            (stream_id,) = self._unpack_payload("Q", data)
             return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                 version, stream_id
             )
         elif cmd_type == 5:
-            stream_id, offset, req_length = self._unpack_payload('QQI', data)
+            stream_id, offset, req_length = self._unpack_payload("QQI", data)
             return _LttngLiveViewerGetDataStreamPacketDataCommand(
                 version, stream_id, offset, req_length
             )
         elif cmd_type == 6:
-            (stream_id,) = self._unpack_payload('Q', data)
+            (stream_id,) = self._unpack_payload("Q", data)
             return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
         elif cmd_type == 7:
-            (tracing_session_id,) = self._unpack_payload('Q', data)
+            (tracing_session_id,) = self._unpack_payload("Q", data)
             return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
         elif cmd_type == 8:
             return _LttngLiveViewerCreateViewerSessionCommand(version)
         elif cmd_type == 9:
-            (tracing_session_id,) = self._unpack_payload('Q', data)
+            (tracing_session_id,) = self._unpack_payload("Q", data)
             return _LttngLiveViewerDetachFromTracingSessionCommand(
                 version, tracing_session_id
             )
         else:
-            raise UnexpectedInput('Unknown command type {}'.format(cmd_type))
+            raise RuntimeError("Unknown command type {}".format(cmd_type))
 
-    def _pack(self, fmt, *args):
+    def _pack(self, fmt: str, *args: Any):
         # Force network byte order
-        return struct.pack('!' + fmt, *args)
+        return struct.pack("!" + fmt, *args)
 
-    def _encode_zero_padded_str(self, string, length):
+    def _encode_zero_padded_str(self, string: str, length: int):
         data = string.encode()
-        return data.ljust(length, b'\x00')
+        return data.ljust(length, b"\x00")
 
-    def _encode_stream_info(self, info):
-        data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
+    def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
+        data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
         data += self._encode_zero_padded_str(info.path, 4096)
         data += self._encode_zero_padded_str(info.channel_name, 255)
         return data
 
-    def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
+    def _get_has_new_stuff_flags(
+        self, has_new_metadata: bool, has_new_data_streams: bool
+    ):
         flags = 0
 
         if has_new_metadata:
@@ -492,17 +593,20 @@ class _LttngLiveViewerProtocolCodec:
 
         return flags
 
-    def encode(self, reply):
+    def encode(
+        self,
+        reply: _LttngLiveViewerReply,
+    ) -> bytes:
         if type(reply) is _LttngLiveViewerConnectReply:
             data = self._pack(
-                'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
+                "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
             )
         elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
-            data = self._pack('I', len(reply.tracing_session_infos))
+            data = self._pack("I", len(reply.tracing_session_infos))
 
             for info in reply.tracing_session_infos:
                 data += self._pack(
-                    'QIII',
+                    "QIII",
                     info.tracing_session_id,
                     info.live_timer_freq,
                     info.client_count,
@@ -511,18 +615,18 @@ class _LttngLiveViewerProtocolCodec:
                 data += self._encode_zero_padded_str(info.hostname, 64)
                 data += self._encode_zero_padded_str(info.name, 255)
         elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
-            data = self._pack('II', reply.status, len(reply.stream_infos))
+            data = self._pack("II", reply.status, len(reply.stream_infos))
 
             for info in reply.stream_infos:
                 data += self._encode_stream_info(info)
         elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
-            index_format = 'QQQQQQQII'
+            index_format = "QQQQQQQII"
             entry = reply.index_entry
             flags = self._get_has_new_stuff_flags(
                 reply.has_new_metadata, reply.has_new_data_stream
             )
 
-            if type(entry) is _LttngDataStreamIndexEntry:
+            if isinstance(entry, _LttngDataStreamIndexEntry):
                 data = self._pack(
                     index_format,
                     entry.offset_bytes,
@@ -536,7 +640,6 @@ class _LttngLiveViewerProtocolCodec:
                     flags,
                 )
             else:
-                assert type(entry) is _LttngDataStreamBeaconEntry
                 data = self._pack(
                     index_format,
                     0,
@@ -553,122 +656,53 @@ class _LttngLiveViewerProtocolCodec:
             flags = self._get_has_new_stuff_flags(
                 reply.has_new_metadata, reply.has_new_data_stream
             )
-            data = self._pack('III', reply.status, len(reply.data), flags)
+            data = self._pack("III", reply.status, len(reply.data), flags)
             data += reply.data
         elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
-            data = self._pack('QI', len(reply.data), reply.status)
+            data = self._pack("QI", len(reply.data), reply.status)
             data += reply.data
         elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
-            data = self._pack('II', reply.status, len(reply.stream_infos))
+            data = self._pack("II", reply.status, len(reply.stream_infos))
 
             for info in reply.stream_infos:
                 data += self._encode_stream_info(info)
         elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
-            data = self._pack('I', reply.status)
+            data = self._pack("I", reply.status)
         elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
-            data = self._pack('I', reply.status)
+            data = self._pack("I", reply.status)
         else:
             raise ValueError(
-                'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
+                "Unknown reply object with class `{}`".format(reply.__class__.__name__)
             )
 
         return data
 
 
-# An entry within the index of an LTTng data stream.
-class _LttngDataStreamIndexEntry:
-    def __init__(
-        self,
-        offset_bytes,
-        total_size_bits,
-        content_size_bits,
-        timestamp_begin,
-        timestamp_end,
-        events_discarded,
-        stream_class_id,
-    ):
-        self._offset_bytes = offset_bytes
-        self._total_size_bits = total_size_bits
-        self._content_size_bits = content_size_bits
-        self._timestamp_begin = timestamp_begin
-        self._timestamp_end = timestamp_end
-        self._events_discarded = events_discarded
-        self._stream_class_id = stream_class_id
-
-    @property
-    def offset_bytes(self):
-        return self._offset_bytes
-
-    @property
-    def total_size_bits(self):
-        return self._total_size_bits
-
-    @property
-    def total_size_bytes(self):
-        return self._total_size_bits // 8
-
-    @property
-    def content_size_bits(self):
-        return self._content_size_bits
-
-    @property
-    def content_size_bytes(self):
-        return self._content_size_bits // 8
-
-    @property
-    def timestamp_begin(self):
-        return self._timestamp_begin
-
-    @property
-    def timestamp_end(self):
-        return self._timestamp_end
-
-    @property
-    def events_discarded(self):
-        return self._events_discarded
-
-    @property
-    def stream_class_id(self):
-        return self._stream_class_id
-
-
-# An entry within the index of an LTTng data stream. While a stream beacon entry
-# is conceptually unrelated to an index, it is sent as a reply to a
-# LttngLiveViewerGetNextDataStreamIndexEntryCommand
-class _LttngDataStreamBeaconEntry:
-    def __init__(self, stream_class_id, timestamp):
-        self._stream_class_id = stream_class_id
-        self._timestamp = timestamp
-
-    @property
-    def timestamp(self):
-        return self._timestamp
-
-    @property
-    def stream_class_id(self):
-        return self._stream_class_id
-
-
-def _get_entry_timestamp_begin(entry):
-    if type(entry) is _LttngDataStreamBeaconEntry:
+def _get_entry_timestamp_begin(
+    entry: _LttngIndexEntryT,
+):
+    if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
         return entry.timestamp
     else:
-        assert type(entry) is _LttngDataStreamIndexEntry
         return entry.timestamp_begin
 
 
 # The index of an LTTng data stream, a sequence of index entries.
-class _LttngDataStreamIndex(collections.abc.Sequence):
-    def __init__(self, path, beacons):
+class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
+    def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
         self._path = path
         self._build()
 
         if beacons:
             stream_class_id = self._entries[0].stream_class_id
-            beacons = [
-                _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
-            ]
-            self._add_beacons(beacons)
+
+            beacons_list = []  # type: list[_LttngDataStreamBeaconIndexEntry]
+            for ts in beacons.iter(tjson.IntVal):
+                beacons_list.append(
+                    _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
+                )
+
+            self._add_beacons(beacons_list)
 
         logging.info(
             'Built data stream index entries: path="{}", count={}'.format(
@@ -677,22 +711,20 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
         )
 
     def _build(self):
-        self._entries = []
+        self._entries = []  # type: list[_LttngIndexEntryT]
         assert os.path.isfile(self._path)
 
-        with open(self._path, 'rb') as f:
+        with open(self._path, "rb") as f:
             # Read header first
-            fmt = '>IIII'
+            fmt = ">IIII"
             size = struct.calcsize(fmt)
             data = f.read(size)
             assert len(data) == size
-            magic, index_major, index_minor, index_entry_length = struct.unpack(
-                fmt, data
-            )
+            magic, _, _, index_entry_length = struct.unpack(fmt, data)
             assert magic == 0xC1F1DCC1
 
             # Read index entries
-            fmt = '>QQQQQQQ'
+            fmt = ">QQQQQQQ"
             size = struct.calcsize(fmt)
 
             while True:
@@ -733,10 +765,12 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
                 # Skip anything else before the next entry
                 f.seek(index_entry_length - size, os.SEEK_CUR)
 
-    def _add_beacons(self, beacons):
+    def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
         # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
-        def sort_key(entry):
-            if type(entry) is _LttngDataStreamBeaconEntry:
+        def sort_key(
+            entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
+        ) -> int:
+            if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
                 return entry.timestamp
             else:
                 return entry.timestamp_end
@@ -744,7 +778,17 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
         self._entries += beacons
         self._entries.sort(key=sort_key)
 
-    def __getitem__(self, index):
+    @overload
+    def __getitem__(self, index: int) -> _LttngIndexEntryT:
+        ...
+
+    @overload
+    def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]:  # noqa: F811
+        ...
+
+    def __getitem__(  # noqa: F811
+        self, index: Union[int, slice]
+    ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT],]:
         return self._entries[index]
 
     def __len__(self):
@@ -757,16 +801,21 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
 
 # An LTTng data stream.
 class _LttngDataStream:
-    def __init__(self, path, beacons):
+    def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
         self._path = path
         filename = os.path.basename(path)
-        match = re.match(r'(.*)_\d+', filename)
+        match = re.match(r"(.*)_\d+", filename)
+        if not match:
+            raise RuntimeError(
+                "Unexpected data stream file name pattern: {}".format(filename)
+            )
+
         self._channel_name = match.group(1)
         trace_dir = os.path.dirname(path)
-        index_path = os.path.join(trace_dir, 'index', filename + '.idx')
-        self._index = _LttngDataStreamIndex(index_path, beacons)
+        index_path = os.path.join(trace_dir, "index", filename + ".idx")
+        self._index = _LttngDataStreamIndex(index_path, beacons_json)
         assert os.path.isfile(path)
-        self._file = open(path, 'rb')
+        self._file = open(path, "rb")
         logging.info(
             'Built data stream: path="{}", channel-name="{}"'.format(
                 path, self._channel_name
@@ -785,20 +834,20 @@ class _LttngDataStream:
     def index(self):
         return self._index
 
-    def get_data(self, offset_bytes, len_bytes):
+    def get_data(self, offset_bytes: int, len_bytes: int):
         self._file.seek(offset_bytes)
         return self._file.read(len_bytes)
 
 
 class _LttngMetadataStreamSection:
-    def __init__(self, timestamp, data):
+    def __init__(self, timestamp: int, data: Optional[bytes]):
         self._timestamp = timestamp
         if data is None:
             self._data = bytes()
         else:
             self._data = data
         logging.info(
-            'Built metadata stream section: ts={}, data-len={}'.format(
+            "Built metadata stream section: ts={}, data-len={}".format(
                 self._timestamp, len(self._data)
             )
         )
@@ -814,11 +863,15 @@ class _LttngMetadataStreamSection:
 
 # An LTTng metadata stream.
 class _LttngMetadataStream:
-    def __init__(self, metadata_file_path, config_sections):
+    def __init__(
+        self,
+        metadata_file_path: str,
+        config_sections: Sequence[_LttngMetadataStreamSection],
+    ):
         self._path = metadata_file_path
         self._sections = config_sections
         logging.info(
-            'Built metadata stream: path={}, section-len={}'.format(
+            "Built metadata stream: path={}, section-len={}".format(
                 self._path, len(self._sections)
             )
         )
@@ -832,55 +885,73 @@ class _LttngMetadataStream:
         return self._sections
 
 
-LttngMetadataConfigSection = namedtuple(
-    'LttngMetadataConfigSection', ['line', 'timestamp', 'is_empty']
-)
+class LttngMetadataConfigSection:
+    def __init__(self, line: int, timestamp: int, is_empty: bool):
+        self._line = line
+        self._timestamp = timestamp
+        self._is_empty = is_empty
 
+    @property
+    def line(self):
+        return self._line
 
-def _parse_metadata_sections_config(config_sections):
-    assert config_sections is not None
-    config_metadata_sections = []
+    @property
+    def timestamp(self):
+        return self._timestamp
+
+    @property
+    def is_empty(self):
+        return self._is_empty
+
+
+def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
+    metadata_sections = []  # type: list[LttngMetadataConfigSection]
     append_empty_section = False
     last_timestamp = 0
     last_line = 0
 
-    for config_section in config_sections:
-        if config_section == 'empty':
-            # Found a empty section marker. Actually append the section at the
-            # timestamp of the next concrete section.
-            append_empty_section = True
-        else:
-            assert type(config_section) is dict
-            line = config_section.get('line')
-            ts = config_section.get('timestamp')
+    for section in metadata_sections_json:
+        if isinstance(section, tjson.StrVal):
+            if section.val == "empty":
+                # Found a empty section marker. Actually append the section at the
+                # timestamp of the next concrete section.
+                append_empty_section = True
+            else:
+                raise ValueError("Invalid string value at {}.".format(section.path))
+        elif isinstance(section, tjson.ObjVal):
+            line = section.at("line", tjson.IntVal).val
+            ts = section.at("timestamp", tjson.IntVal).val
 
             # Sections' timestamps and lines must both be increasing.
             assert ts > last_timestamp
             last_timestamp = ts
+
             assert line > last_line
             last_line = line
 
             if append_empty_section:
-                config_metadata_sections.append(
-                    LttngMetadataConfigSection(line, ts, True)
-                )
+                metadata_sections.append(LttngMetadataConfigSection(line, ts, True))
                 append_empty_section = False
 
-            config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
-
-    return config_metadata_sections
+            metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
+        else:
+            raise TypeError(
+                "`{}`: expecting a string or object value".format(section.path)
+            )
 
+    return metadata_sections
 
-def _split_metadata_sections(metadata_file_path, raw_config_sections):
-    assert isinstance(raw_config_sections, collections.abc.Sequence)
 
-    parsed_sections = _parse_metadata_sections_config(raw_config_sections)
+def _split_metadata_sections(
+    metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
+):
+    metadata_sections = _parse_metadata_sections_config(metadata_sections_json)
 
-    sections = []
-    with open(metadata_file_path, 'r') as metadata_file:
+    sections = []  # type: list[_LttngMetadataStreamSection]
+    with open(metadata_file_path, "r") as metadata_file:
         metadata_lines = [line for line in metadata_file]
 
-    config_metadata_sections_idx = 0
+    metadata_section_idx = 0
     curr_metadata_section = bytearray()
 
     for idx, line_content in enumerate(metadata_lines):
@@ -890,49 +961,46 @@ def _split_metadata_sections(metadata_file_path, raw_config_sections):
         curr_line_number = idx + 1
 
         # If there are no more sections, simply append the line.
-        if config_metadata_sections_idx + 1 >= len(parsed_sections):
-            curr_metadata_section += bytearray(line_content, 'utf8')
+        if metadata_section_idx + 1 >= len(metadata_sections):
+            curr_metadata_section += bytearray(line_content, "utf8")
             continue
 
-        next_section_line_number = parsed_sections[
-            config_metadata_sections_idx + 1
-        ].line
+        next_section_line_number = metadata_sections[metadata_section_idx + 1].line
 
         # If the next section begins at the current line, create a
         # section with the metadata we gathered so far.
         if curr_line_number >= next_section_line_number:
-
             # Flushing the metadata of the current section.
             sections.append(
                 _LttngMetadataStreamSection(
-                    parsed_sections[config_metadata_sections_idx].timestamp,
+                    metadata_sections[metadata_section_idx].timestamp,
                     bytes(curr_metadata_section),
                 )
             )
 
             # Move to the next section.
-            config_metadata_sections_idx += 1
+            metadata_section_idx += 1
 
             # Clear old content and append current line for the next section.
             curr_metadata_section.clear()
-            curr_metadata_section += bytearray(line_content, 'utf8')
+            curr_metadata_section += bytearray(line_content, "utf8")
 
             # Append any empty sections.
-            while parsed_sections[config_metadata_sections_idx].is_empty:
+            while metadata_sections[metadata_section_idx].is_empty:
                 sections.append(
                     _LttngMetadataStreamSection(
-                        parsed_sections[config_metadata_sections_idx].timestamp, None
+                        metadata_sections[metadata_section_idx].timestamp, None
                     )
                 )
-                config_metadata_sections_idx += 1
+                metadata_section_idx += 1
         else:
             # Append line_content to the current metadata section.
-            curr_metadata_section += bytearray(line_content, 'utf8')
+            curr_metadata_section += bytearray(line_content, "utf8")
 
     # We iterated over all the lines of the metadata file. Close the current section.
     sections.append(
         _LttngMetadataStreamSection(
-            parsed_sections[config_metadata_sections_idx].timestamp,
+            metadata_sections[metadata_section_idx].timestamp,
             bytes(curr_metadata_section),
         )
     )
@@ -940,17 +1008,27 @@ def _split_metadata_sections(metadata_file_path, raw_config_sections):
     return sections
 
 
+_StreamBeaconsT = Dict[str, Iterable[int]]
+
+
 # An LTTng trace, a sequence of LTTng data streams.
-class LttngTrace(collections.abc.Sequence):
-    def __init__(self, trace_dir, metadata_sections, beacons):
+class LttngTrace(Sequence[_LttngDataStream]):
+    def __init__(
+        self,
+        trace_dir: str,
+        metadata_sections_json: Optional[tjson.ArrayVal],
+        beacons_json: Optional[tjson.ObjVal],
+    ):
         assert os.path.isdir(trace_dir)
         self._path = trace_dir
-        self._create_metadata_stream(trace_dir, metadata_sections)
-        self._create_data_streams(trace_dir, beacons)
+        self._create_metadata_stream(trace_dir, metadata_sections_json)
+        self._create_data_streams(trace_dir, beacons_json)
         logging.info('Built trace: path="{}"'.format(trace_dir))
 
-    def _create_data_streams(self, trace_dir, beacons):
-        data_stream_paths = []
+    def _create_data_streams(
+        self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
+    ):
+        data_stream_paths = []  # type: list[str]
 
         for filename in os.listdir(trace_dir):
             path = os.path.join(trace_dir, filename)
@@ -958,40 +1036,41 @@ class LttngTrace(collections.abc.Sequence):
             if not os.path.isfile(path):
                 continue
 
-            if filename.startswith('.'):
+            if filename.startswith("."):
                 continue
 
-            if filename == 'metadata':
+            if filename == "metadata":
                 continue
 
             data_stream_paths.append(path)
 
         data_stream_paths.sort()
-        self._data_streams = []
+        self._data_streams = []  # type: list[_LttngDataStream]
 
         for data_stream_path in data_stream_paths:
             stream_name = os.path.basename(data_stream_path)
-            this_stream_beacons = None
-
-            if beacons is not None and stream_name in beacons:
-                this_stream_beacons = beacons[stream_name]
+            this_beacons_json = None
+            if beacons_json is not None and stream_name in beacons_json:
+                this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
 
             self._data_streams.append(
-                _LttngDataStream(data_stream_path, this_stream_beacons)
+                _LttngDataStream(data_stream_path, this_beacons_json)
             )
 
-    def _create_metadata_stream(self, trace_dir, config_metadata_sections):
-        metadata_path = os.path.join(trace_dir, 'metadata')
-        metadata_sections = []
+    def _create_metadata_stream(
+        self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
+    ):
+        metadata_path = os.path.join(trace_dir, "metadata")
+        metadata_sections = []  # type: list[_LttngMetadataStreamSection]
 
-        if config_metadata_sections is None:
-            with open(metadata_path, 'rb') as metadata_file:
+        if metadata_sections_json is None:
+            with open(metadata_path, "rb") as metadata_file:
                 metadata_sections.append(
                     _LttngMetadataStreamSection(0, metadata_file.read())
                 )
         else:
             metadata_sections = _split_metadata_sections(
-                metadata_path, config_metadata_sections
+                metadata_path, metadata_sections_json
             )
 
         self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
@@ -1004,7 +1083,17 @@ class LttngTrace(collections.abc.Sequence):
     def metadata_stream(self):
         return self._metadata_stream
 
-    def __getitem__(self, index):
+    @overload
+    def __getitem__(self, index: int) -> _LttngDataStream:
+        ...
+
+    @overload
+    def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]:  # noqa: F811
+        ...
+
+    def __getitem__(  # noqa: F811
+        self, index: Union[int, slice]
+    ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
         return self._data_streams[index]
 
     def __len__(self):
@@ -1013,7 +1102,13 @@ class LttngTrace(collections.abc.Sequence):
 
 # The state of a single data stream.
 class _LttngLiveViewerSessionDataStreamState:
-    def __init__(self, ts_state, info, data_stream, metadata_stream_id):
+    def __init__(
+        self,
+        ts_state: "_LttngLiveViewerSessionTracingSessionState",
+        info: _LttngLiveViewerStreamInfo,
+        data_stream: _LttngDataStream,
+        metadata_stream_id: int,
+    ):
         self._ts_state = ts_state
         self._info = info
         self._data_stream = data_stream
@@ -1048,13 +1143,22 @@ class _LttngLiveViewerSessionDataStreamState:
 
         return self._data_stream.index[self._cur_index_entry_index]
 
+    @property
+    def metadata_stream_id(self):
+        return self._metadata_stream_id
+
     def goto_next_index_entry(self):
         self._cur_index_entry_index += 1
 
 
 # The state of a single metadata stream.
 class _LttngLiveViewerSessionMetadataStreamState:
-    def __init__(self, ts_state, info, metadata_stream):
+    def __init__(
+        self,
+        ts_state: "_LttngLiveViewerSessionTracingSessionState",
+        info: _LttngLiveViewerStreamInfo,
+        metadata_stream: _LttngMetadataStream,
+    ):
         self._ts_state = ts_state
         self._info = info
         self._metadata_stream = metadata_stream
@@ -1077,10 +1181,6 @@ class _LttngLiveViewerSessionMetadataStreamState:
             )
         )
 
-    @property
-    def trace_session_state(self):
-        return self._trace_session_state
-
     @property
     def info(self):
         return self._info
@@ -1094,7 +1194,7 @@ class _LttngLiveViewerSessionMetadataStreamState:
         return self._is_sent
 
     @is_sent.setter
-    def is_sent(self, value):
+    def is_sent(self, value: bool):
         self._is_sent = value
 
     @property
@@ -1120,13 +1220,54 @@ class _LttngLiveViewerSessionMetadataStreamState:
         return self._next_metadata_stream_section_timestamp
 
 
+# A tracing session descriptor.
+#
+# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
+# objects).
+class LttngTracingSessionDescriptor:
+    def __init__(
+        self,
+        name: str,
+        tracing_session_id: int,
+        hostname: str,
+        live_timer_freq: int,
+        client_count: int,
+        traces: Iterable[LttngTrace],
+    ):
+        for trace in traces:
+            if name not in trace.path:
+                fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
+                raise ValueError(fmt.format(name, trace.path))
+
+        self._traces = traces
+        stream_count = sum([len(t) + 1 for t in traces])
+        self._info = _LttngLiveViewerTracingSessionInfo(
+            tracing_session_id,
+            live_timer_freq,
+            client_count,
+            stream_count,
+            hostname,
+            name,
+        )
+
+    @property
+    def traces(self):
+        return self._traces
+
+    @property
+    def info(self):
+        return self._info
+
+
 # The state of a tracing session.
 class _LttngLiveViewerSessionTracingSessionState:
-    def __init__(self, tc_descr, base_stream_id):
+    def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
         self._tc_descr = tc_descr
-        self._stream_infos = []
-        self._ds_states = {}
-        self._ms_states = {}
+        self._stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
+        self._ds_states = {}  # type: dict[int, _LttngLiveViewerSessionDataStreamState]
+        self._ms_states = (
+            {}
+        )  # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
         stream_id = base_stream_id
 
         for trace in tc_descr.traces:
@@ -1134,7 +1275,7 @@ class _LttngLiveViewerSessionTracingSessionState:
 
             # Metadata stream -> stream info and metadata stream state
             info = _LttngLiveViewerStreamInfo(
-                stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
+                stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
             )
             self._stream_infos.append(info)
             self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
@@ -1187,11 +1328,14 @@ class _LttngLiveViewerSessionTracingSessionState:
         return self._is_attached
 
     @is_attached.setter
-    def is_attached(self, value):
+    def is_attached(self, value: bool):
         self._is_attached = value
 
 
-def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
+def needs_new_metadata_section(
+    metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
+    latest_timestamp: int,
+):
     if metadata_stream_state.next_section_timestamp is None:
         return False
 
@@ -1206,13 +1350,17 @@ def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
 class _LttngLiveViewerSession:
     def __init__(
         self,
-        viewer_session_id,
-        tracing_session_descriptors,
-        max_query_data_response_size,
+        viewer_session_id: int,
+        tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
+        max_query_data_response_size: Optional[int],
     ):
         self._viewer_session_id = viewer_session_id
-        self._ts_states = {}
-        self._stream_states = {}
+        self._ts_states = (
+            {}
+        )  # type:  dict[int, _LttngLiveViewerSessionTracingSessionState]
+        self._stream_states = (
+            {}
+        )  # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
         self._max_query_data_response_size = max_query_data_response_size
         total_stream_infos = 0
 
@@ -1237,50 +1385,66 @@ class _LttngLiveViewerSession:
             _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
             _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
             _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
-        }
+        }  # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]
 
     @property
     def viewer_session_id(self):
         return self._viewer_session_id
 
-    def _get_tracing_session_state(self, tracing_session_id):
+    def _get_tracing_session_state(self, tracing_session_id: int):
         if tracing_session_id not in self._ts_states:
-            raise UnexpectedInput(
-                'Unknown tracing session ID {}'.format(tracing_session_id)
+            raise RuntimeError(
+                "Unknown tracing session ID {}".format(tracing_session_id)
             )
 
         return self._ts_states[tracing_session_id]
 
-    def _get_stream_state(self, stream_id):
+    def _get_data_stream_state(self, stream_id: int):
         if stream_id not in self._stream_states:
-            UnexpectedInput('Unknown stream ID {}'.format(stream_id))
+            RuntimeError("Unknown stream ID {}".format(stream_id))
 
-        return self._stream_states[stream_id]
+        stream = self._stream_states[stream_id]
+        if type(stream) is not _LttngLiveViewerSessionDataStreamState:
+            raise RuntimeError("Stream is not a data stream")
 
-    def handle_command(self, cmd):
+        return stream
+
+    def _get_metadata_stream_state(self, stream_id: int):
+        if stream_id not in self._stream_states:
+            RuntimeError("Unknown stream ID {}".format(stream_id))
+
+        stream = self._stream_states[stream_id]
+        if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
+            raise RuntimeError("Stream is not a metadata stream")
+
+        return stream
+
+    def handle_command(self, cmd: _LttngLiveViewerCommand):
         logging.info(
-            'Handling command in viewer session: cmd-cls-name={}'.format(
+            "Handling command in viewer session: cmd-cls-name={}".format(
                 cmd.__class__.__name__
             )
         )
         cmd_type = type(cmd)
 
         if cmd_type not in self._command_handlers:
-            raise UnexpectedInput(
-                'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
+            raise RuntimeError(
+                "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
             )
 
         return self._command_handlers[cmd_type](cmd)
 
-    def _handle_attach_to_tracing_session_command(self, cmd):
+    def _handle_attach_to_tracing_session_command(
+        self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
+    ):
         fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
         logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
         ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
         info = ts_state.tracing_session_descriptor.info
 
         if ts_state.is_attached:
-            raise UnexpectedInput(
-                'Cannot attach to tracing session `{}`: viewer is already attached'.format(
+            raise RuntimeError(
+                "Cannot attach to tracing session `{}`: viewer is already attached".format(
                     info.name
                 )
             )
@@ -1291,15 +1455,17 @@ class _LttngLiveViewerSession:
             status, ts_state.stream_infos
         )
 
-    def _handle_detach_from_tracing_session_command(self, cmd):
+    def _handle_detach_from_tracing_session_command(
+        self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
+    ):
         fmt = 'Handling "detach from tracing session" command: ts-id={}'
         logging.info(fmt.format(cmd.tracing_session_id))
         ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
         info = ts_state.tracing_session_descriptor.info
 
         if not ts_state.is_attached:
-            raise UnexpectedInput(
-                'Cannot detach to tracing session `{}`: viewer is not attached'.format(
+            raise RuntimeError(
+                "Cannot detach to tracing session `{}`: viewer is not attached".format(
                     info.name
                 )
             )
@@ -1308,16 +1474,15 @@ class _LttngLiveViewerSession:
         status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
         return _LttngLiveViewerDetachFromTracingSessionReply(status)
 
-    def _handle_get_next_data_stream_index_entry_command(self, cmd):
+    def _handle_get_next_data_stream_index_entry_command(
+        self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
+    ):
         fmt = 'Handling "get next data stream index entry" command: stream-id={}'
         logging.info(fmt.format(cmd.stream_id))
-        stream_state = self._get_stream_state(cmd.stream_id)
-        metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
-
-        if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
-            raise UnexpectedInput(
-                'Stream with ID {} is not a data stream'.format(cmd.stream_id)
-            )
+        stream_state = self._get_data_stream_state(cmd.stream_id)
+        metadata_stream_state = self._get_metadata_stream_state(
+            stream_state.metadata_stream_id
+        )
 
         if stream_state.cur_index_entry is None:
             # The viewer is done reading this stream
@@ -1340,10 +1505,9 @@ class _LttngLiveViewerSession:
         # The viewer only checks the `has_new_metadata` flag if the
         # reply's status is `OK`, so we need to provide an index here
         has_new_metadata = stream_state.tracing_session_state.has_new_metadata
-        if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
+        if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
             status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
         else:
-            assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
             status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
 
         reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
@@ -1352,17 +1516,14 @@ class _LttngLiveViewerSession:
         stream_state.goto_next_index_entry()
         return reply
 
-    def _handle_get_data_stream_packet_data_command(self, cmd):
+    def _handle_get_data_stream_packet_data_command(
+        self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
+    ):
         fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
         logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
-        stream_state = self._get_stream_state(cmd.stream_id)
+        stream_state = self._get_data_stream_state(cmd.stream_id)
         data_response_length = cmd.req_length
 
-        if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
-            raise UnexpectedInput(
-                'Stream with ID {} is not a data stream'.format(cmd.stream_id)
-            )
-
         if stream_state.tracing_session_state.has_new_metadata:
             status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
             return _LttngLiveViewerGetDataStreamPacketDataReply(
@@ -1383,18 +1544,12 @@ class _LttngLiveViewerSession:
         status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
         return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
 
-    def _handle_get_metadata_stream_data_command(self, cmd):
+    def _handle_get_metadata_stream_data_command(
+        self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
+    ):
         fmt = 'Handling "get metadata stream data" command: stream-id={}'
         logging.info(fmt.format(cmd.stream_id))
-        metadata_stream_state = self._get_stream_state(cmd.stream_id)
-
-        if (
-            type(metadata_stream_state)
-            is not _LttngLiveViewerSessionMetadataStreamState
-        ):
-            raise UnexpectedInput(
-                'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
-            )
+        metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
 
         if metadata_stream_state.is_sent:
             status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
@@ -1403,6 +1558,7 @@ class _LttngLiveViewerSession:
         metadata_stream_state.is_sent = True
         status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
         metadata_section = metadata_stream_state.cur_section
+        assert metadata_section is not None
 
         # If we are sending an empty section, ready the next one right away.
         if len(metadata_section.data) == 0:
@@ -1415,7 +1571,9 @@ class _LttngLiveViewerSession:
             status, metadata_section.data
         )
 
-    def _handle_get_new_stream_infos_command(self, cmd):
+    def _handle_get_new_stream_infos_command(
+        self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
+    ):
         fmt = 'Handling "get new stream infos" command: ts-id={}'
         logging.info(fmt.format(cmd.tracing_session_id))
 
@@ -1427,7 +1585,9 @@ class _LttngLiveViewerSession:
         status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
         return _LttngLiveViewerGetNewStreamInfosReply(status, [])
 
-    def _handle_get_tracing_session_infos_command(self, cmd):
+    def _handle_get_tracing_session_infos_command(
+        self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
+    ):
         logging.info('Handling "get tracing session infos" command.')
         infos = [
             tss.tracing_session_descriptor.info for tss in self._ts_states.values()
@@ -1435,7 +1595,9 @@ class _LttngLiveViewerSession:
         infos.sort(key=lambda info: info.name)
         return _LttngLiveViewerGetTracingSessionInfosReply(infos)
 
-    def _handle_create_viewer_session_command(self, cmd):
+    def _handle_create_viewer_session_command(
+        self, cmd: _LttngLiveViewerCreateViewerSessionCommand
+    ):
         logging.info('Handling "create viewer session" command.')
         status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
 
@@ -1446,9 +1608,10 @@ class _LttngLiveViewerSession:
 
 # An LTTng live TCP server.
 #
-# On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
-# the decimal TCP port number to a temporary port file.  It renames the
-# temporary port file to `port_filename`.
+# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
+# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
+# to a temporary port file.  It renames the temporary port file to
+# `port_filename`.
 #
 # `tracing_session_descriptors` is a list of tracing session descriptors
 # (`LttngTracingSessionDescriptor`) to serve.
@@ -1459,15 +1622,19 @@ class _LttngLiveViewerSession:
 # returns.
 class LttngLiveServer:
     def __init__(
-        self, port_filename, tracing_session_descriptors, max_query_data_response_size
+        self,
+        port: Optional[int],
+        port_filename: Optional[str],
+        tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
+        max_query_data_response_size: Optional[int],
     ):
-        logging.info('Server configuration:')
+        logging.info("Server configuration:")
 
-        logging.info('  Port file name: `{}`'.format(port_filename))
+        logging.info("  Port file name: `{}`".format(port_filename))
 
         if max_query_data_response_size is not None:
             logging.info(
-                '  Maximum response data query size: `{}`'.format(
+                "  Maximum response data query size: `{}`".format(
                     max_query_data_response_size
                 )
             )
@@ -1495,15 +1662,19 @@ class LttngLiveServer:
         self._codec = _LttngLiveViewerProtocolCodec()
 
         # Port 0: OS assigns an unused port
-        serv_addr = ('localhost', 0)
+        serv_addr = ("localhost", port if port is not None else 0)
         self._sock.bind(serv_addr)
-        self._write_port_to_file(port_filename)
+
+        if port_filename is not None:
+            self._write_port_to_file(port_filename)
+
+        print("Listening on port {}".format(self._server_port))
 
         try:
             self._listen()
         finally:
             self._sock.close()
-            logging.info('Closed connection and socket.')
+            logging.info("Closed connection and socket.")
 
     @property
     def _server_port(self):
@@ -1513,42 +1684,42 @@ class LttngLiveServer:
         data = bytes()
 
         while True:
-            logging.info('Waiting for viewer command.')
+            logging.info("Waiting for viewer command.")
             buf = self._conn.recv(128)
 
             if not buf:
-                logging.info('Client closed connection.')
+                logging.info("Client closed connection.")
 
                 if data:
-                    raise UnexpectedInput(
-                        'Client closed connection after having sent {} command bytes.'.format(
+                    raise RuntimeError(
+                        "Client closed connection after having sent {} command bytes.".format(
                             len(data)
                         )
                     )
 
                 return
 
-            logging.info('Received data from viewer: length={}'.format(len(buf)))
+            logging.info("Received data from viewer: length={}".format(len(buf)))
 
             data += buf
 
             try:
                 cmd = self._codec.decode(data)
             except struct.error as exc:
-                raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc
+                raise RuntimeError("Malformed command: {}".format(exc)) from exc
 
             if cmd is not None:
                 logging.info(
-                    'Received command from viewer: cmd-cls-name={}'.format(
+                    "Received command from viewer: cmd-cls-name={}".format(
                         cmd.__class__.__name__
                     )
                 )
                 return cmd
 
-    def _send_reply(self, reply):
+    def _send_reply(self, reply: _LttngLiveViewerReply):
         data = self._codec.encode(reply)
         logging.info(
-            'Sending reply to viewer: reply-cls-name={}, length={}'.format(
+            "Sending reply to viewer: reply-cls-name={}, length={}".format(
                 reply.__class__.__name__, len(data)
             )
         )
@@ -1559,7 +1730,7 @@ class LttngLiveServer:
         cmd = self._recv_command()
 
         if type(cmd) is not _LttngLiveViewerConnectCommand:
-            raise UnexpectedInput(
+            raise RuntimeError(
                 'First command is not "connect": cmd-cls-name={}'.format(
                     cmd.__class__.__name__
                 )
@@ -1567,7 +1738,7 @@ class LttngLiveServer:
 
         # Create viewer session (arbitrary ID 23)
         logging.info(
-            'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
+            "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
         )
         viewer_session = _LttngLiveViewerSession(
             23, self._ts_descriptors, self._max_query_data_response_size
@@ -1590,13 +1761,13 @@ class LttngLiveServer:
             self._send_reply(viewer_session.handle_command(cmd))
 
     def _listen(self):
-        logging.info('Listening: port={}'.format(self._server_port))
+        logging.info("Listening: port={}".format(self._server_port))
         # Backlog must be present for Python version < 3.5.
         # 128 is an arbitrary number since we expect only 1 connection anyway.
         self._sock.listen(128)
         self._conn, viewer_addr = self._sock.accept()
         logging.info(
-            'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
+            "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
         )
 
         try:
@@ -1604,10 +1775,10 @@ class LttngLiveServer:
         finally:
             self._conn.close()
 
-    def _write_port_to_file(self, port_filename):
+    def _write_port_to_file(self, port_filename: str):
         # Write the port number to a temporary file.
-        with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_port_file:
-            print(self._server_port, end='', file=tmp_port_file)
+        with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
+            print(self._server_port, end="", file=tmp_port_file)
 
         # Rename temporary file to real file
         os.replace(tmp_port_file.name, port_filename)
@@ -1618,40 +1789,9 @@ class LttngLiveServer:
         )
 
 
-# A tracing session descriptor.
-#
-# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
-# objects).
-class LttngTracingSessionDescriptor:
-    def __init__(
-        self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
-    ):
-        for trace in traces:
-            if name not in trace.path:
-                fmt = 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
-                raise ValueError(fmt.format(name, trace.path))
-
-        self._traces = traces
-        stream_count = sum([len(t) + 1 for t in traces])
-        self._info = _LttngLiveViewerTracingSessionInfo(
-            tracing_session_id,
-            live_timer_freq,
-            client_count,
-            stream_count,
-            hostname,
-            name,
-        )
-
-    @property
-    def traces(self):
-        return self._traces
-
-    @property
-    def info(self):
-        return self._info
-
-
-def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
+def _session_descriptors_from_path(
+    sessions_filename: str, trace_path_prefix: Optional[str]
+):
     # File format is:
     #
     #     [
@@ -1680,28 +1820,44 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
     #             ]
     #         }
     #     ]
-    with open(sessions_filename, 'r') as sessions_file:
-        params = json.load(sessions_file)
-
-    sessions = []
-
-    for session in params:
-        name = session['name']
-        tracing_session_id = session['id']
-        hostname = session['hostname']
-        live_timer_freq = session['live-timer-freq']
-        client_count = session['client-count']
-        traces = []
-
-        for trace in session['traces']:
-            metadata_sections = trace.get('metadata-sections')
-            beacons = trace.get('beacons')
-            path = trace['path']
+    with open(sessions_filename, "r") as sessions_file:
+        sessions_json = tjson.load(sessions_file, tjson.ArrayVal)
+
+    sessions = []  # type: list[LttngTracingSessionDescriptor]
+
+    for session_json in sessions_json.iter(tjson.ObjVal):
+        name = session_json.at("name", tjson.StrVal).val
+        tracing_session_id = session_json.at("id", tjson.IntVal).val
+        hostname = session_json.at("hostname", tjson.StrVal).val
+        live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
+        client_count = session_json.at("client-count", tjson.IntVal).val
+        traces_json = session_json.at("traces", tjson.ArrayVal)
+
+        traces = []  # type: list[LttngTrace]
+
+        for trace_json in traces_json.iter(tjson.ObjVal):
+            metadata_sections = (
+                trace_json.at("metadata-sections", tjson.ArrayVal)
+                if "metadata-sections" in trace_json
+                else None
+            )
+            beacons = (
+                trace_json.at("beacons", tjson.ObjVal)
+                if "beacons" in trace_json
+                else None
+            )
+            path = trace_json.at("path", tjson.StrVal).val
 
-            if not os.path.isabs(path):
+            if not os.path.isabs(path) and trace_path_prefix:
                 path = os.path.join(trace_path_prefix, path)
 
-            traces.append(LttngTrace(path, metadata_sections, beacons))
+            traces.append(
+                LttngTrace(
+                    path,
+                    metadata_sections,
+                    beacons,
+                )
+            )
 
         sessions.append(
             LttngTracingSessionDescriptor(
@@ -1717,65 +1873,71 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
     return sessions
 
 
-def _loglevel_parser(string):
-    loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
+def _loglevel_parser(string: str):
+    loglevels = {"info": logging.INFO, "warning": logging.WARNING}
     if string not in loglevels:
         msg = "{} is not a valid loglevel".format(string)
         raise argparse.ArgumentTypeError(msg)
     return loglevels[string]
 
 
-if __name__ == '__main__':
-    logging.basicConfig(format='# %(asctime)-25s%(message)s')
+if __name__ == "__main__":
+    logging.basicConfig(format="# %(asctime)-25s%(message)s")
     parser = argparse.ArgumentParser(
-        description='LTTng-live protocol mocker', add_help=False
+        description="LTTng-live protocol mocker", add_help=False
     )
     parser.add_argument(
-        '--log-level',
-        default='warning',
-        choices=['info', 'warning'],
-        help='The loglevel to be used.',
+        "--log-level",
+        default="warning",
+        choices=["info", "warning"],
+        help="The loglevel to be used.",
     )
 
     loglevel_namespace, remaining_args = parser.parse_known_args()
     logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
 
     parser.add_argument(
-        '--port-filename',
-        help='The final port file. This file is present when the server is ready to receive connection.',
-        required=True,
+        "--port",
+        help="The port to bind to. If missing, use an OS-assigned port..",
+        type=int,
     )
     parser.add_argument(
-        '--max-query-data-response-size',
+        "--port-filename",
+        help="The final port file. This file is present when the server is ready to receive connection.",
+    )
+    parser.add_argument(
+        "--max-query-data-response-size",
         type=int,
-        help='The maximum size of control data response in bytes',
+        help="The maximum size of control data response in bytes",
     )
     parser.add_argument(
-        '--trace-path-prefix',
+        "--trace-path-prefix",
         type=str,
-        help='Prefix to prepend to the trace paths of session configurations',
+        help="Prefix to prepend to the trace paths of session configurations",
     )
     parser.add_argument(
-        '--sessions-filename',
+        "sessions_filename",
         type=str,
-        help='Path to a session configuration file',
+        help="Path to a session configuration file",
+        metavar="sessions-filename",
     )
     parser.add_argument(
-        '-h',
-        '--help',
-        action='help',
+        "-h",
+        "--help",
+        action="help",
         default=argparse.SUPPRESS,
-        help='Show this help message and exit.',
+        help="Show this help message and exit.",
     )
 
     args = parser.parse_args(args=remaining_args)
-    try:
-        sessions = _session_descriptors_from_path(
-            args.sessions_filename,
-            args.trace_path_prefix,
-        )
-        LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size)
-    except UnexpectedInput as exc:
-        logging.error(str(exc))
-        print(exc, file=sys.stderr)
-        sys.exit(1)
+    sessions_filename = args.sessions_filename  # type: str
+    trace_path_prefix = args.trace_path_prefix  # type: str | None
+    sessions = _session_descriptors_from_path(
+        sessions_filename,
+        trace_path_prefix,
+    )
+
+    port = args.port  # type: int | None
+    port_filename = args.port_filename  # type: str | None
+    max_query_data_response_size = args.max_query_data_response_size  # type: int | None
+    LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
This page took 0.047366 seconds and 4 git commands to generate.