X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=tests%2Fdata%2Fplugins%2Fsrc.ctf.lttng-live%2Flttng_live_server.py;h=d0eb26d25c5a343b37a50574d223fab0a9a7c369;hb=f5567ea88d172767b34373bc6e402da8bfd85ef8;hp=91e1cc8a6f446b4df30e78520a7cfebffb26c45c;hpb=75882e97aed48428d4a0abc32a6382be811ad994;p=babeltrace.git diff --git a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py index 91e1cc8a..d0eb26d2 100644 --- a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py +++ b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py @@ -1,24 +1,7 @@ -# The MIT License (MIT) +# SPDX-License-Identifier: MIT # -# Copyright (c) 2019 Philippe Proulx +# Copyright (C) 2019 Philippe Proulx # -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. import argparse import collections.abc @@ -30,6 +13,8 @@ import socket import struct import sys import tempfile +import json +from collections import namedtuple class UnexpectedInput(RuntimeError): @@ -409,14 +394,14 @@ class _LttngLiveViewerDetachFromTracingSessionReply: # An LTTng live protocol codec can convert bytes to command objects and # reply objects to bytes. class _LttngLiveViewerProtocolCodec: - _COMMAND_HEADER_STRUCT_FMT = 'QII' + _COMMAND_HEADER_STRUCT_FMT = "QII" _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT) def __init__(self): pass def _unpack(self, fmt, data, offset=0): - fmt = '!' + fmt + fmt = "!" + fmt return struct.unpack_from(fmt, data, offset) def _unpack_payload(self, fmt, data): @@ -433,7 +418,7 @@ class _LttngLiveViewerProtocolCodec: self._COMMAND_HEADER_STRUCT_FMT, data ) logging.info( - 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format( + "Decoded command header: payload-size={}, cmd-type={}, version={}".format( payload_size, cmd_type, version ) ) @@ -444,7 +429,7 @@ class _LttngLiveViewerProtocolCodec: if cmd_type == 1: viewer_session_id, major, minor, conn_type = self._unpack_payload( - 'QIII', data + "QIII", data ) return _LttngLiveViewerConnectCommand( version, viewer_session_id, major, minor @@ -452,46 +437,46 @@ class _LttngLiveViewerProtocolCodec: elif cmd_type == 2: return _LttngLiveViewerGetTracingSessionInfosCommand(version) elif cmd_type == 3: - tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data) + tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data) return _LttngLiveViewerAttachToTracingSessionCommand( version, tracing_session_id, offset, seek_type ) elif cmd_type == 4: - (stream_id,) = self._unpack_payload('Q', data) + (stream_id,) = self._unpack_payload("Q", data) return _LttngLiveViewerGetNextDataStreamIndexEntryCommand( version, stream_id ) elif cmd_type == 5: - stream_id, offset, req_length = self._unpack_payload('QQI', data) + stream_id, offset, req_length = self._unpack_payload("QQI", data) return _LttngLiveViewerGetDataStreamPacketDataCommand( version, stream_id, offset, req_length ) elif cmd_type == 6: - (stream_id,) = self._unpack_payload('Q', data) + (stream_id,) = self._unpack_payload("Q", data) return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id) elif cmd_type == 7: - (tracing_session_id,) = self._unpack_payload('Q', data) + (tracing_session_id,) = self._unpack_payload("Q", data) return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id) elif cmd_type == 8: return _LttngLiveViewerCreateViewerSessionCommand(version) elif cmd_type == 9: - (tracing_session_id,) = self._unpack_payload('Q', data) + (tracing_session_id,) = self._unpack_payload("Q", data) return _LttngLiveViewerDetachFromTracingSessionCommand( version, tracing_session_id ) else: - raise UnexpectedInput('Unknown command type {}'.format(cmd_type)) + raise UnexpectedInput("Unknown command type {}".format(cmd_type)) def _pack(self, fmt, *args): # Force network byte order - return struct.pack('!' + fmt, *args) + return struct.pack("!" + fmt, *args) def _encode_zero_padded_str(self, string, length): data = string.encode() - return data.ljust(length, b'\x00') + return data.ljust(length, b"\x00") def _encode_stream_info(self, info): - data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata)) + data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata)) data += self._encode_zero_padded_str(info.path, 4096) data += self._encode_zero_padded_str(info.channel_name, 255) return data @@ -510,14 +495,14 @@ class _LttngLiveViewerProtocolCodec: def encode(self, reply): if type(reply) is _LttngLiveViewerConnectReply: data = self._pack( - 'QIII', reply.viewer_session_id, reply.major, reply.minor, 2 + "QIII", reply.viewer_session_id, reply.major, reply.minor, 2 ) elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply: - data = self._pack('I', len(reply.tracing_session_infos)) + data = self._pack("I", len(reply.tracing_session_infos)) for info in reply.tracing_session_infos: data += self._pack( - 'QIII', + "QIII", info.tracing_session_id, info.live_timer_freq, info.client_count, @@ -526,49 +511,65 @@ class _LttngLiveViewerProtocolCodec: data += self._encode_zero_padded_str(info.hostname, 64) data += self._encode_zero_padded_str(info.name, 255) elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply: - data = self._pack('II', reply.status, len(reply.stream_infos)) + data = self._pack("II", reply.status, len(reply.stream_infos)) for info in reply.stream_infos: data += self._encode_stream_info(info) elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply: + index_format = "QQQQQQQII" entry = reply.index_entry flags = self._get_has_new_stuff_flags( reply.has_new_metadata, reply.has_new_data_stream ) - data = self._pack( - 'QQQQQQQII', - entry.offset_bytes, - entry.total_size_bits, - entry.content_size_bits, - entry.timestamp_begin, - entry.timestamp_end, - entry.events_discarded, - entry.stream_class_id, - reply.status, - flags, - ) + if type(entry) is _LttngDataStreamIndexEntry: + data = self._pack( + index_format, + entry.offset_bytes, + entry.total_size_bits, + entry.content_size_bits, + entry.timestamp_begin, + entry.timestamp_end, + entry.events_discarded, + entry.stream_class_id, + reply.status, + flags, + ) + else: + assert type(entry) is _LttngDataStreamBeaconEntry + data = self._pack( + index_format, + 0, + 0, + 0, + 0, + entry.timestamp, + 0, + entry.stream_class_id, + reply.status, + flags, + ) elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply: flags = self._get_has_new_stuff_flags( reply.has_new_metadata, reply.has_new_data_stream ) - data = self._pack('III', reply.status, len(reply.data), flags) + data = self._pack("III", reply.status, len(reply.data), flags) data += reply.data elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply: - data = self._pack('QI', len(reply.data), reply.status) + data = self._pack("QI", len(reply.data), reply.status) data += reply.data elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply: - data = self._pack('II', reply.status, len(reply.stream_infos)) + data = self._pack("II", reply.status, len(reply.stream_infos)) for info in reply.stream_infos: data += self._encode_stream_info(info) elif type(reply) is _LttngLiveViewerCreateViewerSessionReply: - data = self._pack('I', reply.status) + data = self._pack("I", reply.status) elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply: - data = self._pack('I', reply.status) + data = self._pack("I", reply.status) else: raise ValueError( - 'Unknown reply object with class `{}`'.format(reply.__class__.__name__) + "Unknown reply object with class `{}`".format(reply.__class__.__name__) ) return data @@ -631,11 +632,44 @@ class _LttngDataStreamIndexEntry: return self._stream_class_id +# An entry within the index of an LTTng data stream. While a stream beacon entry +# is conceptually unrelated to an index, it is sent as a reply to a +# LttngLiveViewerGetNextDataStreamIndexEntryCommand +class _LttngDataStreamBeaconEntry: + def __init__(self, stream_class_id, timestamp): + self._stream_class_id = stream_class_id + self._timestamp = timestamp + + @property + def timestamp(self): + return self._timestamp + + @property + def stream_class_id(self): + return self._stream_class_id + + +def _get_entry_timestamp_begin(entry): + if type(entry) is _LttngDataStreamBeaconEntry: + return entry.timestamp + else: + assert type(entry) is _LttngDataStreamIndexEntry + return entry.timestamp_begin + + # The index of an LTTng data stream, a sequence of index entries. class _LttngDataStreamIndex(collections.abc.Sequence): - def __init__(self, path): + def __init__(self, path, beacons): self._path = path self._build() + + if beacons: + stream_class_id = self._entries[0].stream_class_id + beacons = [ + _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons + ] + self._add_beacons(beacons) + logging.info( 'Built data stream index entries: path="{}", count={}'.format( path, len(self._entries) @@ -646,9 +680,9 @@ class _LttngDataStreamIndex(collections.abc.Sequence): self._entries = [] assert os.path.isfile(self._path) - with open(self._path, 'rb') as f: + with open(self._path, "rb") as f: # Read header first - fmt = '>IIII' + fmt = ">IIII" size = struct.calcsize(fmt) data = f.read(size) assert len(data) == size @@ -658,7 +692,7 @@ class _LttngDataStreamIndex(collections.abc.Sequence): assert magic == 0xC1F1DCC1 # Read index entries - fmt = '>QQQQQQQ' + fmt = ">QQQQQQQ" size = struct.calcsize(fmt) while True: @@ -699,6 +733,17 @@ class _LttngDataStreamIndex(collections.abc.Sequence): # Skip anything else before the next entry f.seek(index_entry_length - size, os.SEEK_CUR) + def _add_beacons(self, beacons): + # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin + def sort_key(entry): + if type(entry) is _LttngDataStreamBeaconEntry: + return entry.timestamp + else: + return entry.timestamp_end + + self._entries += beacons + self._entries.sort(key=sort_key) + def __getitem__(self, index): return self._entries[index] @@ -712,16 +757,16 @@ class _LttngDataStreamIndex(collections.abc.Sequence): # An LTTng data stream. class _LttngDataStream: - def __init__(self, path): + def __init__(self, path, beacons): self._path = path filename = os.path.basename(path) - match = re.match(r'(.*)_\d+', filename) + match = re.match(r"(.*)_\d+", filename) self._channel_name = match.group(1) trace_dir = os.path.dirname(path) - index_path = os.path.join(trace_dir, 'index', filename + '.idx') - self._index = _LttngDataStreamIndex(index_path) + index_path = os.path.join(trace_dir, "index", filename + ".idx") + self._index = _LttngDataStreamIndex(index_path, beacons) assert os.path.isfile(path) - self._file = open(path, 'rb') + self._file = open(path, "rb") logging.info( 'Built data stream: path="{}", channel-name="{}"'.format( path, self._channel_name @@ -745,36 +790,166 @@ class _LttngDataStream: return self._file.read(len_bytes) +class _LttngMetadataStreamSection: + def __init__(self, timestamp, data): + self._timestamp = timestamp + if data is None: + self._data = bytes() + else: + self._data = data + logging.info( + "Built metadata stream section: ts={}, data-len={}".format( + self._timestamp, len(self._data) + ) + ) + + @property + def timestamp(self): + return self._timestamp + + @property + def data(self): + return self._data + + # An LTTng metadata stream. class _LttngMetadataStream: - def __init__(self, path): - self._path = path - logging.info('Built metadata stream: path="{}"'.format(path)) + def __init__(self, metadata_file_path, config_sections): + self._path = metadata_file_path + self._sections = config_sections + logging.info( + "Built metadata stream: path={}, section-len={}".format( + self._path, len(self._sections) + ) + ) @property def path(self): return self._path @property - def data(self): - assert os.path.isfile(self._path) + def sections(self): + return self._sections + + +LttngMetadataConfigSection = namedtuple( + "LttngMetadataConfigSection", ["line", "timestamp", "is_empty"] +) + + +def _parse_metadata_sections_config(config_sections): + assert config_sections is not None + config_metadata_sections = [] + append_empty_section = False + last_timestamp = 0 + last_line = 0 + + for config_section in config_sections: + if config_section == "empty": + # Found a empty section marker. Actually append the section at the + # timestamp of the next concrete section. + append_empty_section = True + else: + assert type(config_section) is dict + line = config_section.get("line") + ts = config_section.get("timestamp") + + # Sections' timestamps and lines must both be increasing. + assert ts > last_timestamp + last_timestamp = ts + assert line > last_line + last_line = line + + if append_empty_section: + config_metadata_sections.append( + LttngMetadataConfigSection(line, ts, True) + ) + append_empty_section = False + + config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False)) + + return config_metadata_sections + + +def _split_metadata_sections(metadata_file_path, raw_config_sections): + assert isinstance(raw_config_sections, collections.abc.Sequence) + + parsed_sections = _parse_metadata_sections_config(raw_config_sections) + + sections = [] + with open(metadata_file_path, "r") as metadata_file: + metadata_lines = [line for line in metadata_file] + + config_metadata_sections_idx = 0 + curr_metadata_section = bytearray() + + for idx, line_content in enumerate(metadata_lines): + # Add one to the index to convert from the zero-indexing of the + # enumerate() function to the one-indexing used by humans when + # viewing a text file. + curr_line_number = idx + 1 + + # If there are no more sections, simply append the line. + if config_metadata_sections_idx + 1 >= len(parsed_sections): + curr_metadata_section += bytearray(line_content, "utf8") + continue + + next_section_line_number = parsed_sections[ + config_metadata_sections_idx + 1 + ].line + + # If the next section begins at the current line, create a + # section with the metadata we gathered so far. + if curr_line_number >= next_section_line_number: + + # Flushing the metadata of the current section. + sections.append( + _LttngMetadataStreamSection( + parsed_sections[config_metadata_sections_idx].timestamp, + bytes(curr_metadata_section), + ) + ) + + # Move to the next section. + config_metadata_sections_idx += 1 - with open(self._path, 'rb') as f: - return f.read() + # Clear old content and append current line for the next section. + curr_metadata_section.clear() + curr_metadata_section += bytearray(line_content, "utf8") + + # Append any empty sections. + while parsed_sections[config_metadata_sections_idx].is_empty: + sections.append( + _LttngMetadataStreamSection( + parsed_sections[config_metadata_sections_idx].timestamp, None + ) + ) + config_metadata_sections_idx += 1 + else: + # Append line_content to the current metadata section. + curr_metadata_section += bytearray(line_content, "utf8") + + # We iterated over all the lines of the metadata file. Close the current section. + sections.append( + _LttngMetadataStreamSection( + parsed_sections[config_metadata_sections_idx].timestamp, + bytes(curr_metadata_section), + ) + ) + + return sections # An LTTng trace, a sequence of LTTng data streams. class LttngTrace(collections.abc.Sequence): - def __init__(self, trace_dir): + def __init__(self, trace_dir, metadata_sections, beacons): assert os.path.isdir(trace_dir) self._path = trace_dir - self._metadata_stream = _LttngMetadataStream( - os.path.join(trace_dir, 'metadata') - ) - self._create_data_streams(trace_dir) + self._create_metadata_stream(trace_dir, metadata_sections) + self._create_data_streams(trace_dir, beacons) logging.info('Built trace: path="{}"'.format(trace_dir)) - def _create_data_streams(self, trace_dir): + def _create_data_streams(self, trace_dir, beacons): data_stream_paths = [] for filename in os.listdir(trace_dir): @@ -783,10 +958,10 @@ class LttngTrace(collections.abc.Sequence): if not os.path.isfile(path): continue - if filename.startswith('.'): + if filename.startswith("."): continue - if filename == 'metadata': + if filename == "metadata": continue data_stream_paths.append(path) @@ -795,7 +970,31 @@ class LttngTrace(collections.abc.Sequence): self._data_streams = [] for data_stream_path in data_stream_paths: - self._data_streams.append(_LttngDataStream(data_stream_path)) + stream_name = os.path.basename(data_stream_path) + this_stream_beacons = None + + if beacons is not None and stream_name in beacons: + this_stream_beacons = beacons[stream_name] + + self._data_streams.append( + _LttngDataStream(data_stream_path, this_stream_beacons) + ) + + def _create_metadata_stream(self, trace_dir, config_metadata_sections): + metadata_path = os.path.join(trace_dir, "metadata") + metadata_sections = [] + + if config_metadata_sections is None: + with open(metadata_path, "rb") as metadata_file: + metadata_sections.append( + _LttngMetadataStreamSection(0, metadata_file.read()) + ) + else: + metadata_sections = _split_metadata_sections( + metadata_path, config_metadata_sections + ) + + self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections) @property def path(self): @@ -814,10 +1013,11 @@ class LttngTrace(collections.abc.Sequence): # The state of a single data stream. class _LttngLiveViewerSessionDataStreamState: - def __init__(self, ts_state, info, data_stream): + def __init__(self, ts_state, info, data_stream, metadata_stream_id): self._ts_state = ts_state self._info = info self._data_stream = data_stream + self._metadata_stream_id = metadata_stream_id self._cur_index_entry_index = 0 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"' logging.info( @@ -858,6 +1058,14 @@ class _LttngLiveViewerSessionMetadataStreamState: self._ts_state = ts_state self._info = info self._metadata_stream = metadata_stream + self._cur_metadata_stream_section_index = 0 + if len(metadata_stream.sections) > 1: + self._next_metadata_stream_section_timestamp = metadata_stream.sections[ + 1 + ].timestamp + else: + self._next_metadata_stream_section_timestamp = None + self._is_sent = False fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"' logging.info( @@ -889,6 +1097,28 @@ class _LttngLiveViewerSessionMetadataStreamState: def is_sent(self, value): self._is_sent = value + @property + def cur_section(self): + fmt = "Get current metadata section: section-idx={}" + logging.info(fmt.format(self._cur_metadata_stream_section_index)) + if self._cur_metadata_stream_section_index == len( + self._metadata_stream.sections + ): + return + + return self._metadata_stream.sections[self._cur_metadata_stream_section_index] + + def goto_next_section(self): + self._cur_metadata_stream_section_index += 1 + if self.cur_section: + self._next_metadata_stream_section_timestamp = self.cur_section.timestamp + else: + self._next_metadata_stream_section_timestamp = None + + @property + def next_section_timestamp(self): + return self._next_metadata_stream_section_timestamp + # The state of a tracing session. class _LttngLiveViewerSessionTracingSessionState: @@ -902,6 +1132,17 @@ class _LttngLiveViewerSessionTracingSessionState: for trace in tc_descr.traces: trace_id = stream_id * 1000 + # Metadata stream -> stream info and metadata stream state + info = _LttngLiveViewerStreamInfo( + stream_id, trace_id, True, trace.metadata_stream.path, "metadata" + ) + self._stream_infos.append(info) + self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState( + self, info, trace.metadata_stream + ) + metadata_stream_id = stream_id + stream_id += 1 + # Data streams -> stream infos and data stream states for data_stream in trace: info = _LttngLiveViewerStreamInfo( @@ -913,20 +1154,10 @@ class _LttngLiveViewerSessionTracingSessionState: ) self._stream_infos.append(info) self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState( - self, info, data_stream + self, info, data_stream, metadata_stream_id ) stream_id += 1 - # Metadata stream -> stream info and metadata stream state - info = _LttngLiveViewerStreamInfo( - stream_id, trace_id, True, trace.metadata_stream.path, 'metadata' - ) - self._stream_infos.append(info) - self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState( - self, info, trace.metadata_stream - ) - stream_id += 1 - self._is_attached = False fmt = 'Built tracing session state: id={}, name="{}"' logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name)) @@ -960,6 +1191,16 @@ class _LttngLiveViewerSessionTracingSessionState: self._is_attached = value +def needs_new_metadata_section(metadata_stream_state, latest_timestamp): + if metadata_stream_state.next_section_timestamp is None: + return False + + if latest_timestamp >= metadata_stream_state.next_section_timestamp: + return True + else: + return False + + # An LTTng live viewer session manages a view on tracing sessions # and replies to commands accordingly. class _LttngLiveViewerSession: @@ -1005,20 +1246,20 @@ class _LttngLiveViewerSession: def _get_tracing_session_state(self, tracing_session_id): if tracing_session_id not in self._ts_states: raise UnexpectedInput( - 'Unknown tracing session ID {}'.format(tracing_session_id) + "Unknown tracing session ID {}".format(tracing_session_id) ) return self._ts_states[tracing_session_id] def _get_stream_state(self, stream_id): if stream_id not in self._stream_states: - UnexpectedInput('Unknown stream ID {}'.format(stream_id)) + UnexpectedInput("Unknown stream ID {}".format(stream_id)) return self._stream_states[stream_id] def handle_command(self, cmd): logging.info( - 'Handling command in viewer session: cmd-cls-name={}'.format( + "Handling command in viewer session: cmd-cls-name={}".format( cmd.__class__.__name__ ) ) @@ -1026,7 +1267,7 @@ class _LttngLiveViewerSession: if cmd_type not in self._command_handlers: raise UnexpectedInput( - 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__) + "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__) ) return self._command_handlers[cmd_type](cmd) @@ -1039,7 +1280,7 @@ class _LttngLiveViewerSession: if ts_state.is_attached: raise UnexpectedInput( - 'Cannot attach to tracing session `{}`: viewer is already attached'.format( + "Cannot attach to tracing session `{}`: viewer is already attached".format( info.name ) ) @@ -1058,7 +1299,7 @@ class _LttngLiveViewerSession: if not ts_state.is_attached: raise UnexpectedInput( - 'Cannot detach to tracing session `{}`: viewer is not attached'.format( + "Cannot detach to tracing session `{}`: viewer is not attached".format( info.name ) ) @@ -1071,10 +1312,11 @@ class _LttngLiveViewerSession: fmt = 'Handling "get next data stream index entry" command: stream-id={}' logging.info(fmt.format(cmd.stream_id)) stream_state = self._get_stream_state(cmd.stream_id) + metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id) if type(stream_state) is not _LttngLiveViewerSessionDataStreamState: raise UnexpectedInput( - 'Stream with ID {} is not a data stream'.format(cmd.stream_id) + "Stream with ID {} is not a data stream".format(cmd.stream_id) ) if stream_state.cur_index_entry is None: @@ -1089,10 +1331,21 @@ class _LttngLiveViewerSession: status, index_entry, False, False ) + timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry) + + if needs_new_metadata_section(metadata_stream_state, timestamp_begin): + metadata_stream_state.is_sent = False + metadata_stream_state.goto_next_section() + # The viewer only checks the `has_new_metadata` flag if the # reply's status is `OK`, so we need to provide an index here has_new_metadata = stream_state.tracing_session_state.has_new_metadata - status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK + if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry: + status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK + else: + assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry + status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE + reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply( status, stream_state.cur_index_entry, has_new_metadata, False ) @@ -1107,7 +1360,7 @@ class _LttngLiveViewerSession: if type(stream_state) is not _LttngLiveViewerSessionDataStreamState: raise UnexpectedInput( - 'Stream with ID {} is not a data stream'.format(cmd.stream_id) + "Stream with ID {} is not a data stream".format(cmd.stream_id) ) if stream_state.tracing_session_state.has_new_metadata: @@ -1133,21 +1386,33 @@ class _LttngLiveViewerSession: def _handle_get_metadata_stream_data_command(self, cmd): fmt = 'Handling "get metadata stream data" command: stream-id={}' logging.info(fmt.format(cmd.stream_id)) - stream_state = self._get_stream_state(cmd.stream_id) + metadata_stream_state = self._get_stream_state(cmd.stream_id) - if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState: + if ( + type(metadata_stream_state) + is not _LttngLiveViewerSessionMetadataStreamState + ): raise UnexpectedInput( - 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id) + "Stream with ID {} is not a metadata stream".format(cmd.stream_id) ) - if stream_state.is_sent: + if metadata_stream_state.is_sent: status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes()) - stream_state.is_sent = True + metadata_stream_state.is_sent = True status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK + metadata_section = metadata_stream_state.cur_section + + # If we are sending an empty section, ready the next one right away. + if len(metadata_section.data) == 0: + metadata_stream_state.is_sent = False + metadata_stream_state.goto_next_section() + + fmt = 'Replying to "get metadata stream data" command: metadata-size={}' + logging.info(fmt.format(len(metadata_section.data))) return _LttngLiveViewerGetMetadataStreamDataContentReply( - status, stream_state.metadata_stream.data + status, metadata_section.data ) def _handle_get_new_stream_infos_command(self, cmd): @@ -1196,13 +1461,13 @@ class LttngLiveServer: def __init__( self, port_filename, tracing_session_descriptors, max_query_data_response_size ): - logging.info('Server configuration:') + logging.info("Server configuration:") - logging.info(' Port file name: `{}`'.format(port_filename)) + logging.info(" Port file name: `{}`".format(port_filename)) if max_query_data_response_size is not None: logging.info( - ' Maximum response data query size: `{}`'.format( + " Maximum response data query size: `{}`".format( max_query_data_response_size ) ) @@ -1230,7 +1495,7 @@ class LttngLiveServer: self._codec = _LttngLiveViewerProtocolCodec() # Port 0: OS assigns an unused port - serv_addr = ('localhost', 0) + serv_addr = ("localhost", 0) self._sock.bind(serv_addr) self._write_port_to_file(port_filename) @@ -1238,7 +1503,7 @@ class LttngLiveServer: self._listen() finally: self._sock.close() - logging.info('Closed connection and socket.') + logging.info("Closed connection and socket.") @property def _server_port(self): @@ -1248,33 +1513,33 @@ class LttngLiveServer: data = bytes() while True: - logging.info('Waiting for viewer command.') + logging.info("Waiting for viewer command.") buf = self._conn.recv(128) if not buf: - logging.info('Client closed connection.') + logging.info("Client closed connection.") if data: raise UnexpectedInput( - 'Client closed connection after having sent {} command bytes.'.format( + "Client closed connection after having sent {} command bytes.".format( len(data) ) ) return - logging.info('Received data from viewer: length={}'.format(len(buf))) + logging.info("Received data from viewer: length={}".format(len(buf))) data += buf try: cmd = self._codec.decode(data) except struct.error as exc: - raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc + raise UnexpectedInput("Malformed command: {}".format(exc)) from exc if cmd is not None: logging.info( - 'Received command from viewer: cmd-cls-name={}'.format( + "Received command from viewer: cmd-cls-name={}".format( cmd.__class__.__name__ ) ) @@ -1283,7 +1548,7 @@ class LttngLiveServer: def _send_reply(self, reply): data = self._codec.encode(reply) logging.info( - 'Sending reply to viewer: reply-cls-name={}, length={}'.format( + "Sending reply to viewer: reply-cls-name={}, length={}".format( reply.__class__.__name__, len(data) ) ) @@ -1302,7 +1567,7 @@ class LttngLiveServer: # Create viewer session (arbitrary ID 23) logging.info( - 'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor) + "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor) ) viewer_session = _LttngLiveViewerSession( 23, self._ts_descriptors, self._max_query_data_response_size @@ -1325,13 +1590,13 @@ class LttngLiveServer: self._send_reply(viewer_session.handle_command(cmd)) def _listen(self): - logging.info('Listening: port={}'.format(self._server_port)) + logging.info("Listening: port={}".format(self._server_port)) # Backlog must be present for Python version < 3.5. # 128 is an arbitrary number since we expect only 1 connection anyway. self._sock.listen(128) self._conn, viewer_addr = self._sock.accept() logging.info( - 'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1]) + "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1]) ) try: @@ -1341,8 +1606,8 @@ class LttngLiveServer: def _write_port_to_file(self, port_filename): # Write the port number to a temporary file. - with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_port_file: - print(self._server_port, end='', file=tmp_port_file) + with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file: + print(self._server_port, end="", file=tmp_port_file) # Rename temporary file to real file os.replace(tmp_port_file.name, port_filename) @@ -1363,7 +1628,7 @@ class LttngTracingSessionDescriptor: ): for trace in traces: if name not in trace.path: - fmt = 'Tracing session name must be part of every trace path (`{}` not found in `{}`)' + fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)" raise ValueError(fmt.format(name, trace.path)) self._traces = traces @@ -1386,74 +1651,130 @@ class LttngTracingSessionDescriptor: return self._info -def _tracing_session_descriptors_from_arg(string): - # Format is: - # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]... - parts = string.split(',') - name = parts[0] - tracing_session_id = int(parts[1]) - hostname = parts[2] - live_timer_freq = int(parts[3]) - client_count = int(parts[4]) - traces = [LttngTrace(path) for path in parts[5:]] - return LttngTracingSessionDescriptor( - name, tracing_session_id, hostname, live_timer_freq, client_count, traces - ) +def _session_descriptors_from_path(sessions_filename, trace_path_prefix): + # File format is: + # + # [ + # { + # "name": "my-session", + # "id": 17, + # "hostname": "myhost", + # "live-timer-freq": 1000000, + # "client-count": 23, + # "traces": [ + # { + # "path": "lol" + # }, + # { + # "path": "meow/mix", + # "beacons": { + # "my_stream": [ 5235787, 728375283 ] + # }, + # "metadata-sections": [ + # { + # "line": 1, + # "timestamp": 0 + # } + # ] + # } + # ] + # } + # ] + with open(sessions_filename, "r") as sessions_file: + params = json.load(sessions_file) + + sessions = [] + + for session in params: + name = session["name"] + tracing_session_id = session["id"] + hostname = session["hostname"] + live_timer_freq = session["live-timer-freq"] + client_count = session["client-count"] + traces = [] + + for trace in session["traces"]: + metadata_sections = trace.get("metadata-sections") + beacons = trace.get("beacons") + path = trace["path"] + + if not os.path.isabs(path): + path = os.path.join(trace_path_prefix, path) + + traces.append(LttngTrace(path, metadata_sections, beacons)) + + sessions.append( + LttngTracingSessionDescriptor( + name, + tracing_session_id, + hostname, + live_timer_freq, + client_count, + traces, + ) + ) + + return sessions def _loglevel_parser(string): - loglevels = {'info': logging.INFO, 'warning': logging.WARNING} + loglevels = {"info": logging.INFO, "warning": logging.WARNING} if string not in loglevels: msg = "{} is not a valid loglevel".format(string) raise argparse.ArgumentTypeError(msg) return loglevels[string] -if __name__ == '__main__': - logging.basicConfig(format='# %(asctime)-25s%(message)s') +if __name__ == "__main__": + logging.basicConfig(format="# %(asctime)-25s%(message)s") parser = argparse.ArgumentParser( - description='LTTng-live protocol mocker', add_help=False + description="LTTng-live protocol mocker", add_help=False ) parser.add_argument( - '--log-level', - default='warning', - choices=['info', 'warning'], - help='The loglevel to be used.', + "--log-level", + default="warning", + choices=["info", "warning"], + help="The loglevel to be used.", ) loglevel_namespace, remaining_args = parser.parse_known_args() logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level)) parser.add_argument( - '--port-filename', - help='The final port file. This file is present when the server is ready to receive connection.', + "--port-filename", + help="The final port file. This file is present when the server is ready to receive connection.", required=True, ) parser.add_argument( - '--max-query-data-response-size', + "--max-query-data-response-size", type=int, - help='The maximum size of control data response in bytes', + help="The maximum size of control data response in bytes", + ) + parser.add_argument( + "--trace-path-prefix", + type=str, + help="Prefix to prepend to the trace paths of session configurations", ) parser.add_argument( - 'sessions', - nargs="+", - metavar="SESSION", - type=_tracing_session_descriptors_from_arg, - help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....', + "--sessions-filename", + type=str, + help="Path to a session configuration file", ) parser.add_argument( - '-h', - '--help', - action='help', + "-h", + "--help", + action="help", default=argparse.SUPPRESS, - help='Show this help message and exit.', + help="Show this help message and exit.", ) args = parser.parse_args(args=remaining_args) try: - LttngLiveServer( - args.port_filename, args.sessions, args.max_query_data_response_size + sessions = _session_descriptors_from_path( + args.sessions_filename, + args.trace_path_prefix, ) + LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size) except UnexpectedInput as exc: logging.error(str(exc)) print(exc, file=sys.stderr)