X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=tests%2Fdata%2Fplugins%2Fsrc.ctf.lttng-live%2Flttng_live_server.py;h=0ffb909968a7f8a3a9f497c29be0fdb90880d599;hb=78169723129e666342d1dd5bc6194f80d70fe862;hp=77b005ca3083cb264352a02aeab32a0e47748cb4;hpb=0235b0db7de5bcacdb3650c92461f2ce5eb2143d;p=babeltrace.git diff --git a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py index 77b005ca..0ffb9099 100644 --- a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py +++ b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py @@ -13,6 +13,8 @@ import socket import struct import sys import tempfile +import json +from collections import namedtuple class UnexpectedInput(RuntimeError): @@ -514,23 +516,39 @@ class _LttngLiveViewerProtocolCodec: for info in reply.stream_infos: data += self._encode_stream_info(info) elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply: + index_format = 'QQQQQQQII' entry = reply.index_entry flags = self._get_has_new_stuff_flags( reply.has_new_metadata, reply.has_new_data_stream ) - data = self._pack( - 'QQQQQQQII', - entry.offset_bytes, - entry.total_size_bits, - entry.content_size_bits, - entry.timestamp_begin, - entry.timestamp_end, - entry.events_discarded, - entry.stream_class_id, - reply.status, - flags, - ) + if type(entry) is _LttngDataStreamIndexEntry: + data = self._pack( + index_format, + entry.offset_bytes, + entry.total_size_bits, + entry.content_size_bits, + entry.timestamp_begin, + entry.timestamp_end, + entry.events_discarded, + entry.stream_class_id, + reply.status, + flags, + ) + else: + assert type(entry) is _LttngDataStreamBeaconEntry + data = self._pack( + index_format, + 0, + 0, + 0, + 0, + entry.timestamp, + 0, + entry.stream_class_id, + reply.status, + flags, + ) elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply: flags = self._get_has_new_stuff_flags( reply.has_new_metadata, reply.has_new_data_stream @@ -614,11 +632,44 @@ class _LttngDataStreamIndexEntry: return self._stream_class_id +# An entry within the index of an LTTng data stream. While a stream beacon entry +# is conceptually unrelated to an index, it is sent as a reply to a +# LttngLiveViewerGetNextDataStreamIndexEntryCommand +class _LttngDataStreamBeaconEntry: + def __init__(self, stream_class_id, timestamp): + self._stream_class_id = stream_class_id + self._timestamp = timestamp + + @property + def timestamp(self): + return self._timestamp + + @property + def stream_class_id(self): + return self._stream_class_id + + +def _get_entry_timestamp_begin(entry): + if type(entry) is _LttngDataStreamBeaconEntry: + return entry.timestamp + else: + assert type(entry) is _LttngDataStreamIndexEntry + return entry.timestamp_begin + + # The index of an LTTng data stream, a sequence of index entries. class _LttngDataStreamIndex(collections.abc.Sequence): - def __init__(self, path): + def __init__(self, path, beacons): self._path = path self._build() + + if beacons: + stream_class_id = self._entries[0].stream_class_id + beacons = [ + _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons + ] + self._add_beacons(beacons) + logging.info( 'Built data stream index entries: path="{}", count={}'.format( path, len(self._entries) @@ -682,6 +733,17 @@ class _LttngDataStreamIndex(collections.abc.Sequence): # Skip anything else before the next entry f.seek(index_entry_length - size, os.SEEK_CUR) + def _add_beacons(self, beacons): + # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin + def sort_key(entry): + if type(entry) is _LttngDataStreamBeaconEntry: + return entry.timestamp + else: + return entry.timestamp_end + + self._entries += beacons + self._entries.sort(key=sort_key) + def __getitem__(self, index): return self._entries[index] @@ -695,14 +757,14 @@ class _LttngDataStreamIndex(collections.abc.Sequence): # An LTTng data stream. class _LttngDataStream: - def __init__(self, path): + def __init__(self, path, beacons): self._path = path filename = os.path.basename(path) match = re.match(r'(.*)_\d+', filename) self._channel_name = match.group(1) trace_dir = os.path.dirname(path) index_path = os.path.join(trace_dir, 'index', filename + '.idx') - self._index = _LttngDataStreamIndex(index_path) + self._index = _LttngDataStreamIndex(index_path, beacons) assert os.path.isfile(path) self._file = open(path, 'rb') logging.info( @@ -728,36 +790,166 @@ class _LttngDataStream: return self._file.read(len_bytes) +class _LttngMetadataStreamSection: + def __init__(self, timestamp, data): + self._timestamp = timestamp + if data is None: + self._data = bytes() + else: + self._data = data + logging.info( + 'Built metadata stream section: ts={}, data-len={}'.format( + self._timestamp, len(self._data) + ) + ) + + @property + def timestamp(self): + return self._timestamp + + @property + def data(self): + return self._data + + # An LTTng metadata stream. class _LttngMetadataStream: - def __init__(self, path): - self._path = path - logging.info('Built metadata stream: path="{}"'.format(path)) + def __init__(self, metadata_file_path, config_sections): + self._path = metadata_file_path + self._sections = config_sections + logging.info( + 'Built metadata stream: path={}, section-len={}'.format( + self._path, len(self._sections) + ) + ) @property def path(self): return self._path @property - def data(self): - assert os.path.isfile(self._path) + def sections(self): + return self._sections + + +LttngMetadataConfigSection = namedtuple( + 'LttngMetadataConfigSection', ['line', 'timestamp', 'is_empty'] +) + + +def _parse_metadata_sections_config(config_sections): + assert config_sections is not None + config_metadata_sections = [] + append_empty_section = False + last_timestamp = 0 + last_line = 0 + + for config_section in config_sections: + if config_section == 'empty': + # Found a empty section marker. Actually append the section at the + # timestamp of the next concrete section. + append_empty_section = True + else: + assert type(config_section) is dict + line = config_section.get('line') + ts = config_section.get('timestamp') + + # Sections' timestamps and lines must both be increasing. + assert ts > last_timestamp + last_timestamp = ts + assert line > last_line + last_line = line + + if append_empty_section: + config_metadata_sections.append( + LttngMetadataConfigSection(line, ts, True) + ) + append_empty_section = False + + config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False)) + + return config_metadata_sections - with open(self._path, 'rb') as f: - return f.read() + +def _split_metadata_sections(metadata_file_path, raw_config_sections): + assert isinstance(raw_config_sections, collections.abc.Sequence) + + parsed_sections = _parse_metadata_sections_config(raw_config_sections) + + sections = [] + with open(metadata_file_path, 'r') as metadata_file: + metadata_lines = [line for line in metadata_file] + + config_metadata_sections_idx = 0 + curr_metadata_section = bytearray() + + for idx, line_content in enumerate(metadata_lines): + # Add one to the index to convert from the zero-indexing of the + # enumerate() function to the one-indexing used by humans when + # viewing a text file. + curr_line_number = idx + 1 + + # If there are no more sections, simply append the line. + if config_metadata_sections_idx + 1 >= len(parsed_sections): + curr_metadata_section += bytearray(line_content, 'utf8') + continue + + next_section_line_number = parsed_sections[ + config_metadata_sections_idx + 1 + ].line + + # If the next section begins at the current line, create a + # section with the metadata we gathered so far. + if curr_line_number >= next_section_line_number: + + # Flushing the metadata of the current section. + sections.append( + _LttngMetadataStreamSection( + parsed_sections[config_metadata_sections_idx].timestamp, + bytes(curr_metadata_section), + ) + ) + + # Move to the next section. + config_metadata_sections_idx += 1 + + # Clear old content and append current line for the next section. + curr_metadata_section.clear() + curr_metadata_section += bytearray(line_content, 'utf8') + + # Append any empty sections. + while parsed_sections[config_metadata_sections_idx].is_empty: + sections.append( + _LttngMetadataStreamSection( + parsed_sections[config_metadata_sections_idx].timestamp, None + ) + ) + config_metadata_sections_idx += 1 + else: + # Append line_content to the current metadata section. + curr_metadata_section += bytearray(line_content, 'utf8') + + # We iterated over all the lines of the metadata file. Close the current section. + sections.append( + _LttngMetadataStreamSection( + parsed_sections[config_metadata_sections_idx].timestamp, + bytes(curr_metadata_section), + ) + ) + + return sections # An LTTng trace, a sequence of LTTng data streams. class LttngTrace(collections.abc.Sequence): - def __init__(self, trace_dir): + def __init__(self, trace_dir, metadata_sections, beacons): assert os.path.isdir(trace_dir) self._path = trace_dir - self._metadata_stream = _LttngMetadataStream( - os.path.join(trace_dir, 'metadata') - ) - self._create_data_streams(trace_dir) + self._create_metadata_stream(trace_dir, metadata_sections) + self._create_data_streams(trace_dir, beacons) logging.info('Built trace: path="{}"'.format(trace_dir)) - def _create_data_streams(self, trace_dir): + def _create_data_streams(self, trace_dir, beacons): data_stream_paths = [] for filename in os.listdir(trace_dir): @@ -778,7 +970,31 @@ class LttngTrace(collections.abc.Sequence): self._data_streams = [] for data_stream_path in data_stream_paths: - self._data_streams.append(_LttngDataStream(data_stream_path)) + stream_name = os.path.basename(data_stream_path) + this_stream_beacons = None + + if beacons is not None and stream_name in beacons: + this_stream_beacons = beacons[stream_name] + + self._data_streams.append( + _LttngDataStream(data_stream_path, this_stream_beacons) + ) + + def _create_metadata_stream(self, trace_dir, config_metadata_sections): + metadata_path = os.path.join(trace_dir, 'metadata') + metadata_sections = [] + + if config_metadata_sections is None: + with open(metadata_path, 'rb') as metadata_file: + metadata_sections.append( + _LttngMetadataStreamSection(0, metadata_file.read()) + ) + else: + metadata_sections = _split_metadata_sections( + metadata_path, config_metadata_sections + ) + + self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections) @property def path(self): @@ -797,10 +1013,11 @@ class LttngTrace(collections.abc.Sequence): # The state of a single data stream. class _LttngLiveViewerSessionDataStreamState: - def __init__(self, ts_state, info, data_stream): + def __init__(self, ts_state, info, data_stream, metadata_stream_id): self._ts_state = ts_state self._info = info self._data_stream = data_stream + self._metadata_stream_id = metadata_stream_id self._cur_index_entry_index = 0 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"' logging.info( @@ -841,6 +1058,14 @@ class _LttngLiveViewerSessionMetadataStreamState: self._ts_state = ts_state self._info = info self._metadata_stream = metadata_stream + self._cur_metadata_stream_section_index = 0 + if len(metadata_stream.sections) > 1: + self._next_metadata_stream_section_timestamp = metadata_stream.sections[ + 1 + ].timestamp + else: + self._next_metadata_stream_section_timestamp = None + self._is_sent = False fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"' logging.info( @@ -872,6 +1097,28 @@ class _LttngLiveViewerSessionMetadataStreamState: def is_sent(self, value): self._is_sent = value + @property + def cur_section(self): + fmt = "Get current metadata section: section-idx={}" + logging.info(fmt.format(self._cur_metadata_stream_section_index)) + if self._cur_metadata_stream_section_index == len( + self._metadata_stream.sections + ): + return + + return self._metadata_stream.sections[self._cur_metadata_stream_section_index] + + def goto_next_section(self): + self._cur_metadata_stream_section_index += 1 + if self.cur_section: + self._next_metadata_stream_section_timestamp = self.cur_section.timestamp + else: + self._next_metadata_stream_section_timestamp = None + + @property + def next_section_timestamp(self): + return self._next_metadata_stream_section_timestamp + # The state of a tracing session. class _LttngLiveViewerSessionTracingSessionState: @@ -885,6 +1132,17 @@ class _LttngLiveViewerSessionTracingSessionState: for trace in tc_descr.traces: trace_id = stream_id * 1000 + # Metadata stream -> stream info and metadata stream state + info = _LttngLiveViewerStreamInfo( + stream_id, trace_id, True, trace.metadata_stream.path, 'metadata' + ) + self._stream_infos.append(info) + self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState( + self, info, trace.metadata_stream + ) + metadata_stream_id = stream_id + stream_id += 1 + # Data streams -> stream infos and data stream states for data_stream in trace: info = _LttngLiveViewerStreamInfo( @@ -896,20 +1154,10 @@ class _LttngLiveViewerSessionTracingSessionState: ) self._stream_infos.append(info) self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState( - self, info, data_stream + self, info, data_stream, metadata_stream_id ) stream_id += 1 - # Metadata stream -> stream info and metadata stream state - info = _LttngLiveViewerStreamInfo( - stream_id, trace_id, True, trace.metadata_stream.path, 'metadata' - ) - self._stream_infos.append(info) - self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState( - self, info, trace.metadata_stream - ) - stream_id += 1 - self._is_attached = False fmt = 'Built tracing session state: id={}, name="{}"' logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name)) @@ -943,6 +1191,16 @@ class _LttngLiveViewerSessionTracingSessionState: self._is_attached = value +def needs_new_metadata_section(metadata_stream_state, latest_timestamp): + if metadata_stream_state.next_section_timestamp is None: + return False + + if latest_timestamp >= metadata_stream_state.next_section_timestamp: + return True + else: + return False + + # An LTTng live viewer session manages a view on tracing sessions # and replies to commands accordingly. class _LttngLiveViewerSession: @@ -1054,6 +1312,7 @@ class _LttngLiveViewerSession: fmt = 'Handling "get next data stream index entry" command: stream-id={}' logging.info(fmt.format(cmd.stream_id)) stream_state = self._get_stream_state(cmd.stream_id) + metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id) if type(stream_state) is not _LttngLiveViewerSessionDataStreamState: raise UnexpectedInput( @@ -1072,10 +1331,21 @@ class _LttngLiveViewerSession: status, index_entry, False, False ) + timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry) + + if needs_new_metadata_section(metadata_stream_state, timestamp_begin): + metadata_stream_state.is_sent = False + metadata_stream_state.goto_next_section() + # The viewer only checks the `has_new_metadata` flag if the # reply's status is `OK`, so we need to provide an index here has_new_metadata = stream_state.tracing_session_state.has_new_metadata - status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK + if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry: + status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK + else: + assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry + status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE + reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply( status, stream_state.cur_index_entry, has_new_metadata, False ) @@ -1116,21 +1386,33 @@ class _LttngLiveViewerSession: def _handle_get_metadata_stream_data_command(self, cmd): fmt = 'Handling "get metadata stream data" command: stream-id={}' logging.info(fmt.format(cmd.stream_id)) - stream_state = self._get_stream_state(cmd.stream_id) + metadata_stream_state = self._get_stream_state(cmd.stream_id) - if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState: + if ( + type(metadata_stream_state) + is not _LttngLiveViewerSessionMetadataStreamState + ): raise UnexpectedInput( 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id) ) - if stream_state.is_sent: + if metadata_stream_state.is_sent: status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes()) - stream_state.is_sent = True + metadata_stream_state.is_sent = True status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK + metadata_section = metadata_stream_state.cur_section + + # If we are sending an empty section, ready the next one right away. + if len(metadata_section.data) == 0: + metadata_stream_state.is_sent = False + metadata_stream_state.goto_next_section() + + fmt = 'Replying to "get metadata stream data" command: metadata-size={}' + logging.info(fmt.format(len(metadata_section.data))) return _LttngLiveViewerGetMetadataStreamDataContentReply( - status, stream_state.metadata_stream.data + status, metadata_section.data ) def _handle_get_new_stream_infos_command(self, cmd): @@ -1369,19 +1651,70 @@ class LttngTracingSessionDescriptor: return self._info -def _tracing_session_descriptors_from_arg(string): - # Format is: - # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]... - parts = string.split(',') - name = parts[0] - tracing_session_id = int(parts[1]) - hostname = parts[2] - live_timer_freq = int(parts[3]) - client_count = int(parts[4]) - traces = [LttngTrace(path) for path in parts[5:]] - return LttngTracingSessionDescriptor( - name, tracing_session_id, hostname, live_timer_freq, client_count, traces - ) +def _session_descriptors_from_path(sessions_filename, trace_path_prefix): + # File format is: + # + # [ + # { + # "name": "my-session", + # "id": 17, + # "hostname": "myhost", + # "live-timer-freq": 1000000, + # "client-count": 23, + # "traces": [ + # { + # "path": "lol" + # }, + # { + # "path": "meow/mix", + # "beacons": { + # "my_stream": [ 5235787, 728375283 ] + # }, + # "metadata-sections": [ + # { + # "line": 1, + # "timestamp": 0 + # } + # ] + # } + # ] + # } + # ] + with open(sessions_filename, 'r') as sessions_file: + params = json.load(sessions_file) + + sessions = [] + + for session in params: + name = session['name'] + tracing_session_id = session['id'] + hostname = session['hostname'] + live_timer_freq = session['live-timer-freq'] + client_count = session['client-count'] + traces = [] + + for trace in session['traces']: + metadata_sections = trace.get('metadata-sections') + beacons = trace.get('beacons') + path = trace['path'] + + if not os.path.isabs(path): + path = os.path.join(trace_path_prefix, path) + + traces.append(LttngTrace(path, metadata_sections, beacons)) + + sessions.append( + LttngTracingSessionDescriptor( + name, + tracing_session_id, + hostname, + live_timer_freq, + client_count, + traces, + ) + ) + + return sessions def _loglevel_parser(string): @@ -1418,11 +1751,14 @@ if __name__ == '__main__': help='The maximum size of control data response in bytes', ) parser.add_argument( - 'sessions', - nargs="+", - metavar="SESSION", - type=_tracing_session_descriptors_from_arg, - help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....', + '--trace-path-prefix', + type=str, + help='Prefix to prepend to the trace paths of session configurations', + ) + parser.add_argument( + '--sessions-filename', + type=str, + help='Path to a session configuration file', ) parser.add_argument( '-h', @@ -1434,9 +1770,11 @@ if __name__ == '__main__': args = parser.parse_args(args=remaining_args) try: - LttngLiveServer( - args.port_filename, args.sessions, args.max_query_data_response_size + sessions = _session_descriptors_from_path( + args.sessions_filename, + args.trace_path_prefix, ) + LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size) except UnexpectedInput as exc: logging.error(str(exc)) print(exc, file=sys.stderr)