# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
#
-import argparse
-import collections.abc
-import logging
+# pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
+
import os
-import os.path
import re
+import time
import socket
import struct
-import sys
+import logging
+import os.path
+import argparse
import tempfile
-import json
-from collections import namedtuple
+from abc import ABC, abstractmethod
+from typing import Dict, Union, Iterable, Optional, Sequence, overload
+import tjson
-class UnexpectedInput(RuntimeError):
- pass
+# isort: off
+from typing import Any, Callable # noqa: F401
+
+# isort: on
+
+
+# An entry within the index of an LTTng data stream.
+class _LttngDataStreamIndexEntry:
+ def __init__(
+ self,
+ offset_bytes: int,
+ total_size_bits: int,
+ content_size_bits: int,
+ timestamp_begin: int,
+ timestamp_end: int,
+ events_discarded: int,
+ stream_class_id: int,
+ ):
+ self._offset_bytes = offset_bytes
+ self._total_size_bits = total_size_bits
+ self._content_size_bits = content_size_bits
+ self._timestamp_begin = timestamp_begin
+ self._timestamp_end = timestamp_end
+ self._events_discarded = events_discarded
+ self._stream_class_id = stream_class_id
+
+ @property
+ def offset_bytes(self):
+ return self._offset_bytes
+
+ @property
+ def total_size_bits(self):
+ return self._total_size_bits
+
+ @property
+ def total_size_bytes(self):
+ return self._total_size_bits // 8
+
+ @property
+ def content_size_bits(self):
+ return self._content_size_bits
+
+ @property
+ def content_size_bytes(self):
+ return self._content_size_bits // 8
+
+ @property
+ def timestamp_begin(self):
+ return self._timestamp_begin
+
+ @property
+ def timestamp_end(self):
+ return self._timestamp_end
+
+ @property
+ def events_discarded(self):
+ return self._events_discarded
+
+ @property
+ def stream_class_id(self):
+ return self._stream_class_id
+
+
+# An entry within the index of an LTTng data stream. While a stream beacon entry
+# is conceptually unrelated to an index, it is sent as a reply to a
+# LttngLiveViewerGetNextDataStreamIndexEntryCommand
+class _LttngDataStreamBeaconIndexEntry:
+ def __init__(self, stream_class_id: int, timestamp: int):
+ self._stream_class_id = stream_class_id
+ self._timestamp = timestamp
+
+ @property
+ def timestamp(self):
+ return self._timestamp
+
+ @property
+ def stream_class_id(self):
+ return self._stream_class_id
+
+
+_LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
class _LttngLiveViewerCommand:
- def __init__(self, version):
+ def __init__(self, version: int):
self._version = version
@property
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
- def __init__(self, version, viewer_session_id, major, minor):
+ def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
super().__init__(version)
self._viewer_session_id = viewer_session_id
self._major = major
return self._minor
-class _LttngLiveViewerConnectReply:
- def __init__(self, viewer_session_id, major, minor):
+class _LttngLiveViewerReply:
+ pass
+
+
+class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
+ def __init__(self, viewer_session_id: int, major: int, minor: int):
self._viewer_session_id = viewer_session_id
self._major = major
self._minor = minor
class _LttngLiveViewerTracingSessionInfo:
def __init__(
self,
- tracing_session_id,
- live_timer_freq,
- client_count,
- stream_count,
- hostname,
- name,
+ tracing_session_id: int,
+ live_timer_freq: int,
+ client_count: int,
+ stream_count: int,
+ hostname: str,
+ name: str,
):
self._tracing_session_id = tracing_session_id
self._live_timer_freq = live_timer_freq
return self._name
-class _LttngLiveViewerGetTracingSessionInfosReply:
- def __init__(self, tracing_session_infos):
+class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
+ def __init__(
+ self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
+ ):
self._tracing_session_infos = tracing_session_infos
@property
BEGINNING = 1
LAST = 2
- def __init__(self, version, tracing_session_id, offset, seek_type):
+ def __init__(
+ self, version: int, tracing_session_id: int, offset: int, seek_type: int
+ ):
super().__init__(version)
self._tracing_session_id = tracing_session_id
self._offset = offset
class _LttngLiveViewerStreamInfo:
- def __init__(self, id, trace_id, is_metadata, path, channel_name):
+ def __init__(
+ self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
+ ):
self._id = id
self._trace_id = trace_id
self._is_metadata = is_metadata
return self._channel_name
-class _LttngLiveViewerAttachToTracingSessionReply:
+class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
class Status:
OK = 1
ALREADY = 2
SEEK_ERROR = 5
NO_SESSION = 6
- def __init__(self, status, stream_infos):
+ def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
self._status = status
self._stream_infos = stream_infos
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
- def __init__(self, version, stream_id):
+ def __init__(self, version: int, stream_id: int):
super().__init__(version)
self._stream_id = stream_id
return self._stream_id
-class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
+class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
class Status:
OK = 1
RETRY = 2
INACTIVE = 5
EOF = 6
- def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
+ def __init__(
+ self,
+ status: int,
+ index_entry: _LttngIndexEntryT,
+ has_new_metadata: bool,
+ has_new_data_stream: bool,
+ ):
self._status = status
self._index_entry = index_entry
self._has_new_metadata = has_new_metadata
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
- def __init__(self, version, stream_id, offset, req_length):
+ def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
super().__init__(version)
self._stream_id = stream_id
self._offset = offset
return self._req_length
-class _LttngLiveViewerGetDataStreamPacketDataReply:
+class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
class Status:
OK = 1
RETRY = 2
ERROR = 3
EOF = 4
- def __init__(self, status, data, has_new_metadata, has_new_data_stream):
+ def __init__(
+ self,
+ status: int,
+ data: bytes,
+ has_new_metadata: bool,
+ has_new_data_stream: bool,
+ ):
self._status = status
self._data = data
self._has_new_metadata = has_new_metadata
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
- def __init__(self, version, stream_id):
+ def __init__(self, version: int, stream_id: int):
super().__init__(version)
self._stream_id = stream_id
return self._stream_id
-class _LttngLiveViewerGetMetadataStreamDataContentReply:
+class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
class Status:
OK = 1
NO_NEW = 2
ERROR = 3
- def __init__(self, status, data):
+ def __init__(self, status: int, data: bytes):
self._status = status
self._data = data
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
- def __init__(self, version, tracing_session_id):
+ def __init__(self, version: int, tracing_session_id: int):
super().__init__(version)
self._tracing_session_id = tracing_session_id
return self._tracing_session_id
-class _LttngLiveViewerGetNewStreamInfosReply:
+class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
class Status:
OK = 1
NO_NEW = 2
ERROR = 3
HUP = 4
- def __init__(self, status, stream_infos):
+ def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
self._status = status
self._stream_infos = stream_infos
pass
-class _LttngLiveViewerCreateViewerSessionReply:
+class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
class Status:
OK = 1
ERROR = 2
- def __init__(self, status):
+ def __init__(self, status: int):
self._status = status
@property
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
- def __init__(self, version, tracing_session_id):
+ def __init__(self, version: int, tracing_session_id: int):
super().__init__(version)
self._tracing_session_id = tracing_session_id
return self._tracing_session_id
-class _LttngLiveViewerDetachFromTracingSessionReply:
+class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
class Status:
OK = 1
UNKNOWN = 2
ERROR = 3
- def __init__(self, status):
+ def __init__(self, status: int):
self._status = status
@property
def __init__(self):
pass
- def _unpack(self, fmt, data, offset=0):
+ def _unpack(self, fmt: str, data: bytes, offset: int = 0):
fmt = "!" + fmt
return struct.unpack_from(fmt, data, offset)
- def _unpack_payload(self, fmt, data):
+ def _unpack_payload(self, fmt: str, data: bytes):
return self._unpack(
fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
)
- def decode(self, data):
+ def decode(self, data: bytes):
if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
# Not enough data to read the command header
return
return
if cmd_type == 1:
- viewer_session_id, major, minor, conn_type = self._unpack_payload(
- "QIII", data
- )
+ viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
return _LttngLiveViewerConnectCommand(
version, viewer_session_id, major, minor
)
version, tracing_session_id
)
else:
- raise UnexpectedInput("Unknown command type {}".format(cmd_type))
+ raise RuntimeError("Unknown command type {}".format(cmd_type))
- def _pack(self, fmt, *args):
+ def _pack(self, fmt: str, *args: Any):
# Force network byte order
return struct.pack("!" + fmt, *args)
- def _encode_zero_padded_str(self, string, length):
+ def _encode_zero_padded_str(self, string: str, length: int):
data = string.encode()
return data.ljust(length, b"\x00")
- def _encode_stream_info(self, info):
+ def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
data += self._encode_zero_padded_str(info.path, 4096)
data += self._encode_zero_padded_str(info.channel_name, 255)
return data
- def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
+ def _get_has_new_stuff_flags(
+ self, has_new_metadata: bool, has_new_data_streams: bool
+ ):
flags = 0
if has_new_metadata:
return flags
- def encode(self, reply):
+ def encode(
+ self,
+ reply: _LttngLiveViewerReply,
+ ) -> bytes:
if type(reply) is _LttngLiveViewerConnectReply:
data = self._pack(
"QIII", reply.viewer_session_id, reply.major, reply.minor, 2
reply.has_new_metadata, reply.has_new_data_stream
)
- if type(entry) is _LttngDataStreamIndexEntry:
+ if isinstance(entry, _LttngDataStreamIndexEntry):
data = self._pack(
index_format,
entry.offset_bytes,
flags,
)
else:
- assert type(entry) is _LttngDataStreamBeaconEntry
data = self._pack(
index_format,
0,
return data
-# An entry within the index of an LTTng data stream.
-class _LttngDataStreamIndexEntry:
- def __init__(
- self,
- offset_bytes,
- total_size_bits,
- content_size_bits,
- timestamp_begin,
- timestamp_end,
- events_discarded,
- stream_class_id,
- ):
- self._offset_bytes = offset_bytes
- self._total_size_bits = total_size_bits
- self._content_size_bits = content_size_bits
- self._timestamp_begin = timestamp_begin
- self._timestamp_end = timestamp_end
- self._events_discarded = events_discarded
- self._stream_class_id = stream_class_id
-
- @property
- def offset_bytes(self):
- return self._offset_bytes
-
- @property
- def total_size_bits(self):
- return self._total_size_bits
-
- @property
- def total_size_bytes(self):
- return self._total_size_bits // 8
-
- @property
- def content_size_bits(self):
- return self._content_size_bits
-
- @property
- def content_size_bytes(self):
- return self._content_size_bits // 8
-
- @property
- def timestamp_begin(self):
- return self._timestamp_begin
-
- @property
- def timestamp_end(self):
- return self._timestamp_end
-
- @property
- def events_discarded(self):
- return self._events_discarded
-
- @property
- def stream_class_id(self):
- return self._stream_class_id
-
-
-# An entry within the index of an LTTng data stream. While a stream beacon entry
-# is conceptually unrelated to an index, it is sent as a reply to a
-# LttngLiveViewerGetNextDataStreamIndexEntryCommand
-class _LttngDataStreamBeaconEntry:
- def __init__(self, stream_class_id, timestamp):
- self._stream_class_id = stream_class_id
- self._timestamp = timestamp
-
- @property
- def timestamp(self):
- return self._timestamp
-
- @property
- def stream_class_id(self):
- return self._stream_class_id
-
-
-def _get_entry_timestamp_begin(entry):
- if type(entry) is _LttngDataStreamBeaconEntry:
+def _get_entry_timestamp_begin(
+ entry: _LttngIndexEntryT,
+):
+ if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
return entry.timestamp
else:
- assert type(entry) is _LttngDataStreamIndexEntry
return entry.timestamp_begin
# The index of an LTTng data stream, a sequence of index entries.
-class _LttngDataStreamIndex(collections.abc.Sequence):
- def __init__(self, path, beacons):
+class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
+ def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
self._path = path
self._build()
if beacons:
stream_class_id = self._entries[0].stream_class_id
- beacons = [
- _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
- ]
- self._add_beacons(beacons)
+
+ beacons_list = [] # type: list[_LttngDataStreamBeaconIndexEntry]
+ for ts in beacons.iter(tjson.IntVal):
+ beacons_list.append(
+ _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
+ )
+
+ self._add_beacons(beacons_list)
logging.info(
'Built data stream index entries: path="{}", count={}'.format(
)
def _build(self):
- self._entries = []
- assert os.path.isfile(self._path)
+ self._entries = [] # type: list[_LttngIndexEntryT]
with open(self._path, "rb") as f:
# Read header first
size = struct.calcsize(fmt)
data = f.read(size)
assert len(data) == size
- magic, index_major, index_minor, index_entry_length = struct.unpack(
- fmt, data
- )
+ magic, _, _, index_entry_length = struct.unpack(fmt, data)
assert magic == 0xC1F1DCC1
# Read index entries
# Skip anything else before the next entry
f.seek(index_entry_length - size, os.SEEK_CUR)
- def _add_beacons(self, beacons):
+ def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
# Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
- def sort_key(entry):
- if type(entry) is _LttngDataStreamBeaconEntry:
+ def sort_key(
+ entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
+ ) -> int:
+ if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
return entry.timestamp
else:
return entry.timestamp_end
self._entries += beacons
self._entries.sort(key=sort_key)
- def __getitem__(self, index):
+ @overload
+ def __getitem__(self, index: int) -> _LttngIndexEntryT:
+ ...
+
+ @overload
+ def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]: # noqa: F811
+ ...
+
+ def __getitem__( # noqa: F811
+ self, index: Union[int, slice]
+ ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT],]:
return self._entries[index]
def __len__(self):
return self._path
+# Any LTTng stream (metadata or data).
+class _LttngStream(ABC):
+ @abstractmethod
+ def __init__(self, creation_timestamp: int):
+ self._creation_timestamp: int = creation_timestamp
+
+ @property
+ def creation_timestamp(self):
+ return self._creation_timestamp
+
+
# An LTTng data stream.
-class _LttngDataStream:
- def __init__(self, path, beacons):
+class _LttngDataStream(_LttngStream):
+ def __init__(
+ self, path: str, beacons_json: Optional[tjson.ArrayVal], creation_timestamp: int
+ ):
+ super().__init__(creation_timestamp)
self._path = path
filename = os.path.basename(path)
match = re.match(r"(.*)_\d+", filename)
+ if not match:
+ raise RuntimeError(
+ "Unexpected data stream file name pattern: {}".format(filename)
+ )
+
self._channel_name = match.group(1)
trace_dir = os.path.dirname(path)
index_path = os.path.join(trace_dir, "index", filename + ".idx")
- self._index = _LttngDataStreamIndex(index_path, beacons)
+ self._index = _LttngDataStreamIndex(index_path, beacons_json)
assert os.path.isfile(path)
self._file = open(path, "rb")
logging.info(
def index(self):
return self._index
- def get_data(self, offset_bytes, len_bytes):
+ def get_data(self, offset_bytes: int, len_bytes: int):
self._file.seek(offset_bytes)
return self._file.read(len_bytes)
class _LttngMetadataStreamSection:
- def __init__(self, timestamp, data):
+ def __init__(self, timestamp: int, data: Optional[bytes]):
self._timestamp = timestamp
if data is None:
self._data = bytes()
# An LTTng metadata stream.
-class _LttngMetadataStream:
- def __init__(self, metadata_file_path, config_sections):
+class _LttngMetadataStream(_LttngStream):
+ def __init__(
+ self,
+ metadata_file_path: str,
+ config_sections: Sequence[_LttngMetadataStreamSection],
+ creation_timestamp: int,
+ ):
+ super().__init__(creation_timestamp)
self._path = metadata_file_path
self._sections = config_sections
logging.info(
return self._sections
-LttngMetadataConfigSection = namedtuple(
- "LttngMetadataConfigSection", ["line", "timestamp", "is_empty"]
-)
+class LttngMetadataConfigSection:
+ def __init__(self, line: int, timestamp: int, is_empty: bool):
+ self._line = line
+ self._timestamp = timestamp
+ self._is_empty = is_empty
+
+ @property
+ def line(self):
+ return self._line
+
+ @property
+ def timestamp(self):
+ return self._timestamp
+
+ @property
+ def is_empty(self):
+ return self._is_empty
-def _parse_metadata_sections_config(config_sections):
- assert config_sections is not None
- config_metadata_sections = []
+def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
+ metadata_sections = [] # type: list[LttngMetadataConfigSection]
append_empty_section = False
last_timestamp = 0
last_line = 0
- for config_section in config_sections:
- if config_section == "empty":
- # Found a empty section marker. Actually append the section at the
- # timestamp of the next concrete section.
- append_empty_section = True
- else:
- assert type(config_section) is dict
- line = config_section.get("line")
- ts = config_section.get("timestamp")
+ for section in metadata_sections_json:
+ if isinstance(section, tjson.StrVal):
+ if section.val == "empty":
+ # Found an empty section marker. Actually append the
+ # section at the timestamp of the next concrete section.
+ append_empty_section = True
+ else:
+ raise ValueError("Invalid string value at {}.".format(section.path))
+ elif isinstance(section, tjson.ObjVal):
+ line = section.at("line", tjson.IntVal).val
+ ts = section.at("timestamp", tjson.IntVal).val
# Sections' timestamps and lines must both be increasing.
assert ts > last_timestamp
last_timestamp = ts
+
assert line > last_line
last_line = line
if append_empty_section:
- config_metadata_sections.append(
- LttngMetadataConfigSection(line, ts, True)
- )
+ metadata_sections.append(LttngMetadataConfigSection(line, ts, True))
append_empty_section = False
- config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
-
- return config_metadata_sections
+ metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
+ else:
+ raise TypeError(
+ "`{}`: expecting a string or object value".format(section.path)
+ )
+ return metadata_sections
-def _split_metadata_sections(metadata_file_path, raw_config_sections):
- assert isinstance(raw_config_sections, collections.abc.Sequence)
- parsed_sections = _parse_metadata_sections_config(raw_config_sections)
+def _split_metadata_sections(
+ metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
+):
+ metadata_sections = _parse_metadata_sections_config(metadata_sections_json)
- sections = []
+ sections = [] # type: list[_LttngMetadataStreamSection]
with open(metadata_file_path, "r") as metadata_file:
metadata_lines = [line for line in metadata_file]
- config_metadata_sections_idx = 0
+ metadata_section_idx = 0
curr_metadata_section = bytearray()
for idx, line_content in enumerate(metadata_lines):
curr_line_number = idx + 1
# If there are no more sections, simply append the line.
- if config_metadata_sections_idx + 1 >= len(parsed_sections):
+ if metadata_section_idx + 1 >= len(metadata_sections):
curr_metadata_section += bytearray(line_content, "utf8")
continue
- next_section_line_number = parsed_sections[
- config_metadata_sections_idx + 1
- ].line
+ next_section_line_number = metadata_sections[metadata_section_idx + 1].line
# If the next section begins at the current line, create a
# section with the metadata we gathered so far.
if curr_line_number >= next_section_line_number:
-
# Flushing the metadata of the current section.
sections.append(
_LttngMetadataStreamSection(
- parsed_sections[config_metadata_sections_idx].timestamp,
+ metadata_sections[metadata_section_idx].timestamp,
bytes(curr_metadata_section),
)
)
# Move to the next section.
- config_metadata_sections_idx += 1
+ metadata_section_idx += 1
# Clear old content and append current line for the next section.
curr_metadata_section.clear()
curr_metadata_section += bytearray(line_content, "utf8")
# Append any empty sections.
- while parsed_sections[config_metadata_sections_idx].is_empty:
+ while metadata_sections[metadata_section_idx].is_empty:
sections.append(
_LttngMetadataStreamSection(
- parsed_sections[config_metadata_sections_idx].timestamp, None
+ metadata_sections[metadata_section_idx].timestamp, None
)
)
- config_metadata_sections_idx += 1
+ metadata_section_idx += 1
else:
# Append line_content to the current metadata section.
curr_metadata_section += bytearray(line_content, "utf8")
# We iterated over all the lines of the metadata file. Close the current section.
sections.append(
_LttngMetadataStreamSection(
- parsed_sections[config_metadata_sections_idx].timestamp,
+ metadata_sections[metadata_section_idx].timestamp,
bytes(curr_metadata_section),
)
)
return sections
+_StreamBeaconsT = Dict[str, Iterable[int]]
+
+
# An LTTng trace, a sequence of LTTng data streams.
-class LttngTrace(collections.abc.Sequence):
- def __init__(self, trace_dir, metadata_sections, beacons):
- assert os.path.isdir(trace_dir)
+class LttngTrace(Sequence[_LttngDataStream]):
+ def __init__(
+ self,
+ trace_dir: str,
+ metadata_sections_json: Optional[tjson.ArrayVal],
+ beacons_json: Optional[tjson.ObjVal],
+ creation_timestamp: int,
+ ):
self._path = trace_dir
- self._create_metadata_stream(trace_dir, metadata_sections)
- self._create_data_streams(trace_dir, beacons)
+ self._creation_timestamp = creation_timestamp
+ self._create_metadata_stream(trace_dir, metadata_sections_json)
+ self._create_data_streams(trace_dir, beacons_json)
logging.info('Built trace: path="{}"'.format(trace_dir))
- def _create_data_streams(self, trace_dir, beacons):
- data_stream_paths = []
+ def _create_data_streams(
+ self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
+ ):
+ data_stream_paths = [] # type: list[str]
for filename in os.listdir(trace_dir):
path = os.path.join(trace_dir, filename)
data_stream_paths.append(path)
data_stream_paths.sort()
- self._data_streams = []
+ self._data_streams = [] # type: list[_LttngDataStream]
for data_stream_path in data_stream_paths:
stream_name = os.path.basename(data_stream_path)
- this_stream_beacons = None
-
- if beacons is not None and stream_name in beacons:
- this_stream_beacons = beacons[stream_name]
+ this_beacons_json = None
+ if beacons_json is not None and stream_name in beacons_json:
+ this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
self._data_streams.append(
- _LttngDataStream(data_stream_path, this_stream_beacons)
+ _LttngDataStream(
+ data_stream_path, this_beacons_json, self._creation_timestamp
+ )
)
- def _create_metadata_stream(self, trace_dir, config_metadata_sections):
+ def _create_metadata_stream(
+ self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
+ ):
metadata_path = os.path.join(trace_dir, "metadata")
- metadata_sections = []
+ metadata_sections = [] # type: list[_LttngMetadataStreamSection]
- if config_metadata_sections is None:
+ if metadata_sections_json is None:
with open(metadata_path, "rb") as metadata_file:
metadata_sections.append(
_LttngMetadataStreamSection(0, metadata_file.read())
)
else:
metadata_sections = _split_metadata_sections(
- metadata_path, config_metadata_sections
+ metadata_path, metadata_sections_json
)
- self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
+ self._metadata_stream = _LttngMetadataStream(
+ metadata_path, metadata_sections, self.creation_timestamp
+ )
@property
def path(self):
def metadata_stream(self):
return self._metadata_stream
- def __getitem__(self, index):
+ @property
+ def creation_timestamp(self):
+ return self._creation_timestamp
+
+ @overload
+ def __getitem__(self, index: int) -> _LttngDataStream:
+ ...
+
+ @overload
+ def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]: # noqa: F811
+ ...
+
+ def __getitem__( # noqa: F811
+ self, index: Union[int, slice]
+ ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
return self._data_streams[index]
def __len__(self):
return len(self._data_streams)
+# Stream (metadata or data) state specific to the LTTng live protocol.
+class _LttngLiveViewerSessionStreamState:
+ @abstractmethod
+ def __init__(self):
+ # A stream is considered "announced" when it has been returned
+ # to the LTTng live client in response to a "get new stream
+ # infos" (`_LttngLiveViewerGetNewStreamInfosCommand`) command.
+ self._announced = False # type: bool
+ pass
+
+ @property
+ def is_announced(self):
+ return self._announced
+
+ def mark_as_announced(self):
+ self._announced = True
+
+ @property
+ @abstractmethod
+ def stream(self) -> _LttngStream:
+ pass
+
+
# The state of a single data stream.
-class _LttngLiveViewerSessionDataStreamState:
- def __init__(self, ts_state, info, data_stream, metadata_stream_id):
+class _LttngLiveViewerSessionDataStreamState(_LttngLiveViewerSessionStreamState):
+ def __init__(
+ self,
+ ts_state: "_LttngLiveViewerSessionTracingSessionState",
+ info: _LttngLiveViewerStreamInfo,
+ data_stream: _LttngDataStream,
+ metadata_stream_id: int,
+ ):
+ super().__init__()
self._ts_state = ts_state
self._info = info
self._data_stream = data_stream
return self._info
@property
- def data_stream(self):
+ def stream(self):
return self._data_stream
@property
return self._data_stream.index[self._cur_index_entry_index]
+ @property
+ def metadata_stream_id(self):
+ return self._metadata_stream_id
+
def goto_next_index_entry(self):
self._cur_index_entry_index += 1
# The state of a single metadata stream.
-class _LttngLiveViewerSessionMetadataStreamState:
- def __init__(self, ts_state, info, metadata_stream):
+class _LttngLiveViewerSessionMetadataStreamState(_LttngLiveViewerSessionStreamState):
+ def __init__(
+ self,
+ ts_state: "_LttngLiveViewerSessionTracingSessionState",
+ info: _LttngLiveViewerStreamInfo,
+ metadata_stream: _LttngMetadataStream,
+ ):
+ super().__init__()
self._ts_state = ts_state
self._info = info
self._metadata_stream = metadata_stream
else:
self._next_metadata_stream_section_timestamp = None
- self._is_sent = False
+ self._all_data_is_sent = False
fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
logging.info(
fmt.format(
)
)
- @property
- def trace_session_state(self):
- return self._trace_session_state
-
@property
def info(self):
return self._info
@property
- def metadata_stream(self):
+ def stream(self):
return self._metadata_stream
@property
- def is_sent(self):
- return self._is_sent
+ def all_data_is_sent(self):
+ return self._all_data_is_sent
- @is_sent.setter
- def is_sent(self, value):
- self._is_sent = value
+ @all_data_is_sent.setter
+ def all_data_is_sent(self, value: bool):
+ self._all_data_is_sent = value
@property
def cur_section(self):
return self._next_metadata_stream_section_timestamp
+# A tracing session descriptor.
+#
+# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
+# objects).
+class LttngTracingSessionDescriptor:
+ def __init__(
+ self,
+ name: str,
+ tracing_session_id: int,
+ hostname: str,
+ live_timer_freq: int,
+ client_count: int,
+ traces: Iterable[LttngTrace],
+ ):
+ for trace in traces:
+ if name not in trace.path:
+ fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
+ raise ValueError(fmt.format(name, trace.path))
+
+ self._traces = traces
+ stream_count = sum([len(t) + 1 for t in traces])
+ self._info = _LttngLiveViewerTracingSessionInfo(
+ tracing_session_id,
+ live_timer_freq,
+ client_count,
+ stream_count,
+ hostname,
+ name,
+ )
+
+ @property
+ def traces(self):
+ return self._traces
+
+ @property
+ def info(self):
+ return self._info
+
+
# The state of a tracing session.
class _LttngLiveViewerSessionTracingSessionState:
- def __init__(self, tc_descr, base_stream_id):
+ def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
self._tc_descr = tc_descr
- self._stream_infos = []
- self._ds_states = {}
- self._ms_states = {}
- stream_id = base_stream_id
+ self._client_visible_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
+ self._ds_states = {} # type: dict[int, _LttngLiveViewerSessionDataStreamState]
+ self._ms_states = (
+ {}
+ ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
+ self._last_delivered_index_timestamp = 0
+ self._last_allocated_stream_id = base_stream_id
for trace in tc_descr.traces:
- trace_id = stream_id * 1000
+ trace_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
+ trace_id = self._last_allocated_stream_id * 1000
# Metadata stream -> stream info and metadata stream state
info = _LttngLiveViewerStreamInfo(
- stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
+ self._last_allocated_stream_id,
+ trace_id,
+ True,
+ trace.metadata_stream.path,
+ "metadata",
)
- self._stream_infos.append(info)
- self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
+
+ trace_stream_infos.append(info)
+ self._ms_states[
+ self._last_allocated_stream_id
+ ] = _LttngLiveViewerSessionMetadataStreamState(
self, info, trace.metadata_stream
)
- metadata_stream_id = stream_id
- stream_id += 1
+ metadata_stream_id = self._last_allocated_stream_id
+ self._last_allocated_stream_id += 1
# Data streams -> stream infos and data stream states
for data_stream in trace:
info = _LttngLiveViewerStreamInfo(
- stream_id,
+ self._last_allocated_stream_id,
trace_id,
False,
data_stream.path,
data_stream.channel_name,
)
- self._stream_infos.append(info)
- self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
+ trace_stream_infos.append(info)
+ self._ds_states[
+ self._last_allocated_stream_id
+ ] = _LttngLiveViewerSessionDataStreamState(
self, info, data_stream, metadata_stream_id
)
- stream_id += 1
+ self._last_allocated_stream_id += 1
+
+ if trace.creation_timestamp == 0:
+ # Only announce streams for traces that are created at
+ # the origin.
+ #
+ # The rest of the streams will be discovered by the
+ # client as indexes are received with a "has new data
+ # streams" flag set in the reply.
+ self._client_visible_stream_infos.extend(trace_stream_infos)
self._is_attached = False
fmt = 'Built tracing session state: id={}, name="{}"'
return self._ms_states
@property
- def stream_infos(self):
- return self._stream_infos
+ def client_visible_stream_infos(self):
+ return self._client_visible_stream_infos
@property
def has_new_metadata(self):
- return any([not ms.is_sent for ms in self._ms_states.values()])
+ return any(
+ [
+ ms.is_announced and not ms.all_data_is_sent
+ for ms in self._ms_states.values()
+ ]
+ )
@property
def is_attached(self):
return self._is_attached
@is_attached.setter
- def is_attached(self, value):
+ def is_attached(self, value: bool):
self._is_attached = value
-def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
+def needs_new_metadata_section(
+ metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
+ latest_timestamp: int,
+):
if metadata_stream_state.next_section_timestamp is None:
return False
class _LttngLiveViewerSession:
def __init__(
self,
- viewer_session_id,
- tracing_session_descriptors,
- max_query_data_response_size,
+ viewer_session_id: int,
+ tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
+ max_query_data_response_size: Optional[int],
):
self._viewer_session_id = viewer_session_id
- self._ts_states = {}
- self._stream_states = {}
+ self._ts_states = (
+ {}
+ ) # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
+ self._stream_states = (
+ {}
+ ) # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
self._max_query_data_response_size = max_query_data_response_size
total_stream_infos = 0
)
ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
self._ts_states[ts_id] = ts_state
- total_stream_infos += len(ts_state.stream_infos)
+ total_stream_infos += len(ts_state.client_visible_stream_infos)
# Update session's stream states to have the new states
self._stream_states.update(ts_state.data_stream_states)
_LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
_LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
_LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
- }
+ } # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]
@property
def viewer_session_id(self):
return self._viewer_session_id
- def _get_tracing_session_state(self, tracing_session_id):
+ def _get_tracing_session_state(self, tracing_session_id: int):
if tracing_session_id not in self._ts_states:
- raise UnexpectedInput(
+ raise RuntimeError(
"Unknown tracing session ID {}".format(tracing_session_id)
)
return self._ts_states[tracing_session_id]
- def _get_stream_state(self, stream_id):
+ def _get_data_stream_state(self, stream_id: int):
if stream_id not in self._stream_states:
- UnexpectedInput("Unknown stream ID {}".format(stream_id))
+ RuntimeError("Unknown stream ID {}".format(stream_id))
+
+ stream = self._stream_states[stream_id]
+ if type(stream) is not _LttngLiveViewerSessionDataStreamState:
+ raise RuntimeError("Stream is not a data stream")
+
+ return stream
+
+ def _get_metadata_stream_state(self, stream_id: int):
+ if stream_id not in self._stream_states:
+ RuntimeError("Unknown stream ID {}".format(stream_id))
+
+ stream = self._stream_states[stream_id]
+ if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
+ raise RuntimeError("Stream is not a metadata stream")
- return self._stream_states[stream_id]
+ return stream
- def handle_command(self, cmd):
+ def handle_command(self, cmd: _LttngLiveViewerCommand):
logging.info(
"Handling command in viewer session: cmd-cls-name={}".format(
cmd.__class__.__name__
cmd_type = type(cmd)
if cmd_type not in self._command_handlers:
- raise UnexpectedInput(
+ raise RuntimeError(
"Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
)
return self._command_handlers[cmd_type](cmd)
- def _handle_attach_to_tracing_session_command(self, cmd):
+ def _handle_attach_to_tracing_session_command(
+ self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
+ ):
fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
info = ts_state.tracing_session_descriptor.info
if ts_state.is_attached:
- raise UnexpectedInput(
+ raise RuntimeError(
"Cannot attach to tracing session `{}`: viewer is already attached".format(
info.name
)
ts_state.is_attached = True
status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
+ stream_infos_to_announce = ts_state.client_visible_stream_infos
+
+ # Mark stream infos transmitted as part of the reply as
+ # announced.
+ for si in stream_infos_to_announce:
+ if si.is_metadata:
+ self._get_metadata_stream_state(si.id).mark_as_announced()
+ else:
+ self._get_data_stream_state(si.id).mark_as_announced()
+
return _LttngLiveViewerAttachToTracingSessionReply(
- status, ts_state.stream_infos
+ status, stream_infos_to_announce
)
- def _handle_detach_from_tracing_session_command(self, cmd):
+ def _handle_detach_from_tracing_session_command(
+ self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
+ ):
fmt = 'Handling "detach from tracing session" command: ts-id={}'
logging.info(fmt.format(cmd.tracing_session_id))
ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
info = ts_state.tracing_session_descriptor.info
if not ts_state.is_attached:
- raise UnexpectedInput(
+ raise RuntimeError(
"Cannot detach to tracing session `{}`: viewer is not attached".format(
info.name
)
status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
return _LttngLiveViewerDetachFromTracingSessionReply(status)
- def _handle_get_next_data_stream_index_entry_command(self, cmd):
+ @staticmethod
+ def _stream_is_ready(
+ stream_state: _LttngLiveViewerSessionStreamState, creation_timestamp: int
+ ):
+ return (
+ not stream_state.is_announced
+ and stream_state.stream.creation_timestamp <= creation_timestamp
+ )
+
+ def _needs_new_streams(self, current_timestamp: int):
+ return any(
+ self._stream_is_ready(ss, current_timestamp)
+ for ss in self._stream_states.values()
+ )
+
+ def _handle_get_next_data_stream_index_entry_command(
+ self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
+ ):
fmt = 'Handling "get next data stream index entry" command: stream-id={}'
logging.info(fmt.format(cmd.stream_id))
- stream_state = self._get_stream_state(cmd.stream_id)
- metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
-
- if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
- raise UnexpectedInput(
- "Stream with ID {} is not a data stream".format(cmd.stream_id)
- )
+ stream_state = self._get_data_stream_state(cmd.stream_id)
+ metadata_stream_state = self._get_metadata_stream_state(
+ stream_state.metadata_stream_id
+ )
if stream_state.cur_index_entry is None:
# The viewer is done reading this stream
timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
- metadata_stream_state.is_sent = False
+ metadata_stream_state.all_data_is_sent = False
metadata_stream_state.goto_next_section()
# The viewer only checks the `has_new_metadata` flag if the
# reply's status is `OK`, so we need to provide an index here
has_new_metadata = stream_state.tracing_session_state.has_new_metadata
- if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
+ if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
else:
- assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
- status, stream_state.cur_index_entry, has_new_metadata, False
+ status,
+ stream_state.cur_index_entry,
+ has_new_metadata,
+ self._needs_new_streams(timestamp_begin),
)
+ self._last_delivered_index_timestamp_begin = timestamp_begin
stream_state.goto_next_index_entry()
return reply
- def _handle_get_data_stream_packet_data_command(self, cmd):
+ def _handle_get_data_stream_packet_data_command(
+ self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
+ ):
fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
- stream_state = self._get_stream_state(cmd.stream_id)
+ stream_state = self._get_data_stream_state(cmd.stream_id)
data_response_length = cmd.req_length
- if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
- raise UnexpectedInput(
- "Stream with ID {} is not a data stream".format(cmd.stream_id)
- )
-
if stream_state.tracing_session_state.has_new_metadata:
status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
return _LttngLiveViewerGetDataStreamPacketDataReply(
fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
logging.info(fmt.format(cmd.req_length, data_response_length))
- data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
+ data = stream_state.stream.get_data(cmd.offset, data_response_length)
status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
- def _handle_get_metadata_stream_data_command(self, cmd):
+ def _handle_get_metadata_stream_data_command(
+ self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
+ ):
fmt = 'Handling "get metadata stream data" command: stream-id={}'
logging.info(fmt.format(cmd.stream_id))
- metadata_stream_state = self._get_stream_state(cmd.stream_id)
+ metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
- if (
- type(metadata_stream_state)
- is not _LttngLiveViewerSessionMetadataStreamState
- ):
- raise UnexpectedInput(
- "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
- )
-
- if metadata_stream_state.is_sent:
+ if metadata_stream_state.all_data_is_sent:
status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
- metadata_stream_state.is_sent = True
+ metadata_stream_state.all_data_is_sent = True
status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
metadata_section = metadata_stream_state.cur_section
+ assert metadata_section is not None
# If we are sending an empty section, ready the next one right away.
if len(metadata_section.data) == 0:
- metadata_stream_state.is_sent = False
+ metadata_stream_state.all_data_is_sent = False
metadata_stream_state.goto_next_section()
fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
status, metadata_section.data
)
- def _handle_get_new_stream_infos_command(self, cmd):
+ def _get_stream_infos_ready_for_announcement(self):
+ ready_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
+
+ for ss in self._stream_states.values():
+ if self._stream_is_ready(ss, ss.stream.creation_timestamp):
+ ready_stream_infos.append(ss.info)
+
+ return ready_stream_infos
+
+ # A stream is considered finished if it has been announced and all
+ # of its index entries have been provided to the client.
+ def _all_streams_finished(self):
+ return all(
+ isinstance(stream_state, _LttngLiveViewerSessionMetadataStreamState)
+ or (stream_state.cur_index_entry is None and stream_state.is_announced)
+ for stream_state in self._stream_states.values()
+ )
+
+ def _handle_get_new_stream_infos_command(
+ self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
+ ):
fmt = 'Handling "get new stream infos" command: ts-id={}'
logging.info(fmt.format(cmd.tracing_session_id))
+ newly_announced_stream_infos = self._get_stream_infos_ready_for_announcement()
+
+ # Mark stream infos transmitted as part of the reply as
+ # announced.
+ for si in newly_announced_stream_infos:
+ if si.is_metadata:
+ self._get_metadata_stream_state(si.id).mark_as_announced()
+ else:
+ self._get_data_stream_state(si.id).mark_as_announced()
+
+ status = _LttngLiveViewerGetNewStreamInfosReply.Status.OK
+
+ if len(newly_announced_stream_infos) == 0:
+ # If all streams have been transmitted and no new traces are
+ # scheduled for creation, hang up to signal that the tracing
+ # session is "done".
+ status = (
+ _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
+ if self._all_streams_finished()
+ else _LttngLiveViewerGetNewStreamInfosReply.Status.NO_NEW
+ )
- # As of this version, all the tracing session's stream infos are
- # always given to the viewer when sending the "attach to tracing
- # session" reply, so there's nothing new here. Return the `HUP`
- # status as, if we're handling this command, the viewer consumed
- # all the existing data streams.
- status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
- return _LttngLiveViewerGetNewStreamInfosReply(status, [])
+ return _LttngLiveViewerGetNewStreamInfosReply(
+ status, newly_announced_stream_infos
+ )
- def _handle_get_tracing_session_infos_command(self, cmd):
+ def _handle_get_tracing_session_infos_command(
+ self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
+ ):
logging.info('Handling "get tracing session infos" command.')
infos = [
tss.tracing_session_descriptor.info for tss in self._ts_states.values()
infos.sort(key=lambda info: info.name)
return _LttngLiveViewerGetTracingSessionInfosReply(infos)
- def _handle_create_viewer_session_command(self, cmd):
+ def _handle_create_viewer_session_command(
+ self, cmd: _LttngLiveViewerCreateViewerSessionCommand
+ ):
logging.info('Handling "create viewer session" command.')
status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
# An LTTng live TCP server.
#
-# On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
-# the decimal TCP port number to a temporary port file. It renames the
-# temporary port file to `port_filename`.
+# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
+# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
+# to a temporary port file. It renames the temporary port file to
+# `port_filename`.
#
# `tracing_session_descriptors` is a list of tracing session descriptors
# (`LttngTracingSessionDescriptor`) to serve.
# returns.
class LttngLiveServer:
def __init__(
- self, port_filename, tracing_session_descriptors, max_query_data_response_size
+ self,
+ port: Optional[int],
+ port_filename: Optional[str],
+ tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
+ max_query_data_response_size: Optional[int],
):
logging.info("Server configuration:")
self._codec = _LttngLiveViewerProtocolCodec()
# Port 0: OS assigns an unused port
- serv_addr = ("localhost", 0)
+ serv_addr = ("localhost", port if port is not None else 0)
self._sock.bind(serv_addr)
- self._write_port_to_file(port_filename)
+
+ if port_filename is not None:
+ self._write_port_to_file(port_filename)
+
+ print("Listening on port {}".format(self._server_port))
+
+ for ts_descr in tracing_session_descriptors:
+ info = ts_descr.info
+ print(
+ "net://localhost:{}/host/{}/{}".format(
+ self._server_port, info.hostname, info.name
+ )
+ )
try:
self._listen()
logging.info("Client closed connection.")
if data:
- raise UnexpectedInput(
+ raise RuntimeError(
"Client closed connection after having sent {} command bytes.".format(
len(data)
)
try:
cmd = self._codec.decode(data)
except struct.error as exc:
- raise UnexpectedInput("Malformed command: {}".format(exc)) from exc
+ raise RuntimeError("Malformed command: {}".format(exc)) from exc
if cmd is not None:
logging.info(
)
return cmd
- def _send_reply(self, reply):
+ def _send_reply(self, reply: _LttngLiveViewerReply):
data = self._codec.encode(reply)
logging.info(
"Sending reply to viewer: reply-cls-name={}, length={}".format(
cmd = self._recv_command()
if type(cmd) is not _LttngLiveViewerConnectCommand:
- raise UnexpectedInput(
+ raise RuntimeError(
'First command is not "connect": cmd-cls-name={}'.format(
cmd.__class__.__name__
)
finally:
self._conn.close()
- def _write_port_to_file(self, port_filename):
+ def _write_port_to_file(self, port_filename: str):
# Write the port number to a temporary file.
- with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
+ with tempfile.NamedTemporaryFile(
+ mode="w", delete=False, dir=os.path.dirname(port_filename)
+ ) as tmp_port_file:
print(self._server_port, end="", file=tmp_port_file)
- # Rename temporary file to real file
- os.replace(tmp_port_file.name, port_filename)
- logging.info(
- 'Renamed port file: src-path="{}", dst-path="{}"'.format(
- tmp_port_file.name, port_filename
- )
- )
-
-
-# A tracing session descriptor.
-#
-# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
-# objects).
-class LttngTracingSessionDescriptor:
- def __init__(
- self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
- ):
- for trace in traces:
- if name not in trace.path:
- fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
- raise ValueError(fmt.format(name, trace.path))
-
- self._traces = traces
- stream_count = sum([len(t) + 1 for t in traces])
- self._info = _LttngLiveViewerTracingSessionInfo(
- tracing_session_id,
- live_timer_freq,
- client_count,
- stream_count,
- hostname,
- name,
- )
+ # Rename temporary file to real file.
+ #
+ # For unknown reasons, on Windows, moving the port file from its
+ # temporary location to its final location (where the user of
+ # the server expects it to appear) may raise a `PermissionError`
+ # exception.
+ #
+ # We suppose it's possible that something in the Windows kernel
+ # hasn't completely finished using the file when we try to move
+ # it.
+ #
+ # Use a wait-and-retry scheme as a (bad) workaround.
+ num_attempts = 5
+ retry_delay_s = 1
+
+ for attempt in reversed(range(num_attempts)):
+ try:
+ os.replace(tmp_port_file.name, port_filename)
+ logging.info(
+ 'Renamed port file: src-path="{}", dst-path="{}"'.format(
+ tmp_port_file.name, port_filename
+ )
+ )
+ return
+ except PermissionError:
+ logging.info(
+ 'Permission error while attempting to rename port file; retrying in {} second: src-path="{}", dst-path="{}"'.format(
+ retry_delay_s, tmp_port_file.name, port_filename
+ )
+ )
- @property
- def traces(self):
- return self._traces
+ if attempt == 0:
+ raise
- @property
- def info(self):
- return self._info
+ time.sleep(retry_delay_s)
-def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
+def _session_descriptors_from_path(
+ sessions_filename: str, trace_path_prefix: Optional[str]
+):
# File format is:
#
# [
# "client-count": 23,
# "traces": [
# {
- # "path": "lol"
+ # "path": "lol",
+ # creation-timestamp: 12948
# },
# {
# "path": "meow/mix",
# }
# ]
with open(sessions_filename, "r") as sessions_file:
- params = json.load(sessions_file)
+ sessions_json = tjson.load(sessions_file, tjson.ArrayVal)
- sessions = []
+ sessions = [] # type: list[LttngTracingSessionDescriptor]
- for session in params:
- name = session["name"]
- tracing_session_id = session["id"]
- hostname = session["hostname"]
- live_timer_freq = session["live-timer-freq"]
- client_count = session["client-count"]
- traces = []
+ for session_json in sessions_json.iter(tjson.ObjVal):
+ name = session_json.at("name", tjson.StrVal).val
+ tracing_session_id = session_json.at("id", tjson.IntVal).val
+ hostname = session_json.at("hostname", tjson.StrVal).val
+ live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
+ client_count = session_json.at("client-count", tjson.IntVal).val
+ traces_json = session_json.at("traces", tjson.ArrayVal)
- for trace in session["traces"]:
- metadata_sections = trace.get("metadata-sections")
- beacons = trace.get("beacons")
- path = trace["path"]
+ traces = [] # type: list[LttngTrace]
- if not os.path.isabs(path):
+ for trace_json in traces_json.iter(tjson.ObjVal):
+ metadata_sections = (
+ trace_json.at("metadata-sections", tjson.ArrayVal)
+ if "metadata-sections" in trace_json
+ else None
+ )
+ beacons = (
+ trace_json.at("beacons", tjson.ObjVal)
+ if "beacons" in trace_json
+ else None
+ )
+ path = trace_json.at("path", tjson.StrVal).val
+ creation_timestamp = (
+ trace_json.at("creation-timestamp", tjson.IntVal).val
+ if "creation-timestamp" in trace_json
+ else 0
+ ) # type: int
+
+ if not os.path.isabs(path) and trace_path_prefix:
path = os.path.join(trace_path_prefix, path)
- traces.append(LttngTrace(path, metadata_sections, beacons))
+ traces.append(
+ LttngTrace(path, metadata_sections, beacons, creation_timestamp)
+ )
sessions.append(
LttngTracingSessionDescriptor(
return sessions
-def _loglevel_parser(string):
+def _loglevel_parser(string: str):
loglevels = {"info": logging.INFO, "warning": logging.WARNING}
if string not in loglevels:
msg = "{} is not a valid loglevel".format(string)
loglevel_namespace, remaining_args = parser.parse_known_args()
logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
+ parser.add_argument(
+ "--port",
+ help="The port to bind to. If missing, use an OS-assigned port..",
+ type=int,
+ )
parser.add_argument(
"--port-filename",
help="The final port file. This file is present when the server is ready to receive connection.",
- required=True,
)
parser.add_argument(
"--max-query-data-response-size",
help="Prefix to prepend to the trace paths of session configurations",
)
parser.add_argument(
- "--sessions-filename",
+ "sessions_filename",
type=str,
help="Path to a session configuration file",
+ metavar="sessions-filename",
)
parser.add_argument(
"-h",
)
args = parser.parse_args(args=remaining_args)
- try:
- sessions = _session_descriptors_from_path(
- args.sessions_filename,
- args.trace_path_prefix,
- )
- LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size)
- except UnexpectedInput as exc:
- logging.error(str(exc))
- print(exc, file=sys.stderr)
- sys.exit(1)
+ sessions_filename = args.sessions_filename # type: str
+ trace_path_prefix = args.trace_path_prefix # type: str | None
+ sessions = _session_descriptors_from_path(
+ sessions_filename,
+ trace_path_prefix,
+ )
+
+ port = args.port # type: int | None
+ port_filename = args.port_filename # type: str | None
+ max_query_data_response_size = args.max_query_data_response_size # type: int | None
+ LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)