Add typing annotations throughout lttng_live_server.py, such that
pyright passes cleanly in strict mode.
Specify in a comment in lttng_live_server.py to run in strict mode, but
disable these two warnings:
- reportTypeCommentUsage: we need to use the comment form to support
Python 3.4
- reportMissingTypeStubs: it would complain about not having a stub
file for the utils module (although it can inspect utils.py just
fine)
Note that the sessions_filename variable near the bottom is marked as a
string, even though it's currently an optional argument (so it should be
`str | None`). But the rest of the code treats it as non-optional.
This is corrected in a subsequent patch that makes the sessions filename
mandatory.
Change the way lttng_live_server.py is launched to use run_python, to
have access to Python modules in tests/utils/python, to have access to
jsonw.py and possible the typing module shim for Python 3.4.
Change-Id: I842f4c696eea7c99932876af26c195bf99d5cfff
Signed-off-by: Simon Marchi <simon.marchi@efficios.com>
Reviewed-on: https://review.lttng.org/c/babeltrace/+/10869
Tested-by: jenkins <jenkins@lttng.org>
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
#
# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
#
+# pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
+
import os
import re
import sys
import os
import re
import sys
import socket
import struct
import logging
import os.path
import argparse
import tempfile
import socket
import struct
import logging
import os.path
import argparse
import tempfile
-import collections.abc
-from collections import namedtuple
+from typing import Dict, Union, Iterable, Optional, Sequence, overload
+
+import tjson
+
+# isort: off
+from typing import Any, Callable # noqa: F401
+
+# isort: on
class UnexpectedInput(RuntimeError):
class UnexpectedInput(RuntimeError):
class _LttngDataStreamIndexEntry:
def __init__(
self,
class _LttngDataStreamIndexEntry:
def __init__(
self,
- offset_bytes,
- total_size_bits,
- content_size_bits,
- timestamp_begin,
- timestamp_end,
- events_discarded,
- stream_class_id,
+ offset_bytes: int,
+ total_size_bits: int,
+ content_size_bits: int,
+ timestamp_begin: int,
+ timestamp_end: int,
+ events_discarded: int,
+ stream_class_id: int,
):
self._offset_bytes = offset_bytes
self._total_size_bits = total_size_bits
):
self._offset_bytes = offset_bytes
self._total_size_bits = total_size_bits
# An entry within the index of an LTTng data stream. While a stream beacon entry
# is conceptually unrelated to an index, it is sent as a reply to a
# LttngLiveViewerGetNextDataStreamIndexEntryCommand
# An entry within the index of an LTTng data stream. While a stream beacon entry
# is conceptually unrelated to an index, it is sent as a reply to a
# LttngLiveViewerGetNextDataStreamIndexEntryCommand
-class _LttngDataStreamBeaconEntry:
- def __init__(self, stream_class_id, timestamp):
+class _LttngDataStreamBeaconIndexEntry:
+ def __init__(self, stream_class_id: int, timestamp: int):
self._stream_class_id = stream_class_id
self._timestamp = timestamp
self._stream_class_id = stream_class_id
self._timestamp = timestamp
return self._stream_class_id
return self._stream_class_id
+_LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
+
+
class _LttngLiveViewerCommand:
class _LttngLiveViewerCommand:
- def __init__(self, version):
+ def __init__(self, version: int):
self._version = version
@property
self._version = version
@property
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
- def __init__(self, version, viewer_session_id, major, minor):
+ def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
super().__init__(version)
self._viewer_session_id = viewer_session_id
self._major = major
super().__init__(version)
self._viewer_session_id = viewer_session_id
self._major = major
-class _LttngLiveViewerConnectReply:
- def __init__(self, viewer_session_id, major, minor):
+class _LttngLiveViewerReply:
+ pass
+
+
+class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
+ def __init__(self, viewer_session_id: int, major: int, minor: int):
self._viewer_session_id = viewer_session_id
self._major = major
self._minor = minor
self._viewer_session_id = viewer_session_id
self._major = major
self._minor = minor
class _LttngLiveViewerTracingSessionInfo:
def __init__(
self,
class _LttngLiveViewerTracingSessionInfo:
def __init__(
self,
- tracing_session_id,
- live_timer_freq,
- client_count,
- stream_count,
- hostname,
- name,
+ tracing_session_id: int,
+ live_timer_freq: int,
+ client_count: int,
+ stream_count: int,
+ hostname: str,
+ name: str,
):
self._tracing_session_id = tracing_session_id
self._live_timer_freq = live_timer_freq
):
self._tracing_session_id = tracing_session_id
self._live_timer_freq = live_timer_freq
-class _LttngLiveViewerGetTracingSessionInfosReply:
- def __init__(self, tracing_session_infos):
+class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
+ def __init__(
+ self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
+ ):
self._tracing_session_infos = tracing_session_infos
@property
self._tracing_session_infos = tracing_session_infos
@property
- def __init__(self, version, tracing_session_id, offset, seek_type):
+ def __init__(
+ self, version: int, tracing_session_id: int, offset: int, seek_type: int
+ ):
super().__init__(version)
self._tracing_session_id = tracing_session_id
self._offset = offset
super().__init__(version)
self._tracing_session_id = tracing_session_id
self._offset = offset
class _LttngLiveViewerStreamInfo:
class _LttngLiveViewerStreamInfo:
- def __init__(self, id, trace_id, is_metadata, path, channel_name):
+ def __init__(
+ self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
+ ):
self._id = id
self._trace_id = trace_id
self._is_metadata = is_metadata
self._id = id
self._trace_id = trace_id
self._is_metadata = is_metadata
return self._channel_name
return self._channel_name
-class _LttngLiveViewerAttachToTracingSessionReply:
+class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
class Status:
OK = 1
ALREADY = 2
class Status:
OK = 1
ALREADY = 2
SEEK_ERROR = 5
NO_SESSION = 6
SEEK_ERROR = 5
NO_SESSION = 6
- def __init__(self, status, stream_infos):
+ def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
self._status = status
self._stream_infos = stream_infos
self._status = status
self._stream_infos = stream_infos
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
- def __init__(self, version, stream_id):
+ def __init__(self, version: int, stream_id: int):
super().__init__(version)
self._stream_id = stream_id
super().__init__(version)
self._stream_id = stream_id
-class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
+class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
class Status:
OK = 1
RETRY = 2
class Status:
OK = 1
RETRY = 2
- def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
+ def __init__(
+ self,
+ status: int,
+ index_entry: _LttngIndexEntryT,
+ has_new_metadata: bool,
+ has_new_data_stream: bool,
+ ):
self._status = status
self._index_entry = index_entry
self._has_new_metadata = has_new_metadata
self._status = status
self._index_entry = index_entry
self._has_new_metadata = has_new_metadata
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
- def __init__(self, version, stream_id, offset, req_length):
+ def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
super().__init__(version)
self._stream_id = stream_id
self._offset = offset
super().__init__(version)
self._stream_id = stream_id
self._offset = offset
-class _LttngLiveViewerGetDataStreamPacketDataReply:
+class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
class Status:
OK = 1
RETRY = 2
ERROR = 3
EOF = 4
class Status:
OK = 1
RETRY = 2
ERROR = 3
EOF = 4
- def __init__(self, status, data, has_new_metadata, has_new_data_stream):
+ def __init__(
+ self,
+ status: int,
+ data: bytes,
+ has_new_metadata: bool,
+ has_new_data_stream: bool,
+ ):
self._status = status
self._data = data
self._has_new_metadata = has_new_metadata
self._status = status
self._data = data
self._has_new_metadata = has_new_metadata
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
- def __init__(self, version, stream_id):
+ def __init__(self, version: int, stream_id: int):
super().__init__(version)
self._stream_id = stream_id
super().__init__(version)
self._stream_id = stream_id
-class _LttngLiveViewerGetMetadataStreamDataContentReply:
+class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
class Status:
OK = 1
NO_NEW = 2
ERROR = 3
class Status:
OK = 1
NO_NEW = 2
ERROR = 3
- def __init__(self, status, data):
+ def __init__(self, status: int, data: bytes):
self._status = status
self._data = data
self._status = status
self._data = data
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
- def __init__(self, version, tracing_session_id):
+ def __init__(self, version: int, tracing_session_id: int):
super().__init__(version)
self._tracing_session_id = tracing_session_id
super().__init__(version)
self._tracing_session_id = tracing_session_id
return self._tracing_session_id
return self._tracing_session_id
-class _LttngLiveViewerGetNewStreamInfosReply:
+class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
class Status:
OK = 1
NO_NEW = 2
ERROR = 3
HUP = 4
class Status:
OK = 1
NO_NEW = 2
ERROR = 3
HUP = 4
- def __init__(self, status, stream_infos):
+ def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
self._status = status
self._stream_infos = stream_infos
self._status = status
self._stream_infos = stream_infos
-class _LttngLiveViewerCreateViewerSessionReply:
+class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
class Status:
OK = 1
ERROR = 2
class Status:
OK = 1
ERROR = 2
- def __init__(self, status):
+ def __init__(self, status: int):
self._status = status
@property
self._status = status
@property
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
- def __init__(self, version, tracing_session_id):
+ def __init__(self, version: int, tracing_session_id: int):
super().__init__(version)
self._tracing_session_id = tracing_session_id
super().__init__(version)
self._tracing_session_id = tracing_session_id
return self._tracing_session_id
return self._tracing_session_id
-class _LttngLiveViewerDetachFromTracingSessionReply:
+class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
class Status:
OK = 1
UNKNOWN = 2
ERROR = 3
class Status:
OK = 1
UNKNOWN = 2
ERROR = 3
- def __init__(self, status):
+ def __init__(self, status: int):
self._status = status
@property
self._status = status
@property
- def _unpack(self, fmt, data, offset=0):
+ def _unpack(self, fmt: str, data: bytes, offset: int = 0):
fmt = "!" + fmt
return struct.unpack_from(fmt, data, offset)
fmt = "!" + fmt
return struct.unpack_from(fmt, data, offset)
- def _unpack_payload(self, fmt, data):
+ def _unpack_payload(self, fmt: str, data: bytes):
return self._unpack(
fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
)
return self._unpack(
fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
)
- def decode(self, data):
+ def decode(self, data: bytes):
if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
# Not enough data to read the command header
return
if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
# Not enough data to read the command header
return
- viewer_session_id, major, minor, conn_type = self._unpack_payload(
- "QIII", data
- )
+ viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
return _LttngLiveViewerConnectCommand(
version, viewer_session_id, major, minor
)
return _LttngLiveViewerConnectCommand(
version, viewer_session_id, major, minor
)
else:
raise UnexpectedInput("Unknown command type {}".format(cmd_type))
else:
raise UnexpectedInput("Unknown command type {}".format(cmd_type))
- def _pack(self, fmt, *args):
+ def _pack(self, fmt: str, *args: Any):
# Force network byte order
return struct.pack("!" + fmt, *args)
# Force network byte order
return struct.pack("!" + fmt, *args)
- def _encode_zero_padded_str(self, string, length):
+ def _encode_zero_padded_str(self, string: str, length: int):
data = string.encode()
return data.ljust(length, b"\x00")
data = string.encode()
return data.ljust(length, b"\x00")
- def _encode_stream_info(self, info):
+ def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
data += self._encode_zero_padded_str(info.path, 4096)
data += self._encode_zero_padded_str(info.channel_name, 255)
return data
data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
data += self._encode_zero_padded_str(info.path, 4096)
data += self._encode_zero_padded_str(info.channel_name, 255)
return data
- def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
+ def _get_has_new_stuff_flags(
+ self, has_new_metadata: bool, has_new_data_streams: bool
+ ):
flags = 0
if has_new_metadata:
flags = 0
if has_new_metadata:
- def encode(self, reply):
+ def encode(
+ self,
+ reply: _LttngLiveViewerReply,
+ ) -> bytes:
if type(reply) is _LttngLiveViewerConnectReply:
data = self._pack(
"QIII", reply.viewer_session_id, reply.major, reply.minor, 2
if type(reply) is _LttngLiveViewerConnectReply:
data = self._pack(
"QIII", reply.viewer_session_id, reply.major, reply.minor, 2
reply.has_new_metadata, reply.has_new_data_stream
)
reply.has_new_metadata, reply.has_new_data_stream
)
- if type(entry) is _LttngDataStreamIndexEntry:
+ if isinstance(entry, _LttngDataStreamIndexEntry):
data = self._pack(
index_format,
entry.offset_bytes,
data = self._pack(
index_format,
entry.offset_bytes,
- assert type(entry) is _LttngDataStreamBeaconEntry
data = self._pack(
index_format,
0,
data = self._pack(
index_format,
0,
-def _get_entry_timestamp_begin(entry):
- if type(entry) is _LttngDataStreamBeaconEntry:
+def _get_entry_timestamp_begin(
+ entry: _LttngIndexEntryT,
+):
+ if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
return entry.timestamp
else:
return entry.timestamp
else:
- assert type(entry) is _LttngDataStreamIndexEntry
return entry.timestamp_begin
# The index of an LTTng data stream, a sequence of index entries.
return entry.timestamp_begin
# The index of an LTTng data stream, a sequence of index entries.
-class _LttngDataStreamIndex(collections.abc.Sequence):
- def __init__(self, path, beacons):
+class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
+ def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
self._path = path
self._build()
if beacons:
stream_class_id = self._entries[0].stream_class_id
self._path = path
self._build()
if beacons:
stream_class_id = self._entries[0].stream_class_id
- beacons = [
- _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
- ]
- self._add_beacons(beacons)
+
+ beacons_list = [] # type: list[_LttngDataStreamBeaconIndexEntry]
+ for ts in beacons.iter(tjson.IntVal):
+ beacons_list.append(
+ _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
+ )
+
+ self._add_beacons(beacons_list)
logging.info(
'Built data stream index entries: path="{}", count={}'.format(
logging.info(
'Built data stream index entries: path="{}", count={}'.format(
+ self._entries = [] # type: list[_LttngIndexEntryT]
assert os.path.isfile(self._path)
with open(self._path, "rb") as f:
assert os.path.isfile(self._path)
with open(self._path, "rb") as f:
size = struct.calcsize(fmt)
data = f.read(size)
assert len(data) == size
size = struct.calcsize(fmt)
data = f.read(size)
assert len(data) == size
- magic, index_major, index_minor, index_entry_length = struct.unpack(
- fmt, data
- )
+ magic, _, _, index_entry_length = struct.unpack(fmt, data)
assert magic == 0xC1F1DCC1
# Read index entries
assert magic == 0xC1F1DCC1
# Read index entries
# Skip anything else before the next entry
f.seek(index_entry_length - size, os.SEEK_CUR)
# Skip anything else before the next entry
f.seek(index_entry_length - size, os.SEEK_CUR)
- def _add_beacons(self, beacons):
+ def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
# Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
# Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
- def sort_key(entry):
- if type(entry) is _LttngDataStreamBeaconEntry:
+ def sort_key(
+ entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
+ ) -> int:
+ if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
return entry.timestamp
else:
return entry.timestamp_end
return entry.timestamp
else:
return entry.timestamp_end
self._entries += beacons
self._entries.sort(key=sort_key)
self._entries += beacons
self._entries.sort(key=sort_key)
- def __getitem__(self, index):
+ @overload
+ def __getitem__(self, index: int) -> _LttngIndexEntryT:
+ ...
+
+ @overload
+ def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]: # noqa: F811
+ ...
+
+ def __getitem__( # noqa: F811
+ self, index: Union[int, slice]
+ ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT],]:
return self._entries[index]
def __len__(self):
return self._entries[index]
def __len__(self):
# An LTTng data stream.
class _LttngDataStream:
# An LTTng data stream.
class _LttngDataStream:
- def __init__(self, path, beacons):
+ def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
self._path = path
filename = os.path.basename(path)
match = re.match(r"(.*)_\d+", filename)
self._path = path
filename = os.path.basename(path)
match = re.match(r"(.*)_\d+", filename)
self._channel_name = match.group(1)
trace_dir = os.path.dirname(path)
index_path = os.path.join(trace_dir, "index", filename + ".idx")
self._channel_name = match.group(1)
trace_dir = os.path.dirname(path)
index_path = os.path.join(trace_dir, "index", filename + ".idx")
- self._index = _LttngDataStreamIndex(index_path, beacons)
+ self._index = _LttngDataStreamIndex(index_path, beacons_json)
assert os.path.isfile(path)
self._file = open(path, "rb")
logging.info(
assert os.path.isfile(path)
self._file = open(path, "rb")
logging.info(
def index(self):
return self._index
def index(self):
return self._index
- def get_data(self, offset_bytes, len_bytes):
+ def get_data(self, offset_bytes: int, len_bytes: int):
self._file.seek(offset_bytes)
return self._file.read(len_bytes)
class _LttngMetadataStreamSection:
self._file.seek(offset_bytes)
return self._file.read(len_bytes)
class _LttngMetadataStreamSection:
- def __init__(self, timestamp, data):
+ def __init__(self, timestamp: int, data: Optional[bytes]):
self._timestamp = timestamp
if data is None:
self._data = bytes()
self._timestamp = timestamp
if data is None:
self._data = bytes()
# An LTTng metadata stream.
class _LttngMetadataStream:
# An LTTng metadata stream.
class _LttngMetadataStream:
- def __init__(self, metadata_file_path, config_sections):
+ def __init__(
+ self,
+ metadata_file_path: str,
+ config_sections: Sequence[_LttngMetadataStreamSection],
+ ):
self._path = metadata_file_path
self._sections = config_sections
logging.info(
self._path = metadata_file_path
self._sections = config_sections
logging.info(
-LttngMetadataConfigSection = namedtuple(
- "LttngMetadataConfigSection", ["line", "timestamp", "is_empty"]
-)
+class LttngMetadataConfigSection:
+ def __init__(self, line: int, timestamp: int, is_empty: bool):
+ self._line = line
+ self._timestamp = timestamp
+ self._is_empty = is_empty
+
+ @property
+ def line(self):
+ return self._line
+ @property
+ def timestamp(self):
+ return self._timestamp
-def _parse_metadata_sections_config(config_sections):
- assert config_sections is not None
- config_metadata_sections = []
+ @property
+ def is_empty(self):
+ return self._is_empty
+
+
+def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
+ metadata_sections = [] # type: list[LttngMetadataConfigSection]
append_empty_section = False
last_timestamp = 0
last_line = 0
append_empty_section = False
last_timestamp = 0
last_line = 0
- for config_section in config_sections:
- if config_section == "empty":
- # Found a empty section marker. Actually append the section at the
- # timestamp of the next concrete section.
- append_empty_section = True
- else:
- assert type(config_section) is dict
- line = config_section.get("line")
- ts = config_section.get("timestamp")
-
- if type(line) is not int:
- raise RuntimeError("`line` is not an integer")
-
- if type(ts) is not int:
- raise RuntimeError("`timestamp` is not an integer")
+ for section in metadata_sections_json:
+ if isinstance(section, tjson.StrVal):
+ if section.val == "empty":
+ # Found a empty section marker. Actually append the section at the
+ # timestamp of the next concrete section.
+ append_empty_section = True
+ else:
+ raise ValueError("Invalid string value at {}.".format(section.path))
+ elif isinstance(section, tjson.ObjVal):
+ line = section.at("line", tjson.IntVal).val
+ ts = section.at("timestamp", tjson.IntVal).val
# Sections' timestamps and lines must both be increasing.
assert ts > last_timestamp
last_timestamp = ts
# Sections' timestamps and lines must both be increasing.
assert ts > last_timestamp
last_timestamp = ts
assert line > last_line
last_line = line
if append_empty_section:
assert line > last_line
last_line = line
if append_empty_section:
- config_metadata_sections.append(
- LttngMetadataConfigSection(line, ts, True)
- )
+ metadata_sections.append(LttngMetadataConfigSection(line, ts, True))
append_empty_section = False
append_empty_section = False
- config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
-
- return config_metadata_sections
+ metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
+ else:
+ raise TypeError(
+ "`{}`: expecting a string or object value".format(section.path)
+ )
+ return metadata_sections
-def _split_metadata_sections(metadata_file_path, raw_config_sections):
- assert isinstance(raw_config_sections, collections.abc.Sequence)
- parsed_sections = _parse_metadata_sections_config(raw_config_sections)
+def _split_metadata_sections(
+ metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
+):
+ metadata_sections = _parse_metadata_sections_config(metadata_sections_json)
+ sections = [] # type: list[_LttngMetadataStreamSection]
with open(metadata_file_path, "r") as metadata_file:
metadata_lines = [line for line in metadata_file]
with open(metadata_file_path, "r") as metadata_file:
metadata_lines = [line for line in metadata_file]
- config_metadata_sections_idx = 0
+ metadata_section_idx = 0
curr_metadata_section = bytearray()
for idx, line_content in enumerate(metadata_lines):
curr_metadata_section = bytearray()
for idx, line_content in enumerate(metadata_lines):
curr_line_number = idx + 1
# If there are no more sections, simply append the line.
curr_line_number = idx + 1
# If there are no more sections, simply append the line.
- if config_metadata_sections_idx + 1 >= len(parsed_sections):
+ if metadata_section_idx + 1 >= len(metadata_sections):
curr_metadata_section += bytearray(line_content, "utf8")
continue
curr_metadata_section += bytearray(line_content, "utf8")
continue
- next_section_line_number = parsed_sections[
- config_metadata_sections_idx + 1
- ].line
+ next_section_line_number = metadata_sections[metadata_section_idx + 1].line
# If the next section begins at the current line, create a
# section with the metadata we gathered so far.
# If the next section begins at the current line, create a
# section with the metadata we gathered so far.
# Flushing the metadata of the current section.
sections.append(
_LttngMetadataStreamSection(
# Flushing the metadata of the current section.
sections.append(
_LttngMetadataStreamSection(
- parsed_sections[config_metadata_sections_idx].timestamp,
+ metadata_sections[metadata_section_idx].timestamp,
bytes(curr_metadata_section),
)
)
# Move to the next section.
bytes(curr_metadata_section),
)
)
# Move to the next section.
- config_metadata_sections_idx += 1
+ metadata_section_idx += 1
# Clear old content and append current line for the next section.
curr_metadata_section.clear()
curr_metadata_section += bytearray(line_content, "utf8")
# Append any empty sections.
# Clear old content and append current line for the next section.
curr_metadata_section.clear()
curr_metadata_section += bytearray(line_content, "utf8")
# Append any empty sections.
- while parsed_sections[config_metadata_sections_idx].is_empty:
+ while metadata_sections[metadata_section_idx].is_empty:
sections.append(
_LttngMetadataStreamSection(
sections.append(
_LttngMetadataStreamSection(
- parsed_sections[config_metadata_sections_idx].timestamp, None
+ metadata_sections[metadata_section_idx].timestamp, None
- config_metadata_sections_idx += 1
+ metadata_section_idx += 1
else:
# Append line_content to the current metadata section.
curr_metadata_section += bytearray(line_content, "utf8")
else:
# Append line_content to the current metadata section.
curr_metadata_section += bytearray(line_content, "utf8")
# We iterated over all the lines of the metadata file. Close the current section.
sections.append(
_LttngMetadataStreamSection(
# We iterated over all the lines of the metadata file. Close the current section.
sections.append(
_LttngMetadataStreamSection(
- parsed_sections[config_metadata_sections_idx].timestamp,
+ metadata_sections[metadata_section_idx].timestamp,
bytes(curr_metadata_section),
)
)
bytes(curr_metadata_section),
)
)
+_StreamBeaconsT = Dict[str, Iterable[int]]
+
+
# An LTTng trace, a sequence of LTTng data streams.
# An LTTng trace, a sequence of LTTng data streams.
-class LttngTrace(collections.abc.Sequence):
- def __init__(self, trace_dir, metadata_sections, beacons):
+class LttngTrace(Sequence[_LttngDataStream]):
+ def __init__(
+ self,
+ trace_dir: str,
+ metadata_sections_json: Optional[tjson.ArrayVal],
+ beacons_json: Optional[tjson.ObjVal],
+ ):
assert os.path.isdir(trace_dir)
self._path = trace_dir
assert os.path.isdir(trace_dir)
self._path = trace_dir
- self._create_metadata_stream(trace_dir, metadata_sections)
- self._create_data_streams(trace_dir, beacons)
+ self._create_metadata_stream(trace_dir, metadata_sections_json)
+ self._create_data_streams(trace_dir, beacons_json)
logging.info('Built trace: path="{}"'.format(trace_dir))
logging.info('Built trace: path="{}"'.format(trace_dir))
- def _create_data_streams(self, trace_dir, beacons):
- data_stream_paths = []
+ def _create_data_streams(
+ self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
+ ):
+ data_stream_paths = [] # type: list[str]
for filename in os.listdir(trace_dir):
path = os.path.join(trace_dir, filename)
for filename in os.listdir(trace_dir):
path = os.path.join(trace_dir, filename)
data_stream_paths.append(path)
data_stream_paths.sort()
data_stream_paths.append(path)
data_stream_paths.sort()
- self._data_streams = []
+ self._data_streams = [] # type: list[_LttngDataStream]
for data_stream_path in data_stream_paths:
stream_name = os.path.basename(data_stream_path)
for data_stream_path in data_stream_paths:
stream_name = os.path.basename(data_stream_path)
- this_stream_beacons = None
-
- if beacons is not None and stream_name in beacons:
- this_stream_beacons = beacons[stream_name]
+ this_beacons_json = None
+ if beacons_json is not None and stream_name in beacons_json:
+ this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
self._data_streams.append(
self._data_streams.append(
- _LttngDataStream(data_stream_path, this_stream_beacons)
+ _LttngDataStream(data_stream_path, this_beacons_json)
- def _create_metadata_stream(self, trace_dir, config_metadata_sections):
+ def _create_metadata_stream(
+ self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
+ ):
metadata_path = os.path.join(trace_dir, "metadata")
metadata_path = os.path.join(trace_dir, "metadata")
+ metadata_sections = [] # type: list[_LttngMetadataStreamSection]
- if config_metadata_sections is None:
+ if metadata_sections_json is None:
with open(metadata_path, "rb") as metadata_file:
metadata_sections.append(
_LttngMetadataStreamSection(0, metadata_file.read())
)
else:
metadata_sections = _split_metadata_sections(
with open(metadata_path, "rb") as metadata_file:
metadata_sections.append(
_LttngMetadataStreamSection(0, metadata_file.read())
)
else:
metadata_sections = _split_metadata_sections(
- metadata_path, config_metadata_sections
+ metadata_path, metadata_sections_json
)
self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
)
self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
def metadata_stream(self):
return self._metadata_stream
def metadata_stream(self):
return self._metadata_stream
- def __getitem__(self, index):
+ @overload
+ def __getitem__(self, index: int) -> _LttngDataStream:
+ ...
+
+ @overload
+ def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]: # noqa: F811
+ ...
+
+ def __getitem__( # noqa: F811
+ self, index: Union[int, slice]
+ ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
return self._data_streams[index]
def __len__(self):
return self._data_streams[index]
def __len__(self):
# The state of a single data stream.
class _LttngLiveViewerSessionDataStreamState:
# The state of a single data stream.
class _LttngLiveViewerSessionDataStreamState:
- def __init__(self, ts_state, info, data_stream, metadata_stream_id):
+ def __init__(
+ self,
+ ts_state: "_LttngLiveViewerSessionTracingSessionState",
+ info: _LttngLiveViewerStreamInfo,
+ data_stream: _LttngDataStream,
+ metadata_stream_id: int,
+ ):
self._ts_state = ts_state
self._info = info
self._data_stream = data_stream
self._ts_state = ts_state
self._info = info
self._data_stream = data_stream
return self._data_stream.index[self._cur_index_entry_index]
return self._data_stream.index[self._cur_index_entry_index]
+ @property
+ def metadata_stream_id(self):
+ return self._metadata_stream_id
+
def goto_next_index_entry(self):
self._cur_index_entry_index += 1
# The state of a single metadata stream.
class _LttngLiveViewerSessionMetadataStreamState:
def goto_next_index_entry(self):
self._cur_index_entry_index += 1
# The state of a single metadata stream.
class _LttngLiveViewerSessionMetadataStreamState:
- def __init__(self, ts_state, info, metadata_stream):
+ def __init__(
+ self,
+ ts_state: "_LttngLiveViewerSessionTracingSessionState",
+ info: _LttngLiveViewerStreamInfo,
+ metadata_stream: _LttngMetadataStream,
+ ):
self._ts_state = ts_state
self._info = info
self._metadata_stream = metadata_stream
self._ts_state = ts_state
self._info = info
self._metadata_stream = metadata_stream
return self._is_sent
@is_sent.setter
return self._is_sent
@is_sent.setter
- def is_sent(self, value):
+ def is_sent(self, value: bool):
self._is_sent = value
@property
self._is_sent = value
@property
# objects).
class LttngTracingSessionDescriptor:
def __init__(
# objects).
class LttngTracingSessionDescriptor:
def __init__(
- self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
+ self,
+ name: str,
+ tracing_session_id: int,
+ hostname: str,
+ live_timer_freq: int,
+ client_count: int,
+ traces: Iterable[LttngTrace],
):
for trace in traces:
if name not in trace.path:
):
for trace in traces:
if name not in trace.path:
# The state of a tracing session.
class _LttngLiveViewerSessionTracingSessionState:
# The state of a tracing session.
class _LttngLiveViewerSessionTracingSessionState:
- def __init__(self, tc_descr, base_stream_id):
+ def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
self._tc_descr = tc_descr
self._tc_descr = tc_descr
- self._stream_infos = []
- self._ds_states = {}
- self._ms_states = {}
+ self._stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
+ self._ds_states = {} # type: dict[int, _LttngLiveViewerSessionDataStreamState]
+ self._ms_states = (
+ {}
+ ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
stream_id = base_stream_id
for trace in tc_descr.traces:
stream_id = base_stream_id
for trace in tc_descr.traces:
return self._is_attached
@is_attached.setter
return self._is_attached
@is_attached.setter
- def is_attached(self, value):
+ def is_attached(self, value: bool):
self._is_attached = value
self._is_attached = value
-def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
+def needs_new_metadata_section(
+ metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
+ latest_timestamp: int,
+):
if metadata_stream_state.next_section_timestamp is None:
return False
if metadata_stream_state.next_section_timestamp is None:
return False
class _LttngLiveViewerSession:
def __init__(
self,
class _LttngLiveViewerSession:
def __init__(
self,
- viewer_session_id,
- tracing_session_descriptors,
- max_query_data_response_size,
+ viewer_session_id: int,
+ tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
+ max_query_data_response_size: Optional[int],
):
self._viewer_session_id = viewer_session_id
):
self._viewer_session_id = viewer_session_id
- self._ts_states = {}
- self._stream_states = {}
+ self._ts_states = (
+ {}
+ ) # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
+ self._stream_states = (
+ {}
+ ) # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
self._max_query_data_response_size = max_query_data_response_size
total_stream_infos = 0
self._max_query_data_response_size = max_query_data_response_size
total_stream_infos = 0
_LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
_LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
_LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
_LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
_LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
_LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
+ } # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]
@property
def viewer_session_id(self):
return self._viewer_session_id
@property
def viewer_session_id(self):
return self._viewer_session_id
- def _get_tracing_session_state(self, tracing_session_id):
+ def _get_tracing_session_state(self, tracing_session_id: int):
if tracing_session_id not in self._ts_states:
raise UnexpectedInput(
"Unknown tracing session ID {}".format(tracing_session_id)
if tracing_session_id not in self._ts_states:
raise UnexpectedInput(
"Unknown tracing session ID {}".format(tracing_session_id)
return self._ts_states[tracing_session_id]
return self._ts_states[tracing_session_id]
- def _get_stream_state(self, stream_id):
+ def _get_data_stream_state(self, stream_id: int):
if stream_id not in self._stream_states:
if stream_id not in self._stream_states:
- UnexpectedInput("Unknown stream ID {}".format(stream_id))
+ RuntimeError("Unknown stream ID {}".format(stream_id))
- return self._stream_states[stream_id]
+ stream = self._stream_states[stream_id]
+ if type(stream) is not _LttngLiveViewerSessionDataStreamState:
+ raise RuntimeError("Stream is not a data stream")
- def handle_command(self, cmd):
+ return stream
+
+ def _get_metadata_stream_state(self, stream_id: int):
+ if stream_id not in self._stream_states:
+ RuntimeError("Unknown stream ID {}".format(stream_id))
+
+ stream = self._stream_states[stream_id]
+ if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
+ raise RuntimeError("Stream is not a metadata stream")
+
+ return stream
+
+ def handle_command(self, cmd: _LttngLiveViewerCommand):
logging.info(
"Handling command in viewer session: cmd-cls-name={}".format(
cmd.__class__.__name__
logging.info(
"Handling command in viewer session: cmd-cls-name={}".format(
cmd.__class__.__name__
return self._command_handlers[cmd_type](cmd)
return self._command_handlers[cmd_type](cmd)
- def _handle_attach_to_tracing_session_command(self, cmd):
+ def _handle_attach_to_tracing_session_command(
+ self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
+ ):
fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
status, ts_state.stream_infos
)
status, ts_state.stream_infos
)
- def _handle_detach_from_tracing_session_command(self, cmd):
+ def _handle_detach_from_tracing_session_command(
+ self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
+ ):
fmt = 'Handling "detach from tracing session" command: ts-id={}'
logging.info(fmt.format(cmd.tracing_session_id))
ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
fmt = 'Handling "detach from tracing session" command: ts-id={}'
logging.info(fmt.format(cmd.tracing_session_id))
ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
return _LttngLiveViewerDetachFromTracingSessionReply(status)
status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
return _LttngLiveViewerDetachFromTracingSessionReply(status)
- def _handle_get_next_data_stream_index_entry_command(self, cmd):
+ def _handle_get_next_data_stream_index_entry_command(
+ self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
+ ):
fmt = 'Handling "get next data stream index entry" command: stream-id={}'
logging.info(fmt.format(cmd.stream_id))
fmt = 'Handling "get next data stream index entry" command: stream-id={}'
logging.info(fmt.format(cmd.stream_id))
- stream_state = self._get_stream_state(cmd.stream_id)
- metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
-
- if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
- raise UnexpectedInput(
- "Stream with ID {} is not a data stream".format(cmd.stream_id)
- )
+ stream_state = self._get_data_stream_state(cmd.stream_id)
+ metadata_stream_state = self._get_metadata_stream_state(
+ stream_state.metadata_stream_id
+ )
if stream_state.cur_index_entry is None:
# The viewer is done reading this stream
if stream_state.cur_index_entry is None:
# The viewer is done reading this stream
# The viewer only checks the `has_new_metadata` flag if the
# reply's status is `OK`, so we need to provide an index here
has_new_metadata = stream_state.tracing_session_state.has_new_metadata
# The viewer only checks the `has_new_metadata` flag if the
# reply's status is `OK`, so we need to provide an index here
has_new_metadata = stream_state.tracing_session_state.has_new_metadata
- if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
+ if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
else:
status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
else:
- assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
stream_state.goto_next_index_entry()
return reply
stream_state.goto_next_index_entry()
return reply
- def _handle_get_data_stream_packet_data_command(self, cmd):
+ def _handle_get_data_stream_packet_data_command(
+ self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
+ ):
fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
- stream_state = self._get_stream_state(cmd.stream_id)
+ stream_state = self._get_data_stream_state(cmd.stream_id)
data_response_length = cmd.req_length
data_response_length = cmd.req_length
- if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
- raise UnexpectedInput(
- "Stream with ID {} is not a data stream".format(cmd.stream_id)
- )
-
if stream_state.tracing_session_state.has_new_metadata:
status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
return _LttngLiveViewerGetDataStreamPacketDataReply(
if stream_state.tracing_session_state.has_new_metadata:
status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
return _LttngLiveViewerGetDataStreamPacketDataReply(
status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
- def _handle_get_metadata_stream_data_command(self, cmd):
+ def _handle_get_metadata_stream_data_command(
+ self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
+ ):
fmt = 'Handling "get metadata stream data" command: stream-id={}'
logging.info(fmt.format(cmd.stream_id))
fmt = 'Handling "get metadata stream data" command: stream-id={}'
logging.info(fmt.format(cmd.stream_id))
- metadata_stream_state = self._get_stream_state(cmd.stream_id)
-
- if (
- type(metadata_stream_state)
- is not _LttngLiveViewerSessionMetadataStreamState
- ):
- raise UnexpectedInput(
- "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
- )
+ metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
if metadata_stream_state.is_sent:
status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
if metadata_stream_state.is_sent:
status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
status, metadata_section.data
)
status, metadata_section.data
)
- def _handle_get_new_stream_infos_command(self, cmd):
+ def _handle_get_new_stream_infos_command(
+ self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
+ ):
fmt = 'Handling "get new stream infos" command: ts-id={}'
logging.info(fmt.format(cmd.tracing_session_id))
fmt = 'Handling "get new stream infos" command: ts-id={}'
logging.info(fmt.format(cmd.tracing_session_id))
status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
return _LttngLiveViewerGetNewStreamInfosReply(status, [])
status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
return _LttngLiveViewerGetNewStreamInfosReply(status, [])
- def _handle_get_tracing_session_infos_command(self, cmd):
+ def _handle_get_tracing_session_infos_command(
+ self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
+ ):
logging.info('Handling "get tracing session infos" command.')
infos = [
tss.tracing_session_descriptor.info for tss in self._ts_states.values()
logging.info('Handling "get tracing session infos" command.')
infos = [
tss.tracing_session_descriptor.info for tss in self._ts_states.values()
infos.sort(key=lambda info: info.name)
return _LttngLiveViewerGetTracingSessionInfosReply(infos)
infos.sort(key=lambda info: info.name)
return _LttngLiveViewerGetTracingSessionInfosReply(infos)
- def _handle_create_viewer_session_command(self, cmd):
+ def _handle_create_viewer_session_command(
+ self, cmd: _LttngLiveViewerCreateViewerSessionCommand
+ ):
logging.info('Handling "create viewer session" command.')
status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
logging.info('Handling "create viewer session" command.')
status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
class LttngLiveServer:
def __init__(
self,
class LttngLiveServer:
def __init__(
self,
- port,
- port_filename,
- tracing_session_descriptors,
- max_query_data_response_size,
+ port: Optional[int],
+ port_filename: str,
+ tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
+ max_query_data_response_size: Optional[int],
):
logging.info("Server configuration:")
):
logging.info("Server configuration:")
- def _send_reply(self, reply):
+ def _send_reply(self, reply: _LttngLiveViewerReply):
data = self._codec.encode(reply)
logging.info(
"Sending reply to viewer: reply-cls-name={}, length={}".format(
data = self._codec.encode(reply)
logging.info(
"Sending reply to viewer: reply-cls-name={}, length={}".format(
finally:
self._conn.close()
finally:
self._conn.close()
- def _write_port_to_file(self, port_filename):
+ def _write_port_to_file(self, port_filename: str):
# Write the port number to a temporary file.
with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
print(self._server_port, end="", file=tmp_port_file)
# Write the port number to a temporary file.
with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
print(self._server_port, end="", file=tmp_port_file)
-def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
+def _session_descriptors_from_path(
+ sessions_filename: str, trace_path_prefix: Optional[str]
+):
# }
# ]
with open(sessions_filename, "r") as sessions_file:
# }
# ]
with open(sessions_filename, "r") as sessions_file:
- params = json.load(sessions_file)
+ sessions_json = tjson.load(sessions_file, tjson.ArrayVal)
+
+ sessions = [] # type: list[LttngTracingSessionDescriptor]
+ for session_json in sessions_json.iter(tjson.ObjVal):
+ name = session_json.at("name", tjson.StrVal).val
+ tracing_session_id = session_json.at("id", tjson.IntVal).val
+ hostname = session_json.at("hostname", tjson.StrVal).val
+ live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
+ client_count = session_json.at("client-count", tjson.IntVal).val
+ traces_json = session_json.at("traces", tjson.ArrayVal)
- for session in params:
- name = session["name"]
- tracing_session_id = session["id"]
- hostname = session["hostname"]
- live_timer_freq = session["live-timer-freq"]
- client_count = session["client-count"]
- traces = []
+ traces = [] # type: list[LttngTrace]
- for trace in session["traces"]:
- metadata_sections = trace.get("metadata-sections")
- beacons = trace.get("beacons")
- path = trace["path"]
+ for trace_json in traces_json.iter(tjson.ObjVal):
+ metadata_sections = (
+ trace_json.at("metadata-sections", tjson.ArrayVal)
+ if "metadata-sections" in trace_json
+ else None
+ )
+ beacons = (
+ trace_json.at("beacons", tjson.ObjVal)
+ if "beacons" in trace_json
+ else None
+ )
+ path = trace_json.at("path", tjson.StrVal).val
- if not os.path.isabs(path):
+ if not os.path.isabs(path) and trace_path_prefix:
path = os.path.join(trace_path_prefix, path)
path = os.path.join(trace_path_prefix, path)
- traces.append(LttngTrace(path, metadata_sections, beacons))
+ traces.append(
+ LttngTrace(
+ path,
+ metadata_sections,
+ beacons,
+ )
+ )
sessions.append(
LttngTracingSessionDescriptor(
sessions.append(
LttngTracingSessionDescriptor(
-def _loglevel_parser(string):
+def _loglevel_parser(string: str):
loglevels = {"info": logging.INFO, "warning": logging.WARNING}
if string not in loglevels:
msg = "{} is not a valid loglevel".format(string)
loglevels = {"info": logging.INFO, "warning": logging.WARNING}
if string not in loglevels:
msg = "{} is not a valid loglevel".format(string)
args = parser.parse_args(args=remaining_args)
try:
args = parser.parse_args(args=remaining_args)
try:
+ sessions_filename = args.sessions_filename # type: str
+ trace_path_prefix = args.trace_path_prefix # type: str | None
sessions = _session_descriptors_from_path(
sessions = _session_descriptors_from_path(
- args.sessions_filename,
- args.trace_path_prefix,
- )
- LttngLiveServer(
- args.port, args.port_filename, sessions, args.max_query_data_response_size
+ sessions_filename,
+ trace_path_prefix,
+
+ port = args.port # type: int | None
+ port_filename = args.port_filename # type: str
+ max_query_data_response_size = (
+ args.max_query_data_response_size
+ ) # type: int | None
+ LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
except UnexpectedInput as exc:
logging.error(str(exc))
print(exc, file=sys.stderr)
except UnexpectedInput as exc:
logging.error(str(exc))
print(exc, file=sys.stderr)
# start server
diag "$BT_TESTS_PYTHON_BIN $server_script --port-file $port_file --trace-path-prefix $trace_dir_native $server_args"
# start server
diag "$BT_TESTS_PYTHON_BIN $server_script --port-file $port_file --trace-path-prefix $trace_dir_native $server_args"
- echo "$server_args" | xargs "$BT_TESTS_PYTHON_BIN" "$server_script" \
+ echo "$server_args" | run_python xargs "$BT_TESTS_PYTHON_BIN" "$server_script" \
--port-file "$port_file" \
--trace-path-prefix "$trace_dir_native" &
--port-file "$port_file" \
--trace-path-prefix "$trace_dir_native" &