X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=tests%2Fdata%2Fplugins%2Fsrc.ctf.lttng-live%2Flttng_live_server.py;fp=tests%2Fdata%2Fplugins%2Fsrc.ctf.lttng-live%2Flttng_live_server.py;h=d0eb26d25c5a343b37a50574d223fab0a9a7c369;hp=0ffb909968a7f8a3a9f497c29be0fdb90880d599;hb=f5567ea88d172767b34373bc6e402da8bfd85ef8;hpb=419d8c49bd978a59b8a0619d83cb6ba26b18f970 diff --git a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py index 0ffb9099..d0eb26d2 100644 --- a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py +++ b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py @@ -394,14 +394,14 @@ class _LttngLiveViewerDetachFromTracingSessionReply: # An LTTng live protocol codec can convert bytes to command objects and # reply objects to bytes. class _LttngLiveViewerProtocolCodec: - _COMMAND_HEADER_STRUCT_FMT = 'QII' + _COMMAND_HEADER_STRUCT_FMT = "QII" _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT) def __init__(self): pass def _unpack(self, fmt, data, offset=0): - fmt = '!' + fmt + fmt = "!" + fmt return struct.unpack_from(fmt, data, offset) def _unpack_payload(self, fmt, data): @@ -418,7 +418,7 @@ class _LttngLiveViewerProtocolCodec: self._COMMAND_HEADER_STRUCT_FMT, data ) logging.info( - 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format( + "Decoded command header: payload-size={}, cmd-type={}, version={}".format( payload_size, cmd_type, version ) ) @@ -429,7 +429,7 @@ class _LttngLiveViewerProtocolCodec: if cmd_type == 1: viewer_session_id, major, minor, conn_type = self._unpack_payload( - 'QIII', data + "QIII", data ) return _LttngLiveViewerConnectCommand( version, viewer_session_id, major, minor @@ -437,46 +437,46 @@ class _LttngLiveViewerProtocolCodec: elif cmd_type == 2: return _LttngLiveViewerGetTracingSessionInfosCommand(version) elif cmd_type == 3: - tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data) + tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data) return _LttngLiveViewerAttachToTracingSessionCommand( version, tracing_session_id, offset, seek_type ) elif cmd_type == 4: - (stream_id,) = self._unpack_payload('Q', data) + (stream_id,) = self._unpack_payload("Q", data) return _LttngLiveViewerGetNextDataStreamIndexEntryCommand( version, stream_id ) elif cmd_type == 5: - stream_id, offset, req_length = self._unpack_payload('QQI', data) + stream_id, offset, req_length = self._unpack_payload("QQI", data) return _LttngLiveViewerGetDataStreamPacketDataCommand( version, stream_id, offset, req_length ) elif cmd_type == 6: - (stream_id,) = self._unpack_payload('Q', data) + (stream_id,) = self._unpack_payload("Q", data) return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id) elif cmd_type == 7: - (tracing_session_id,) = self._unpack_payload('Q', data) + (tracing_session_id,) = self._unpack_payload("Q", data) return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id) elif cmd_type == 8: return _LttngLiveViewerCreateViewerSessionCommand(version) elif cmd_type == 9: - (tracing_session_id,) = self._unpack_payload('Q', data) + (tracing_session_id,) = self._unpack_payload("Q", data) return _LttngLiveViewerDetachFromTracingSessionCommand( version, tracing_session_id ) else: - raise UnexpectedInput('Unknown command type {}'.format(cmd_type)) + raise UnexpectedInput("Unknown command type {}".format(cmd_type)) def _pack(self, fmt, *args): # Force network byte order - return struct.pack('!' + fmt, *args) + return struct.pack("!" + fmt, *args) def _encode_zero_padded_str(self, string, length): data = string.encode() - return data.ljust(length, b'\x00') + return data.ljust(length, b"\x00") def _encode_stream_info(self, info): - data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata)) + data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata)) data += self._encode_zero_padded_str(info.path, 4096) data += self._encode_zero_padded_str(info.channel_name, 255) return data @@ -495,14 +495,14 @@ class _LttngLiveViewerProtocolCodec: def encode(self, reply): if type(reply) is _LttngLiveViewerConnectReply: data = self._pack( - 'QIII', reply.viewer_session_id, reply.major, reply.minor, 2 + "QIII", reply.viewer_session_id, reply.major, reply.minor, 2 ) elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply: - data = self._pack('I', len(reply.tracing_session_infos)) + data = self._pack("I", len(reply.tracing_session_infos)) for info in reply.tracing_session_infos: data += self._pack( - 'QIII', + "QIII", info.tracing_session_id, info.live_timer_freq, info.client_count, @@ -511,12 +511,12 @@ class _LttngLiveViewerProtocolCodec: data += self._encode_zero_padded_str(info.hostname, 64) data += self._encode_zero_padded_str(info.name, 255) elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply: - data = self._pack('II', reply.status, len(reply.stream_infos)) + data = self._pack("II", reply.status, len(reply.stream_infos)) for info in reply.stream_infos: data += self._encode_stream_info(info) elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply: - index_format = 'QQQQQQQII' + index_format = "QQQQQQQII" entry = reply.index_entry flags = self._get_has_new_stuff_flags( reply.has_new_metadata, reply.has_new_data_stream @@ -553,23 +553,23 @@ class _LttngLiveViewerProtocolCodec: flags = self._get_has_new_stuff_flags( reply.has_new_metadata, reply.has_new_data_stream ) - data = self._pack('III', reply.status, len(reply.data), flags) + data = self._pack("III", reply.status, len(reply.data), flags) data += reply.data elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply: - data = self._pack('QI', len(reply.data), reply.status) + data = self._pack("QI", len(reply.data), reply.status) data += reply.data elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply: - data = self._pack('II', reply.status, len(reply.stream_infos)) + data = self._pack("II", reply.status, len(reply.stream_infos)) for info in reply.stream_infos: data += self._encode_stream_info(info) elif type(reply) is _LttngLiveViewerCreateViewerSessionReply: - data = self._pack('I', reply.status) + data = self._pack("I", reply.status) elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply: - data = self._pack('I', reply.status) + data = self._pack("I", reply.status) else: raise ValueError( - 'Unknown reply object with class `{}`'.format(reply.__class__.__name__) + "Unknown reply object with class `{}`".format(reply.__class__.__name__) ) return data @@ -680,9 +680,9 @@ class _LttngDataStreamIndex(collections.abc.Sequence): self._entries = [] assert os.path.isfile(self._path) - with open(self._path, 'rb') as f: + with open(self._path, "rb") as f: # Read header first - fmt = '>IIII' + fmt = ">IIII" size = struct.calcsize(fmt) data = f.read(size) assert len(data) == size @@ -692,7 +692,7 @@ class _LttngDataStreamIndex(collections.abc.Sequence): assert magic == 0xC1F1DCC1 # Read index entries - fmt = '>QQQQQQQ' + fmt = ">QQQQQQQ" size = struct.calcsize(fmt) while True: @@ -760,13 +760,13 @@ class _LttngDataStream: def __init__(self, path, beacons): self._path = path filename = os.path.basename(path) - match = re.match(r'(.*)_\d+', filename) + match = re.match(r"(.*)_\d+", filename) self._channel_name = match.group(1) trace_dir = os.path.dirname(path) - index_path = os.path.join(trace_dir, 'index', filename + '.idx') + index_path = os.path.join(trace_dir, "index", filename + ".idx") self._index = _LttngDataStreamIndex(index_path, beacons) assert os.path.isfile(path) - self._file = open(path, 'rb') + self._file = open(path, "rb") logging.info( 'Built data stream: path="{}", channel-name="{}"'.format( path, self._channel_name @@ -798,7 +798,7 @@ class _LttngMetadataStreamSection: else: self._data = data logging.info( - 'Built metadata stream section: ts={}, data-len={}'.format( + "Built metadata stream section: ts={}, data-len={}".format( self._timestamp, len(self._data) ) ) @@ -818,7 +818,7 @@ class _LttngMetadataStream: self._path = metadata_file_path self._sections = config_sections logging.info( - 'Built metadata stream: path={}, section-len={}'.format( + "Built metadata stream: path={}, section-len={}".format( self._path, len(self._sections) ) ) @@ -833,7 +833,7 @@ class _LttngMetadataStream: LttngMetadataConfigSection = namedtuple( - 'LttngMetadataConfigSection', ['line', 'timestamp', 'is_empty'] + "LttngMetadataConfigSection", ["line", "timestamp", "is_empty"] ) @@ -845,14 +845,14 @@ def _parse_metadata_sections_config(config_sections): last_line = 0 for config_section in config_sections: - if config_section == 'empty': + if config_section == "empty": # Found a empty section marker. Actually append the section at the # timestamp of the next concrete section. append_empty_section = True else: assert type(config_section) is dict - line = config_section.get('line') - ts = config_section.get('timestamp') + line = config_section.get("line") + ts = config_section.get("timestamp") # Sections' timestamps and lines must both be increasing. assert ts > last_timestamp @@ -877,7 +877,7 @@ def _split_metadata_sections(metadata_file_path, raw_config_sections): parsed_sections = _parse_metadata_sections_config(raw_config_sections) sections = [] - with open(metadata_file_path, 'r') as metadata_file: + with open(metadata_file_path, "r") as metadata_file: metadata_lines = [line for line in metadata_file] config_metadata_sections_idx = 0 @@ -891,7 +891,7 @@ def _split_metadata_sections(metadata_file_path, raw_config_sections): # If there are no more sections, simply append the line. if config_metadata_sections_idx + 1 >= len(parsed_sections): - curr_metadata_section += bytearray(line_content, 'utf8') + curr_metadata_section += bytearray(line_content, "utf8") continue next_section_line_number = parsed_sections[ @@ -915,7 +915,7 @@ def _split_metadata_sections(metadata_file_path, raw_config_sections): # Clear old content and append current line for the next section. curr_metadata_section.clear() - curr_metadata_section += bytearray(line_content, 'utf8') + curr_metadata_section += bytearray(line_content, "utf8") # Append any empty sections. while parsed_sections[config_metadata_sections_idx].is_empty: @@ -927,7 +927,7 @@ def _split_metadata_sections(metadata_file_path, raw_config_sections): config_metadata_sections_idx += 1 else: # Append line_content to the current metadata section. - curr_metadata_section += bytearray(line_content, 'utf8') + curr_metadata_section += bytearray(line_content, "utf8") # We iterated over all the lines of the metadata file. Close the current section. sections.append( @@ -958,10 +958,10 @@ class LttngTrace(collections.abc.Sequence): if not os.path.isfile(path): continue - if filename.startswith('.'): + if filename.startswith("."): continue - if filename == 'metadata': + if filename == "metadata": continue data_stream_paths.append(path) @@ -981,11 +981,11 @@ class LttngTrace(collections.abc.Sequence): ) def _create_metadata_stream(self, trace_dir, config_metadata_sections): - metadata_path = os.path.join(trace_dir, 'metadata') + metadata_path = os.path.join(trace_dir, "metadata") metadata_sections = [] if config_metadata_sections is None: - with open(metadata_path, 'rb') as metadata_file: + with open(metadata_path, "rb") as metadata_file: metadata_sections.append( _LttngMetadataStreamSection(0, metadata_file.read()) ) @@ -1134,7 +1134,7 @@ class _LttngLiveViewerSessionTracingSessionState: # Metadata stream -> stream info and metadata stream state info = _LttngLiveViewerStreamInfo( - stream_id, trace_id, True, trace.metadata_stream.path, 'metadata' + stream_id, trace_id, True, trace.metadata_stream.path, "metadata" ) self._stream_infos.append(info) self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState( @@ -1246,20 +1246,20 @@ class _LttngLiveViewerSession: def _get_tracing_session_state(self, tracing_session_id): if tracing_session_id not in self._ts_states: raise UnexpectedInput( - 'Unknown tracing session ID {}'.format(tracing_session_id) + "Unknown tracing session ID {}".format(tracing_session_id) ) return self._ts_states[tracing_session_id] def _get_stream_state(self, stream_id): if stream_id not in self._stream_states: - UnexpectedInput('Unknown stream ID {}'.format(stream_id)) + UnexpectedInput("Unknown stream ID {}".format(stream_id)) return self._stream_states[stream_id] def handle_command(self, cmd): logging.info( - 'Handling command in viewer session: cmd-cls-name={}'.format( + "Handling command in viewer session: cmd-cls-name={}".format( cmd.__class__.__name__ ) ) @@ -1267,7 +1267,7 @@ class _LttngLiveViewerSession: if cmd_type not in self._command_handlers: raise UnexpectedInput( - 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__) + "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__) ) return self._command_handlers[cmd_type](cmd) @@ -1280,7 +1280,7 @@ class _LttngLiveViewerSession: if ts_state.is_attached: raise UnexpectedInput( - 'Cannot attach to tracing session `{}`: viewer is already attached'.format( + "Cannot attach to tracing session `{}`: viewer is already attached".format( info.name ) ) @@ -1299,7 +1299,7 @@ class _LttngLiveViewerSession: if not ts_state.is_attached: raise UnexpectedInput( - 'Cannot detach to tracing session `{}`: viewer is not attached'.format( + "Cannot detach to tracing session `{}`: viewer is not attached".format( info.name ) ) @@ -1316,7 +1316,7 @@ class _LttngLiveViewerSession: if type(stream_state) is not _LttngLiveViewerSessionDataStreamState: raise UnexpectedInput( - 'Stream with ID {} is not a data stream'.format(cmd.stream_id) + "Stream with ID {} is not a data stream".format(cmd.stream_id) ) if stream_state.cur_index_entry is None: @@ -1360,7 +1360,7 @@ class _LttngLiveViewerSession: if type(stream_state) is not _LttngLiveViewerSessionDataStreamState: raise UnexpectedInput( - 'Stream with ID {} is not a data stream'.format(cmd.stream_id) + "Stream with ID {} is not a data stream".format(cmd.stream_id) ) if stream_state.tracing_session_state.has_new_metadata: @@ -1393,7 +1393,7 @@ class _LttngLiveViewerSession: is not _LttngLiveViewerSessionMetadataStreamState ): raise UnexpectedInput( - 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id) + "Stream with ID {} is not a metadata stream".format(cmd.stream_id) ) if metadata_stream_state.is_sent: @@ -1461,13 +1461,13 @@ class LttngLiveServer: def __init__( self, port_filename, tracing_session_descriptors, max_query_data_response_size ): - logging.info('Server configuration:') + logging.info("Server configuration:") - logging.info(' Port file name: `{}`'.format(port_filename)) + logging.info(" Port file name: `{}`".format(port_filename)) if max_query_data_response_size is not None: logging.info( - ' Maximum response data query size: `{}`'.format( + " Maximum response data query size: `{}`".format( max_query_data_response_size ) ) @@ -1495,7 +1495,7 @@ class LttngLiveServer: self._codec = _LttngLiveViewerProtocolCodec() # Port 0: OS assigns an unused port - serv_addr = ('localhost', 0) + serv_addr = ("localhost", 0) self._sock.bind(serv_addr) self._write_port_to_file(port_filename) @@ -1503,7 +1503,7 @@ class LttngLiveServer: self._listen() finally: self._sock.close() - logging.info('Closed connection and socket.') + logging.info("Closed connection and socket.") @property def _server_port(self): @@ -1513,33 +1513,33 @@ class LttngLiveServer: data = bytes() while True: - logging.info('Waiting for viewer command.') + logging.info("Waiting for viewer command.") buf = self._conn.recv(128) if not buf: - logging.info('Client closed connection.') + logging.info("Client closed connection.") if data: raise UnexpectedInput( - 'Client closed connection after having sent {} command bytes.'.format( + "Client closed connection after having sent {} command bytes.".format( len(data) ) ) return - logging.info('Received data from viewer: length={}'.format(len(buf))) + logging.info("Received data from viewer: length={}".format(len(buf))) data += buf try: cmd = self._codec.decode(data) except struct.error as exc: - raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc + raise UnexpectedInput("Malformed command: {}".format(exc)) from exc if cmd is not None: logging.info( - 'Received command from viewer: cmd-cls-name={}'.format( + "Received command from viewer: cmd-cls-name={}".format( cmd.__class__.__name__ ) ) @@ -1548,7 +1548,7 @@ class LttngLiveServer: def _send_reply(self, reply): data = self._codec.encode(reply) logging.info( - 'Sending reply to viewer: reply-cls-name={}, length={}'.format( + "Sending reply to viewer: reply-cls-name={}, length={}".format( reply.__class__.__name__, len(data) ) ) @@ -1567,7 +1567,7 @@ class LttngLiveServer: # Create viewer session (arbitrary ID 23) logging.info( - 'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor) + "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor) ) viewer_session = _LttngLiveViewerSession( 23, self._ts_descriptors, self._max_query_data_response_size @@ -1590,13 +1590,13 @@ class LttngLiveServer: self._send_reply(viewer_session.handle_command(cmd)) def _listen(self): - logging.info('Listening: port={}'.format(self._server_port)) + logging.info("Listening: port={}".format(self._server_port)) # Backlog must be present for Python version < 3.5. # 128 is an arbitrary number since we expect only 1 connection anyway. self._sock.listen(128) self._conn, viewer_addr = self._sock.accept() logging.info( - 'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1]) + "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1]) ) try: @@ -1606,8 +1606,8 @@ class LttngLiveServer: def _write_port_to_file(self, port_filename): # Write the port number to a temporary file. - with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_port_file: - print(self._server_port, end='', file=tmp_port_file) + with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file: + print(self._server_port, end="", file=tmp_port_file) # Rename temporary file to real file os.replace(tmp_port_file.name, port_filename) @@ -1628,7 +1628,7 @@ class LttngTracingSessionDescriptor: ): for trace in traces: if name not in trace.path: - fmt = 'Tracing session name must be part of every trace path (`{}` not found in `{}`)' + fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)" raise ValueError(fmt.format(name, trace.path)) self._traces = traces @@ -1680,23 +1680,23 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix): # ] # } # ] - with open(sessions_filename, 'r') as sessions_file: + with open(sessions_filename, "r") as sessions_file: params = json.load(sessions_file) sessions = [] for session in params: - name = session['name'] - tracing_session_id = session['id'] - hostname = session['hostname'] - live_timer_freq = session['live-timer-freq'] - client_count = session['client-count'] + name = session["name"] + tracing_session_id = session["id"] + hostname = session["hostname"] + live_timer_freq = session["live-timer-freq"] + client_count = session["client-count"] traces = [] - for trace in session['traces']: - metadata_sections = trace.get('metadata-sections') - beacons = trace.get('beacons') - path = trace['path'] + for trace in session["traces"]: + metadata_sections = trace.get("metadata-sections") + beacons = trace.get("beacons") + path = trace["path"] if not os.path.isabs(path): path = os.path.join(trace_path_prefix, path) @@ -1718,54 +1718,54 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix): def _loglevel_parser(string): - loglevels = {'info': logging.INFO, 'warning': logging.WARNING} + loglevels = {"info": logging.INFO, "warning": logging.WARNING} if string not in loglevels: msg = "{} is not a valid loglevel".format(string) raise argparse.ArgumentTypeError(msg) return loglevels[string] -if __name__ == '__main__': - logging.basicConfig(format='# %(asctime)-25s%(message)s') +if __name__ == "__main__": + logging.basicConfig(format="# %(asctime)-25s%(message)s") parser = argparse.ArgumentParser( - description='LTTng-live protocol mocker', add_help=False + description="LTTng-live protocol mocker", add_help=False ) parser.add_argument( - '--log-level', - default='warning', - choices=['info', 'warning'], - help='The loglevel to be used.', + "--log-level", + default="warning", + choices=["info", "warning"], + help="The loglevel to be used.", ) loglevel_namespace, remaining_args = parser.parse_known_args() logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level)) parser.add_argument( - '--port-filename', - help='The final port file. This file is present when the server is ready to receive connection.', + "--port-filename", + help="The final port file. This file is present when the server is ready to receive connection.", required=True, ) parser.add_argument( - '--max-query-data-response-size', + "--max-query-data-response-size", type=int, - help='The maximum size of control data response in bytes', + help="The maximum size of control data response in bytes", ) parser.add_argument( - '--trace-path-prefix', + "--trace-path-prefix", type=str, - help='Prefix to prepend to the trace paths of session configurations', + help="Prefix to prepend to the trace paths of session configurations", ) parser.add_argument( - '--sessions-filename', + "--sessions-filename", type=str, - help='Path to a session configuration file', + help="Path to a session configuration file", ) parser.add_argument( - '-h', - '--help', - action='help', + "-h", + "--help", + action="help", default=argparse.SUPPRESS, - help='Show this help message and exit.', + help="Show this help message and exit.", ) args = parser.parse_args(args=remaining_args)