# An LTTng live protocol codec can convert bytes to command objects and
# reply objects to bytes.
class _LttngLiveViewerProtocolCodec:
- _COMMAND_HEADER_STRUCT_FMT = 'QII'
+ _COMMAND_HEADER_STRUCT_FMT = "QII"
_COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
def __init__(self):
pass
def _unpack(self, fmt, data, offset=0):
- fmt = '!' + fmt
+ fmt = "!" + fmt
return struct.unpack_from(fmt, data, offset)
def _unpack_payload(self, fmt, data):
self._COMMAND_HEADER_STRUCT_FMT, data
)
logging.info(
- 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
+ "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
payload_size, cmd_type, version
)
)
if cmd_type == 1:
viewer_session_id, major, minor, conn_type = self._unpack_payload(
- 'QIII', data
+ "QIII", data
)
return _LttngLiveViewerConnectCommand(
version, viewer_session_id, major, minor
elif cmd_type == 2:
return _LttngLiveViewerGetTracingSessionInfosCommand(version)
elif cmd_type == 3:
- tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
+ tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
return _LttngLiveViewerAttachToTracingSessionCommand(
version, tracing_session_id, offset, seek_type
)
elif cmd_type == 4:
- (stream_id,) = self._unpack_payload('Q', data)
+ (stream_id,) = self._unpack_payload("Q", data)
return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
version, stream_id
)
elif cmd_type == 5:
- stream_id, offset, req_length = self._unpack_payload('QQI', data)
+ stream_id, offset, req_length = self._unpack_payload("QQI", data)
return _LttngLiveViewerGetDataStreamPacketDataCommand(
version, stream_id, offset, req_length
)
elif cmd_type == 6:
- (stream_id,) = self._unpack_payload('Q', data)
+ (stream_id,) = self._unpack_payload("Q", data)
return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
elif cmd_type == 7:
- (tracing_session_id,) = self._unpack_payload('Q', data)
+ (tracing_session_id,) = self._unpack_payload("Q", data)
return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
elif cmd_type == 8:
return _LttngLiveViewerCreateViewerSessionCommand(version)
elif cmd_type == 9:
- (tracing_session_id,) = self._unpack_payload('Q', data)
+ (tracing_session_id,) = self._unpack_payload("Q", data)
return _LttngLiveViewerDetachFromTracingSessionCommand(
version, tracing_session_id
)
else:
- raise UnexpectedInput('Unknown command type {}'.format(cmd_type))
+ raise UnexpectedInput("Unknown command type {}".format(cmd_type))
def _pack(self, fmt, *args):
# Force network byte order
- return struct.pack('!' + fmt, *args)
+ return struct.pack("!" + fmt, *args)
def _encode_zero_padded_str(self, string, length):
data = string.encode()
- return data.ljust(length, b'\x00')
+ return data.ljust(length, b"\x00")
def _encode_stream_info(self, info):
- data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
+ data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
data += self._encode_zero_padded_str(info.path, 4096)
data += self._encode_zero_padded_str(info.channel_name, 255)
return data
def encode(self, reply):
if type(reply) is _LttngLiveViewerConnectReply:
data = self._pack(
- 'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
+ "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
)
elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
- data = self._pack('I', len(reply.tracing_session_infos))
+ data = self._pack("I", len(reply.tracing_session_infos))
for info in reply.tracing_session_infos:
data += self._pack(
- 'QIII',
+ "QIII",
info.tracing_session_id,
info.live_timer_freq,
info.client_count,
data += self._encode_zero_padded_str(info.hostname, 64)
data += self._encode_zero_padded_str(info.name, 255)
elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
- data = self._pack('II', reply.status, len(reply.stream_infos))
+ data = self._pack("II", reply.status, len(reply.stream_infos))
for info in reply.stream_infos:
data += self._encode_stream_info(info)
elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
- index_format = 'QQQQQQQII'
+ index_format = "QQQQQQQII"
entry = reply.index_entry
flags = self._get_has_new_stuff_flags(
reply.has_new_metadata, reply.has_new_data_stream
flags = self._get_has_new_stuff_flags(
reply.has_new_metadata, reply.has_new_data_stream
)
- data = self._pack('III', reply.status, len(reply.data), flags)
+ data = self._pack("III", reply.status, len(reply.data), flags)
data += reply.data
elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
- data = self._pack('QI', len(reply.data), reply.status)
+ data = self._pack("QI", len(reply.data), reply.status)
data += reply.data
elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
- data = self._pack('II', reply.status, len(reply.stream_infos))
+ data = self._pack("II", reply.status, len(reply.stream_infos))
for info in reply.stream_infos:
data += self._encode_stream_info(info)
elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
- data = self._pack('I', reply.status)
+ data = self._pack("I", reply.status)
elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
- data = self._pack('I', reply.status)
+ data = self._pack("I", reply.status)
else:
raise ValueError(
- 'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
+ "Unknown reply object with class `{}`".format(reply.__class__.__name__)
)
return data
self._entries = []
assert os.path.isfile(self._path)
- with open(self._path, 'rb') as f:
+ with open(self._path, "rb") as f:
# Read header first
- fmt = '>IIII'
+ fmt = ">IIII"
size = struct.calcsize(fmt)
data = f.read(size)
assert len(data) == size
assert magic == 0xC1F1DCC1
# Read index entries
- fmt = '>QQQQQQQ'
+ fmt = ">QQQQQQQ"
size = struct.calcsize(fmt)
while True:
def __init__(self, path, beacons):
self._path = path
filename = os.path.basename(path)
- match = re.match(r'(.*)_\d+', filename)
+ match = re.match(r"(.*)_\d+", filename)
self._channel_name = match.group(1)
trace_dir = os.path.dirname(path)
- index_path = os.path.join(trace_dir, 'index', filename + '.idx')
+ index_path = os.path.join(trace_dir, "index", filename + ".idx")
self._index = _LttngDataStreamIndex(index_path, beacons)
assert os.path.isfile(path)
- self._file = open(path, 'rb')
+ self._file = open(path, "rb")
logging.info(
'Built data stream: path="{}", channel-name="{}"'.format(
path, self._channel_name
else:
self._data = data
logging.info(
- 'Built metadata stream section: ts={}, data-len={}'.format(
+ "Built metadata stream section: ts={}, data-len={}".format(
self._timestamp, len(self._data)
)
)
self._path = metadata_file_path
self._sections = config_sections
logging.info(
- 'Built metadata stream: path={}, section-len={}'.format(
+ "Built metadata stream: path={}, section-len={}".format(
self._path, len(self._sections)
)
)
LttngMetadataConfigSection = namedtuple(
- 'LttngMetadataConfigSection', ['line', 'timestamp', 'is_empty']
+ "LttngMetadataConfigSection", ["line", "timestamp", "is_empty"]
)
last_line = 0
for config_section in config_sections:
- if config_section == 'empty':
+ if config_section == "empty":
# Found a empty section marker. Actually append the section at the
# timestamp of the next concrete section.
append_empty_section = True
else:
assert type(config_section) is dict
- line = config_section.get('line')
- ts = config_section.get('timestamp')
+ line = config_section.get("line")
+ ts = config_section.get("timestamp")
# Sections' timestamps and lines must both be increasing.
assert ts > last_timestamp
parsed_sections = _parse_metadata_sections_config(raw_config_sections)
sections = []
- with open(metadata_file_path, 'r') as metadata_file:
+ with open(metadata_file_path, "r") as metadata_file:
metadata_lines = [line for line in metadata_file]
config_metadata_sections_idx = 0
# If there are no more sections, simply append the line.
if config_metadata_sections_idx + 1 >= len(parsed_sections):
- curr_metadata_section += bytearray(line_content, 'utf8')
+ curr_metadata_section += bytearray(line_content, "utf8")
continue
next_section_line_number = parsed_sections[
# Clear old content and append current line for the next section.
curr_metadata_section.clear()
- curr_metadata_section += bytearray(line_content, 'utf8')
+ curr_metadata_section += bytearray(line_content, "utf8")
# Append any empty sections.
while parsed_sections[config_metadata_sections_idx].is_empty:
config_metadata_sections_idx += 1
else:
# Append line_content to the current metadata section.
- curr_metadata_section += bytearray(line_content, 'utf8')
+ curr_metadata_section += bytearray(line_content, "utf8")
# We iterated over all the lines of the metadata file. Close the current section.
sections.append(
if not os.path.isfile(path):
continue
- if filename.startswith('.'):
+ if filename.startswith("."):
continue
- if filename == 'metadata':
+ if filename == "metadata":
continue
data_stream_paths.append(path)
)
def _create_metadata_stream(self, trace_dir, config_metadata_sections):
- metadata_path = os.path.join(trace_dir, 'metadata')
+ metadata_path = os.path.join(trace_dir, "metadata")
metadata_sections = []
if config_metadata_sections is None:
- with open(metadata_path, 'rb') as metadata_file:
+ with open(metadata_path, "rb") as metadata_file:
metadata_sections.append(
_LttngMetadataStreamSection(0, metadata_file.read())
)
# Metadata stream -> stream info and metadata stream state
info = _LttngLiveViewerStreamInfo(
- stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
+ stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
)
self._stream_infos.append(info)
self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
def _get_tracing_session_state(self, tracing_session_id):
if tracing_session_id not in self._ts_states:
raise UnexpectedInput(
- 'Unknown tracing session ID {}'.format(tracing_session_id)
+ "Unknown tracing session ID {}".format(tracing_session_id)
)
return self._ts_states[tracing_session_id]
def _get_stream_state(self, stream_id):
if stream_id not in self._stream_states:
- UnexpectedInput('Unknown stream ID {}'.format(stream_id))
+ UnexpectedInput("Unknown stream ID {}".format(stream_id))
return self._stream_states[stream_id]
def handle_command(self, cmd):
logging.info(
- 'Handling command in viewer session: cmd-cls-name={}'.format(
+ "Handling command in viewer session: cmd-cls-name={}".format(
cmd.__class__.__name__
)
)
if cmd_type not in self._command_handlers:
raise UnexpectedInput(
- 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
+ "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
)
return self._command_handlers[cmd_type](cmd)
if ts_state.is_attached:
raise UnexpectedInput(
- 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
+ "Cannot attach to tracing session `{}`: viewer is already attached".format(
info.name
)
)
if not ts_state.is_attached:
raise UnexpectedInput(
- 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
+ "Cannot detach to tracing session `{}`: viewer is not attached".format(
info.name
)
)
if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
raise UnexpectedInput(
- 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
+ "Stream with ID {} is not a data stream".format(cmd.stream_id)
)
if stream_state.cur_index_entry is None:
if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
raise UnexpectedInput(
- 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
+ "Stream with ID {} is not a data stream".format(cmd.stream_id)
)
if stream_state.tracing_session_state.has_new_metadata:
is not _LttngLiveViewerSessionMetadataStreamState
):
raise UnexpectedInput(
- 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
+ "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
)
if metadata_stream_state.is_sent:
def __init__(
self, port_filename, tracing_session_descriptors, max_query_data_response_size
):
- logging.info('Server configuration:')
+ logging.info("Server configuration:")
- logging.info(' Port file name: `{}`'.format(port_filename))
+ logging.info(" Port file name: `{}`".format(port_filename))
if max_query_data_response_size is not None:
logging.info(
- ' Maximum response data query size: `{}`'.format(
+ " Maximum response data query size: `{}`".format(
max_query_data_response_size
)
)
self._codec = _LttngLiveViewerProtocolCodec()
# Port 0: OS assigns an unused port
- serv_addr = ('localhost', 0)
+ serv_addr = ("localhost", 0)
self._sock.bind(serv_addr)
self._write_port_to_file(port_filename)
self._listen()
finally:
self._sock.close()
- logging.info('Closed connection and socket.')
+ logging.info("Closed connection and socket.")
@property
def _server_port(self):
data = bytes()
while True:
- logging.info('Waiting for viewer command.')
+ logging.info("Waiting for viewer command.")
buf = self._conn.recv(128)
if not buf:
- logging.info('Client closed connection.')
+ logging.info("Client closed connection.")
if data:
raise UnexpectedInput(
- 'Client closed connection after having sent {} command bytes.'.format(
+ "Client closed connection after having sent {} command bytes.".format(
len(data)
)
)
return
- logging.info('Received data from viewer: length={}'.format(len(buf)))
+ logging.info("Received data from viewer: length={}".format(len(buf)))
data += buf
try:
cmd = self._codec.decode(data)
except struct.error as exc:
- raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc
+ raise UnexpectedInput("Malformed command: {}".format(exc)) from exc
if cmd is not None:
logging.info(
- 'Received command from viewer: cmd-cls-name={}'.format(
+ "Received command from viewer: cmd-cls-name={}".format(
cmd.__class__.__name__
)
)
def _send_reply(self, reply):
data = self._codec.encode(reply)
logging.info(
- 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
+ "Sending reply to viewer: reply-cls-name={}, length={}".format(
reply.__class__.__name__, len(data)
)
)
# Create viewer session (arbitrary ID 23)
logging.info(
- 'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
+ "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
)
viewer_session = _LttngLiveViewerSession(
23, self._ts_descriptors, self._max_query_data_response_size
self._send_reply(viewer_session.handle_command(cmd))
def _listen(self):
- logging.info('Listening: port={}'.format(self._server_port))
+ logging.info("Listening: port={}".format(self._server_port))
# Backlog must be present for Python version < 3.5.
# 128 is an arbitrary number since we expect only 1 connection anyway.
self._sock.listen(128)
self._conn, viewer_addr = self._sock.accept()
logging.info(
- 'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
+ "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
)
try:
def _write_port_to_file(self, port_filename):
# Write the port number to a temporary file.
- with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_port_file:
- print(self._server_port, end='', file=tmp_port_file)
+ with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
+ print(self._server_port, end="", file=tmp_port_file)
# Rename temporary file to real file
os.replace(tmp_port_file.name, port_filename)
):
for trace in traces:
if name not in trace.path:
- fmt = 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
+ fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
raise ValueError(fmt.format(name, trace.path))
self._traces = traces
# ]
# }
# ]
- with open(sessions_filename, 'r') as sessions_file:
+ with open(sessions_filename, "r") as sessions_file:
params = json.load(sessions_file)
sessions = []
for session in params:
- name = session['name']
- tracing_session_id = session['id']
- hostname = session['hostname']
- live_timer_freq = session['live-timer-freq']
- client_count = session['client-count']
+ name = session["name"]
+ tracing_session_id = session["id"]
+ hostname = session["hostname"]
+ live_timer_freq = session["live-timer-freq"]
+ client_count = session["client-count"]
traces = []
- for trace in session['traces']:
- metadata_sections = trace.get('metadata-sections')
- beacons = trace.get('beacons')
- path = trace['path']
+ for trace in session["traces"]:
+ metadata_sections = trace.get("metadata-sections")
+ beacons = trace.get("beacons")
+ path = trace["path"]
if not os.path.isabs(path):
path = os.path.join(trace_path_prefix, path)
def _loglevel_parser(string):
- loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
+ loglevels = {"info": logging.INFO, "warning": logging.WARNING}
if string not in loglevels:
msg = "{} is not a valid loglevel".format(string)
raise argparse.ArgumentTypeError(msg)
return loglevels[string]
-if __name__ == '__main__':
- logging.basicConfig(format='# %(asctime)-25s%(message)s')
+if __name__ == "__main__":
+ logging.basicConfig(format="# %(asctime)-25s%(message)s")
parser = argparse.ArgumentParser(
- description='LTTng-live protocol mocker', add_help=False
+ description="LTTng-live protocol mocker", add_help=False
)
parser.add_argument(
- '--log-level',
- default='warning',
- choices=['info', 'warning'],
- help='The loglevel to be used.',
+ "--log-level",
+ default="warning",
+ choices=["info", "warning"],
+ help="The loglevel to be used.",
)
loglevel_namespace, remaining_args = parser.parse_known_args()
logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
parser.add_argument(
- '--port-filename',
- help='The final port file. This file is present when the server is ready to receive connection.',
+ "--port-filename",
+ help="The final port file. This file is present when the server is ready to receive connection.",
required=True,
)
parser.add_argument(
- '--max-query-data-response-size',
+ "--max-query-data-response-size",
type=int,
- help='The maximum size of control data response in bytes',
+ help="The maximum size of control data response in bytes",
)
parser.add_argument(
- '--trace-path-prefix',
+ "--trace-path-prefix",
type=str,
- help='Prefix to prepend to the trace paths of session configurations',
+ help="Prefix to prepend to the trace paths of session configurations",
)
parser.add_argument(
- '--sessions-filename',
+ "--sessions-filename",
type=str,
- help='Path to a session configuration file',
+ help="Path to a session configuration file",
)
parser.add_argument(
- '-h',
- '--help',
- action='help',
+ "-h",
+ "--help",
+ action="help",
default=argparse.SUPPRESS,
- help='Show this help message and exit.',
+ help="Show this help message and exit.",
)
args = parser.parse_args(args=remaining_args)