import struct
import sys
import tempfile
+import json
+from collections import namedtuple
class UnexpectedInput(RuntimeError):
# An LTTng live protocol codec can convert bytes to command objects and
# reply objects to bytes.
class _LttngLiveViewerProtocolCodec:
- _COMMAND_HEADER_STRUCT_FMT = 'QII'
+ _COMMAND_HEADER_STRUCT_FMT = "QII"
_COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
def __init__(self):
pass
def _unpack(self, fmt, data, offset=0):
- fmt = '!' + fmt
+ fmt = "!" + fmt
return struct.unpack_from(fmt, data, offset)
def _unpack_payload(self, fmt, data):
self._COMMAND_HEADER_STRUCT_FMT, data
)
logging.info(
- 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
+ "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
payload_size, cmd_type, version
)
)
if cmd_type == 1:
viewer_session_id, major, minor, conn_type = self._unpack_payload(
- 'QIII', data
+ "QIII", data
)
return _LttngLiveViewerConnectCommand(
version, viewer_session_id, major, minor
elif cmd_type == 2:
return _LttngLiveViewerGetTracingSessionInfosCommand(version)
elif cmd_type == 3:
- tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
+ tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
return _LttngLiveViewerAttachToTracingSessionCommand(
version, tracing_session_id, offset, seek_type
)
elif cmd_type == 4:
- (stream_id,) = self._unpack_payload('Q', data)
+ (stream_id,) = self._unpack_payload("Q", data)
return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
version, stream_id
)
elif cmd_type == 5:
- stream_id, offset, req_length = self._unpack_payload('QQI', data)
+ stream_id, offset, req_length = self._unpack_payload("QQI", data)
return _LttngLiveViewerGetDataStreamPacketDataCommand(
version, stream_id, offset, req_length
)
elif cmd_type == 6:
- (stream_id,) = self._unpack_payload('Q', data)
+ (stream_id,) = self._unpack_payload("Q", data)
return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
elif cmd_type == 7:
- (tracing_session_id,) = self._unpack_payload('Q', data)
+ (tracing_session_id,) = self._unpack_payload("Q", data)
return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
elif cmd_type == 8:
return _LttngLiveViewerCreateViewerSessionCommand(version)
elif cmd_type == 9:
- (tracing_session_id,) = self._unpack_payload('Q', data)
+ (tracing_session_id,) = self._unpack_payload("Q", data)
return _LttngLiveViewerDetachFromTracingSessionCommand(
version, tracing_session_id
)
else:
- raise UnexpectedInput('Unknown command type {}'.format(cmd_type))
+ raise UnexpectedInput("Unknown command type {}".format(cmd_type))
def _pack(self, fmt, *args):
# Force network byte order
- return struct.pack('!' + fmt, *args)
+ return struct.pack("!" + fmt, *args)
def _encode_zero_padded_str(self, string, length):
data = string.encode()
- return data.ljust(length, b'\x00')
+ return data.ljust(length, b"\x00")
def _encode_stream_info(self, info):
- data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
+ data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
data += self._encode_zero_padded_str(info.path, 4096)
data += self._encode_zero_padded_str(info.channel_name, 255)
return data
def encode(self, reply):
if type(reply) is _LttngLiveViewerConnectReply:
data = self._pack(
- 'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
+ "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
)
elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
- data = self._pack('I', len(reply.tracing_session_infos))
+ data = self._pack("I", len(reply.tracing_session_infos))
for info in reply.tracing_session_infos:
data += self._pack(
- 'QIII',
+ "QIII",
info.tracing_session_id,
info.live_timer_freq,
info.client_count,
data += self._encode_zero_padded_str(info.hostname, 64)
data += self._encode_zero_padded_str(info.name, 255)
elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
- data = self._pack('II', reply.status, len(reply.stream_infos))
+ data = self._pack("II", reply.status, len(reply.stream_infos))
for info in reply.stream_infos:
data += self._encode_stream_info(info)
elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
+ index_format = "QQQQQQQII"
entry = reply.index_entry
flags = self._get_has_new_stuff_flags(
reply.has_new_metadata, reply.has_new_data_stream
)
- data = self._pack(
- 'QQQQQQQII',
- entry.offset_bytes,
- entry.total_size_bits,
- entry.content_size_bits,
- entry.timestamp_begin,
- entry.timestamp_end,
- entry.events_discarded,
- entry.stream_class_id,
- reply.status,
- flags,
- )
+ if type(entry) is _LttngDataStreamIndexEntry:
+ data = self._pack(
+ index_format,
+ entry.offset_bytes,
+ entry.total_size_bits,
+ entry.content_size_bits,
+ entry.timestamp_begin,
+ entry.timestamp_end,
+ entry.events_discarded,
+ entry.stream_class_id,
+ reply.status,
+ flags,
+ )
+ else:
+ assert type(entry) is _LttngDataStreamBeaconEntry
+ data = self._pack(
+ index_format,
+ 0,
+ 0,
+ 0,
+ 0,
+ entry.timestamp,
+ 0,
+ entry.stream_class_id,
+ reply.status,
+ flags,
+ )
elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
flags = self._get_has_new_stuff_flags(
reply.has_new_metadata, reply.has_new_data_stream
)
- data = self._pack('III', reply.status, len(reply.data), flags)
+ data = self._pack("III", reply.status, len(reply.data), flags)
data += reply.data
elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
- data = self._pack('QI', len(reply.data), reply.status)
+ data = self._pack("QI", len(reply.data), reply.status)
data += reply.data
elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
- data = self._pack('II', reply.status, len(reply.stream_infos))
+ data = self._pack("II", reply.status, len(reply.stream_infos))
for info in reply.stream_infos:
data += self._encode_stream_info(info)
elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
- data = self._pack('I', reply.status)
+ data = self._pack("I", reply.status)
elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
- data = self._pack('I', reply.status)
+ data = self._pack("I", reply.status)
else:
raise ValueError(
- 'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
+ "Unknown reply object with class `{}`".format(reply.__class__.__name__)
)
return data
return self._stream_class_id
+# An entry within the index of an LTTng data stream. While a stream beacon entry
+# is conceptually unrelated to an index, it is sent as a reply to a
+# LttngLiveViewerGetNextDataStreamIndexEntryCommand
+class _LttngDataStreamBeaconEntry:
+ def __init__(self, stream_class_id, timestamp):
+ self._stream_class_id = stream_class_id
+ self._timestamp = timestamp
+
+ @property
+ def timestamp(self):
+ return self._timestamp
+
+ @property
+ def stream_class_id(self):
+ return self._stream_class_id
+
+
+def _get_entry_timestamp_begin(entry):
+ if type(entry) is _LttngDataStreamBeaconEntry:
+ return entry.timestamp
+ else:
+ assert type(entry) is _LttngDataStreamIndexEntry
+ return entry.timestamp_begin
+
+
# The index of an LTTng data stream, a sequence of index entries.
class _LttngDataStreamIndex(collections.abc.Sequence):
- def __init__(self, path):
+ def __init__(self, path, beacons):
self._path = path
self._build()
+
+ if beacons:
+ stream_class_id = self._entries[0].stream_class_id
+ beacons = [
+ _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
+ ]
+ self._add_beacons(beacons)
+
logging.info(
'Built data stream index entries: path="{}", count={}'.format(
path, len(self._entries)
self._entries = []
assert os.path.isfile(self._path)
- with open(self._path, 'rb') as f:
+ with open(self._path, "rb") as f:
# Read header first
- fmt = '>IIII'
+ fmt = ">IIII"
size = struct.calcsize(fmt)
data = f.read(size)
assert len(data) == size
assert magic == 0xC1F1DCC1
# Read index entries
- fmt = '>QQQQQQQ'
+ fmt = ">QQQQQQQ"
size = struct.calcsize(fmt)
while True:
# Skip anything else before the next entry
f.seek(index_entry_length - size, os.SEEK_CUR)
+ def _add_beacons(self, beacons):
+ # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
+ def sort_key(entry):
+ if type(entry) is _LttngDataStreamBeaconEntry:
+ return entry.timestamp
+ else:
+ return entry.timestamp_end
+
+ self._entries += beacons
+ self._entries.sort(key=sort_key)
+
def __getitem__(self, index):
return self._entries[index]
# An LTTng data stream.
class _LttngDataStream:
- def __init__(self, path):
+ def __init__(self, path, beacons):
self._path = path
filename = os.path.basename(path)
- match = re.match(r'(.*)_\d+', filename)
+ match = re.match(r"(.*)_\d+", filename)
self._channel_name = match.group(1)
trace_dir = os.path.dirname(path)
- index_path = os.path.join(trace_dir, 'index', filename + '.idx')
- self._index = _LttngDataStreamIndex(index_path)
+ index_path = os.path.join(trace_dir, "index", filename + ".idx")
+ self._index = _LttngDataStreamIndex(index_path, beacons)
assert os.path.isfile(path)
- self._file = open(path, 'rb')
+ self._file = open(path, "rb")
logging.info(
'Built data stream: path="{}", channel-name="{}"'.format(
path, self._channel_name
return self._file.read(len_bytes)
+class _LttngMetadataStreamSection:
+ def __init__(self, timestamp, data):
+ self._timestamp = timestamp
+ if data is None:
+ self._data = bytes()
+ else:
+ self._data = data
+ logging.info(
+ "Built metadata stream section: ts={}, data-len={}".format(
+ self._timestamp, len(self._data)
+ )
+ )
+
+ @property
+ def timestamp(self):
+ return self._timestamp
+
+ @property
+ def data(self):
+ return self._data
+
+
# An LTTng metadata stream.
class _LttngMetadataStream:
- def __init__(self, path):
- self._path = path
- logging.info('Built metadata stream: path="{}"'.format(path))
+ def __init__(self, metadata_file_path, config_sections):
+ self._path = metadata_file_path
+ self._sections = config_sections
+ logging.info(
+ "Built metadata stream: path={}, section-len={}".format(
+ self._path, len(self._sections)
+ )
+ )
@property
def path(self):
return self._path
@property
- def data(self):
- assert os.path.isfile(self._path)
+ def sections(self):
+ return self._sections
+
+
+LttngMetadataConfigSection = namedtuple(
+ "LttngMetadataConfigSection", ["line", "timestamp", "is_empty"]
+)
+
+
+def _parse_metadata_sections_config(config_sections):
+ assert config_sections is not None
+ config_metadata_sections = []
+ append_empty_section = False
+ last_timestamp = 0
+ last_line = 0
+
+ for config_section in config_sections:
+ if config_section == "empty":
+ # Found a empty section marker. Actually append the section at the
+ # timestamp of the next concrete section.
+ append_empty_section = True
+ else:
+ assert type(config_section) is dict
+ line = config_section.get("line")
+ ts = config_section.get("timestamp")
+
+ # Sections' timestamps and lines must both be increasing.
+ assert ts > last_timestamp
+ last_timestamp = ts
+ assert line > last_line
+ last_line = line
+
+ if append_empty_section:
+ config_metadata_sections.append(
+ LttngMetadataConfigSection(line, ts, True)
+ )
+ append_empty_section = False
+
+ config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
+
+ return config_metadata_sections
+
+
+def _split_metadata_sections(metadata_file_path, raw_config_sections):
+ assert isinstance(raw_config_sections, collections.abc.Sequence)
+
+ parsed_sections = _parse_metadata_sections_config(raw_config_sections)
+
+ sections = []
+ with open(metadata_file_path, "r") as metadata_file:
+ metadata_lines = [line for line in metadata_file]
+
+ config_metadata_sections_idx = 0
+ curr_metadata_section = bytearray()
+
+ for idx, line_content in enumerate(metadata_lines):
+ # Add one to the index to convert from the zero-indexing of the
+ # enumerate() function to the one-indexing used by humans when
+ # viewing a text file.
+ curr_line_number = idx + 1
+
+ # If there are no more sections, simply append the line.
+ if config_metadata_sections_idx + 1 >= len(parsed_sections):
+ curr_metadata_section += bytearray(line_content, "utf8")
+ continue
+
+ next_section_line_number = parsed_sections[
+ config_metadata_sections_idx + 1
+ ].line
+
+ # If the next section begins at the current line, create a
+ # section with the metadata we gathered so far.
+ if curr_line_number >= next_section_line_number:
+ # Flushing the metadata of the current section.
+ sections.append(
+ _LttngMetadataStreamSection(
+ parsed_sections[config_metadata_sections_idx].timestamp,
+ bytes(curr_metadata_section),
+ )
+ )
+
+ # Move to the next section.
+ config_metadata_sections_idx += 1
+
+ # Clear old content and append current line for the next section.
+ curr_metadata_section.clear()
+ curr_metadata_section += bytearray(line_content, "utf8")
+
+ # Append any empty sections.
+ while parsed_sections[config_metadata_sections_idx].is_empty:
+ sections.append(
+ _LttngMetadataStreamSection(
+ parsed_sections[config_metadata_sections_idx].timestamp, None
+ )
+ )
+ config_metadata_sections_idx += 1
+ else:
+ # Append line_content to the current metadata section.
+ curr_metadata_section += bytearray(line_content, "utf8")
+
+ # We iterated over all the lines of the metadata file. Close the current section.
+ sections.append(
+ _LttngMetadataStreamSection(
+ parsed_sections[config_metadata_sections_idx].timestamp,
+ bytes(curr_metadata_section),
+ )
+ )
- with open(self._path, 'rb') as f:
- return f.read()
+ return sections
# An LTTng trace, a sequence of LTTng data streams.
class LttngTrace(collections.abc.Sequence):
- def __init__(self, trace_dir):
+ def __init__(self, trace_dir, metadata_sections, beacons):
assert os.path.isdir(trace_dir)
self._path = trace_dir
- self._metadata_stream = _LttngMetadataStream(
- os.path.join(trace_dir, 'metadata')
- )
- self._create_data_streams(trace_dir)
+ self._create_metadata_stream(trace_dir, metadata_sections)
+ self._create_data_streams(trace_dir, beacons)
logging.info('Built trace: path="{}"'.format(trace_dir))
- def _create_data_streams(self, trace_dir):
+ def _create_data_streams(self, trace_dir, beacons):
data_stream_paths = []
for filename in os.listdir(trace_dir):
if not os.path.isfile(path):
continue
- if filename.startswith('.'):
+ if filename.startswith("."):
continue
- if filename == 'metadata':
+ if filename == "metadata":
continue
data_stream_paths.append(path)
self._data_streams = []
for data_stream_path in data_stream_paths:
- self._data_streams.append(_LttngDataStream(data_stream_path))
+ stream_name = os.path.basename(data_stream_path)
+ this_stream_beacons = None
+
+ if beacons is not None and stream_name in beacons:
+ this_stream_beacons = beacons[stream_name]
+
+ self._data_streams.append(
+ _LttngDataStream(data_stream_path, this_stream_beacons)
+ )
+
+ def _create_metadata_stream(self, trace_dir, config_metadata_sections):
+ metadata_path = os.path.join(trace_dir, "metadata")
+ metadata_sections = []
+
+ if config_metadata_sections is None:
+ with open(metadata_path, "rb") as metadata_file:
+ metadata_sections.append(
+ _LttngMetadataStreamSection(0, metadata_file.read())
+ )
+ else:
+ metadata_sections = _split_metadata_sections(
+ metadata_path, config_metadata_sections
+ )
+
+ self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
@property
def path(self):
# The state of a single data stream.
class _LttngLiveViewerSessionDataStreamState:
- def __init__(self, ts_state, info, data_stream):
+ def __init__(self, ts_state, info, data_stream, metadata_stream_id):
self._ts_state = ts_state
self._info = info
self._data_stream = data_stream
+ self._metadata_stream_id = metadata_stream_id
self._cur_index_entry_index = 0
fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
logging.info(
self._ts_state = ts_state
self._info = info
self._metadata_stream = metadata_stream
+ self._cur_metadata_stream_section_index = 0
+ if len(metadata_stream.sections) > 1:
+ self._next_metadata_stream_section_timestamp = metadata_stream.sections[
+ 1
+ ].timestamp
+ else:
+ self._next_metadata_stream_section_timestamp = None
+
self._is_sent = False
fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
logging.info(
def is_sent(self, value):
self._is_sent = value
+ @property
+ def cur_section(self):
+ fmt = "Get current metadata section: section-idx={}"
+ logging.info(fmt.format(self._cur_metadata_stream_section_index))
+ if self._cur_metadata_stream_section_index == len(
+ self._metadata_stream.sections
+ ):
+ return
+
+ return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
+
+ def goto_next_section(self):
+ self._cur_metadata_stream_section_index += 1
+ if self.cur_section:
+ self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
+ else:
+ self._next_metadata_stream_section_timestamp = None
+
+ @property
+ def next_section_timestamp(self):
+ return self._next_metadata_stream_section_timestamp
+
# The state of a tracing session.
class _LttngLiveViewerSessionTracingSessionState:
for trace in tc_descr.traces:
trace_id = stream_id * 1000
+ # Metadata stream -> stream info and metadata stream state
+ info = _LttngLiveViewerStreamInfo(
+ stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
+ )
+ self._stream_infos.append(info)
+ self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
+ self, info, trace.metadata_stream
+ )
+ metadata_stream_id = stream_id
+ stream_id += 1
+
# Data streams -> stream infos and data stream states
for data_stream in trace:
info = _LttngLiveViewerStreamInfo(
)
self._stream_infos.append(info)
self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
- self, info, data_stream
+ self, info, data_stream, metadata_stream_id
)
stream_id += 1
- # Metadata stream -> stream info and metadata stream state
- info = _LttngLiveViewerStreamInfo(
- stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
- )
- self._stream_infos.append(info)
- self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
- self, info, trace.metadata_stream
- )
- stream_id += 1
-
self._is_attached = False
fmt = 'Built tracing session state: id={}, name="{}"'
logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
self._is_attached = value
+def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
+ if metadata_stream_state.next_section_timestamp is None:
+ return False
+
+ if latest_timestamp >= metadata_stream_state.next_section_timestamp:
+ return True
+ else:
+ return False
+
+
# An LTTng live viewer session manages a view on tracing sessions
# and replies to commands accordingly.
class _LttngLiveViewerSession:
def _get_tracing_session_state(self, tracing_session_id):
if tracing_session_id not in self._ts_states:
raise UnexpectedInput(
- 'Unknown tracing session ID {}'.format(tracing_session_id)
+ "Unknown tracing session ID {}".format(tracing_session_id)
)
return self._ts_states[tracing_session_id]
def _get_stream_state(self, stream_id):
if stream_id not in self._stream_states:
- UnexpectedInput('Unknown stream ID {}'.format(stream_id))
+ UnexpectedInput("Unknown stream ID {}".format(stream_id))
return self._stream_states[stream_id]
def handle_command(self, cmd):
logging.info(
- 'Handling command in viewer session: cmd-cls-name={}'.format(
+ "Handling command in viewer session: cmd-cls-name={}".format(
cmd.__class__.__name__
)
)
if cmd_type not in self._command_handlers:
raise UnexpectedInput(
- 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
+ "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
)
return self._command_handlers[cmd_type](cmd)
if ts_state.is_attached:
raise UnexpectedInput(
- 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
+ "Cannot attach to tracing session `{}`: viewer is already attached".format(
info.name
)
)
if not ts_state.is_attached:
raise UnexpectedInput(
- 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
+ "Cannot detach to tracing session `{}`: viewer is not attached".format(
info.name
)
)
fmt = 'Handling "get next data stream index entry" command: stream-id={}'
logging.info(fmt.format(cmd.stream_id))
stream_state = self._get_stream_state(cmd.stream_id)
+ metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
raise UnexpectedInput(
- 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
+ "Stream with ID {} is not a data stream".format(cmd.stream_id)
)
if stream_state.cur_index_entry is None:
status, index_entry, False, False
)
+ timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
+
+ if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
+ metadata_stream_state.is_sent = False
+ metadata_stream_state.goto_next_section()
+
# The viewer only checks the `has_new_metadata` flag if the
# reply's status is `OK`, so we need to provide an index here
has_new_metadata = stream_state.tracing_session_state.has_new_metadata
- status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
+ if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
+ status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
+ else:
+ assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
+ status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
+
reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
status, stream_state.cur_index_entry, has_new_metadata, False
)
if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
raise UnexpectedInput(
- 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
+ "Stream with ID {} is not a data stream".format(cmd.stream_id)
)
if stream_state.tracing_session_state.has_new_metadata:
def _handle_get_metadata_stream_data_command(self, cmd):
fmt = 'Handling "get metadata stream data" command: stream-id={}'
logging.info(fmt.format(cmd.stream_id))
- stream_state = self._get_stream_state(cmd.stream_id)
+ metadata_stream_state = self._get_stream_state(cmd.stream_id)
- if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
+ if (
+ type(metadata_stream_state)
+ is not _LttngLiveViewerSessionMetadataStreamState
+ ):
raise UnexpectedInput(
- 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
+ "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
)
- if stream_state.is_sent:
+ if metadata_stream_state.is_sent:
status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
- stream_state.is_sent = True
+ metadata_stream_state.is_sent = True
status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
+ metadata_section = metadata_stream_state.cur_section
+
+ # If we are sending an empty section, ready the next one right away.
+ if len(metadata_section.data) == 0:
+ metadata_stream_state.is_sent = False
+ metadata_stream_state.goto_next_section()
+
+ fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
+ logging.info(fmt.format(len(metadata_section.data)))
return _LttngLiveViewerGetMetadataStreamDataContentReply(
- status, stream_state.metadata_stream.data
+ status, metadata_section.data
)
def _handle_get_new_stream_infos_command(self, cmd):
# An LTTng live TCP server.
#
-# On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
-# the decimal TCP port number to a temporary port file. It renames the
-# temporary port file to `port_filename`.
+# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
+# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
+# to a temporary port file. It renames the temporary port file to
+# `port_filename`.
#
# `tracing_session_descriptors` is a list of tracing session descriptors
# (`LttngTracingSessionDescriptor`) to serve.
# returns.
class LttngLiveServer:
def __init__(
- self, port_filename, tracing_session_descriptors, max_query_data_response_size
+ self,
+ port,
+ port_filename,
+ tracing_session_descriptors,
+ max_query_data_response_size,
):
- logging.info('Server configuration:')
+ logging.info("Server configuration:")
- logging.info(' Port file name: `{}`'.format(port_filename))
+ logging.info(" Port file name: `{}`".format(port_filename))
if max_query_data_response_size is not None:
logging.info(
- ' Maximum response data query size: `{}`'.format(
+ " Maximum response data query size: `{}`".format(
max_query_data_response_size
)
)
self._codec = _LttngLiveViewerProtocolCodec()
# Port 0: OS assigns an unused port
- serv_addr = ('localhost', 0)
+ serv_addr = ("localhost", port if port is not None else 0)
self._sock.bind(serv_addr)
self._write_port_to_file(port_filename)
self._listen()
finally:
self._sock.close()
- logging.info('Closed connection and socket.')
+ logging.info("Closed connection and socket.")
@property
def _server_port(self):
data = bytes()
while True:
- logging.info('Waiting for viewer command.')
+ logging.info("Waiting for viewer command.")
buf = self._conn.recv(128)
if not buf:
- logging.info('Client closed connection.')
+ logging.info("Client closed connection.")
if data:
raise UnexpectedInput(
- 'Client closed connection after having sent {} command bytes.'.format(
+ "Client closed connection after having sent {} command bytes.".format(
len(data)
)
)
return
- logging.info('Received data from viewer: length={}'.format(len(buf)))
+ logging.info("Received data from viewer: length={}".format(len(buf)))
data += buf
try:
cmd = self._codec.decode(data)
except struct.error as exc:
- raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc
+ raise UnexpectedInput("Malformed command: {}".format(exc)) from exc
if cmd is not None:
logging.info(
- 'Received command from viewer: cmd-cls-name={}'.format(
+ "Received command from viewer: cmd-cls-name={}".format(
cmd.__class__.__name__
)
)
def _send_reply(self, reply):
data = self._codec.encode(reply)
logging.info(
- 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
+ "Sending reply to viewer: reply-cls-name={}, length={}".format(
reply.__class__.__name__, len(data)
)
)
# Create viewer session (arbitrary ID 23)
logging.info(
- 'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
+ "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
)
viewer_session = _LttngLiveViewerSession(
23, self._ts_descriptors, self._max_query_data_response_size
self._send_reply(viewer_session.handle_command(cmd))
def _listen(self):
- logging.info('Listening: port={}'.format(self._server_port))
+ logging.info("Listening: port={}".format(self._server_port))
# Backlog must be present for Python version < 3.5.
# 128 is an arbitrary number since we expect only 1 connection anyway.
self._sock.listen(128)
self._conn, viewer_addr = self._sock.accept()
logging.info(
- 'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
+ "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
)
try:
def _write_port_to_file(self, port_filename):
# Write the port number to a temporary file.
- with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_port_file:
- print(self._server_port, end='', file=tmp_port_file)
+ with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
+ print(self._server_port, end="", file=tmp_port_file)
# Rename temporary file to real file
os.replace(tmp_port_file.name, port_filename)
):
for trace in traces:
if name not in trace.path:
- fmt = 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
+ fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
raise ValueError(fmt.format(name, trace.path))
self._traces = traces
return self._info
-def _tracing_session_descriptors_from_arg(string):
- # Format is:
- # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
- parts = string.split(',')
- name = parts[0]
- tracing_session_id = int(parts[1])
- hostname = parts[2]
- live_timer_freq = int(parts[3])
- client_count = int(parts[4])
- traces = [LttngTrace(path) for path in parts[5:]]
- return LttngTracingSessionDescriptor(
- name, tracing_session_id, hostname, live_timer_freq, client_count, traces
- )
+def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
+ # File format is:
+ #
+ # [
+ # {
+ # "name": "my-session",
+ # "id": 17,
+ # "hostname": "myhost",
+ # "live-timer-freq": 1000000,
+ # "client-count": 23,
+ # "traces": [
+ # {
+ # "path": "lol"
+ # },
+ # {
+ # "path": "meow/mix",
+ # "beacons": {
+ # "my_stream": [ 5235787, 728375283 ]
+ # },
+ # "metadata-sections": [
+ # {
+ # "line": 1,
+ # "timestamp": 0
+ # }
+ # ]
+ # }
+ # ]
+ # }
+ # ]
+ with open(sessions_filename, "r") as sessions_file:
+ params = json.load(sessions_file)
+
+ sessions = []
+
+ for session in params:
+ name = session["name"]
+ tracing_session_id = session["id"]
+ hostname = session["hostname"]
+ live_timer_freq = session["live-timer-freq"]
+ client_count = session["client-count"]
+ traces = []
+
+ for trace in session["traces"]:
+ metadata_sections = trace.get("metadata-sections")
+ beacons = trace.get("beacons")
+ path = trace["path"]
+
+ if not os.path.isabs(path):
+ path = os.path.join(trace_path_prefix, path)
+
+ traces.append(LttngTrace(path, metadata_sections, beacons))
+
+ sessions.append(
+ LttngTracingSessionDescriptor(
+ name,
+ tracing_session_id,
+ hostname,
+ live_timer_freq,
+ client_count,
+ traces,
+ )
+ )
+
+ return sessions
def _loglevel_parser(string):
- loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
+ loglevels = {"info": logging.INFO, "warning": logging.WARNING}
if string not in loglevels:
msg = "{} is not a valid loglevel".format(string)
raise argparse.ArgumentTypeError(msg)
return loglevels[string]
-if __name__ == '__main__':
- logging.basicConfig(format='# %(asctime)-25s%(message)s')
+if __name__ == "__main__":
+ logging.basicConfig(format="# %(asctime)-25s%(message)s")
parser = argparse.ArgumentParser(
- description='LTTng-live protocol mocker', add_help=False
+ description="LTTng-live protocol mocker", add_help=False
)
parser.add_argument(
- '--log-level',
- default='warning',
- choices=['info', 'warning'],
- help='The loglevel to be used.',
+ "--log-level",
+ default="warning",
+ choices=["info", "warning"],
+ help="The loglevel to be used.",
)
loglevel_namespace, remaining_args = parser.parse_known_args()
logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
parser.add_argument(
- '--port-filename',
- help='The final port file. This file is present when the server is ready to receive connection.',
+ "--port",
+ help="The port to bind to. If missing, use an OS-assigned port..",
+ type=int,
+ )
+ parser.add_argument(
+ "--port-filename",
+ help="The final port file. This file is present when the server is ready to receive connection.",
required=True,
)
parser.add_argument(
- '--max-query-data-response-size',
+ "--max-query-data-response-size",
type=int,
- help='The maximum size of control data response in bytes',
+ help="The maximum size of control data response in bytes",
)
parser.add_argument(
- 'sessions',
- nargs="+",
- metavar="SESSION",
- type=_tracing_session_descriptors_from_arg,
- help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
+ "--trace-path-prefix",
+ type=str,
+ help="Prefix to prepend to the trace paths of session configurations",
)
parser.add_argument(
- '-h',
- '--help',
- action='help',
+ "--sessions-filename",
+ type=str,
+ help="Path to a session configuration file",
+ )
+ parser.add_argument(
+ "-h",
+ "--help",
+ action="help",
default=argparse.SUPPRESS,
- help='Show this help message and exit.',
+ help="Show this help message and exit.",
)
args = parser.parse_args(args=remaining_args)
try:
+ sessions = _session_descriptors_from_path(
+ args.sessions_filename,
+ args.trace_path_prefix,
+ )
LttngLiveServer(
- args.port_filename, args.sessions, args.max_query_data_response_size
+ args.port, args.port_filename, sessions, args.max_query_data_response_size
)
except UnexpectedInput as exc:
logging.error(str(exc))