X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=tests%2Fdata%2Fplugins%2Fsrc.ctf.lttng-live%2Flttng_live_server.py;fp=tests%2Fdata%2Fplugins%2Fsrc.ctf.lttng-live%2Flttng_live_server.py;h=0ffb909968a7f8a3a9f497c29be0fdb90880d599;hp=be42ac4f3c2fbc58b96cb736fde03d32af64d22a;hb=78169723129e666342d1dd5bc6194f80d70fe862;hpb=c5ce3927e9c05390d39ba099baa130be9927341f diff --git a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py index be42ac4f..0ffb9099 100644 --- a/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py +++ b/tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py @@ -14,6 +14,7 @@ import struct import sys import tempfile import json +from collections import namedtuple class UnexpectedInput(RuntimeError): @@ -648,6 +649,14 @@ class _LttngDataStreamBeaconEntry: return self._stream_class_id +def _get_entry_timestamp_begin(entry): + if type(entry) is _LttngDataStreamBeaconEntry: + return entry.timestamp + else: + assert type(entry) is _LttngDataStreamIndexEntry + return entry.timestamp_begin + + # The index of an LTTng data stream, a sequence of index entries. class _LttngDataStreamIndex(collections.abc.Sequence): def __init__(self, path, beacons): @@ -781,32 +790,162 @@ class _LttngDataStream: return self._file.read(len_bytes) +class _LttngMetadataStreamSection: + def __init__(self, timestamp, data): + self._timestamp = timestamp + if data is None: + self._data = bytes() + else: + self._data = data + logging.info( + 'Built metadata stream section: ts={}, data-len={}'.format( + self._timestamp, len(self._data) + ) + ) + + @property + def timestamp(self): + return self._timestamp + + @property + def data(self): + return self._data + + # An LTTng metadata stream. class _LttngMetadataStream: - def __init__(self, path): - self._path = path - logging.info('Built metadata stream: path="{}"'.format(path)) + def __init__(self, metadata_file_path, config_sections): + self._path = metadata_file_path + self._sections = config_sections + logging.info( + 'Built metadata stream: path={}, section-len={}'.format( + self._path, len(self._sections) + ) + ) @property def path(self): return self._path @property - def data(self): - assert os.path.isfile(self._path) + def sections(self): + return self._sections - with open(self._path, 'rb') as f: - return f.read() + +LttngMetadataConfigSection = namedtuple( + 'LttngMetadataConfigSection', ['line', 'timestamp', 'is_empty'] +) + + +def _parse_metadata_sections_config(config_sections): + assert config_sections is not None + config_metadata_sections = [] + append_empty_section = False + last_timestamp = 0 + last_line = 0 + + for config_section in config_sections: + if config_section == 'empty': + # Found a empty section marker. Actually append the section at the + # timestamp of the next concrete section. + append_empty_section = True + else: + assert type(config_section) is dict + line = config_section.get('line') + ts = config_section.get('timestamp') + + # Sections' timestamps and lines must both be increasing. + assert ts > last_timestamp + last_timestamp = ts + assert line > last_line + last_line = line + + if append_empty_section: + config_metadata_sections.append( + LttngMetadataConfigSection(line, ts, True) + ) + append_empty_section = False + + config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False)) + + return config_metadata_sections + + +def _split_metadata_sections(metadata_file_path, raw_config_sections): + assert isinstance(raw_config_sections, collections.abc.Sequence) + + parsed_sections = _parse_metadata_sections_config(raw_config_sections) + + sections = [] + with open(metadata_file_path, 'r') as metadata_file: + metadata_lines = [line for line in metadata_file] + + config_metadata_sections_idx = 0 + curr_metadata_section = bytearray() + + for idx, line_content in enumerate(metadata_lines): + # Add one to the index to convert from the zero-indexing of the + # enumerate() function to the one-indexing used by humans when + # viewing a text file. + curr_line_number = idx + 1 + + # If there are no more sections, simply append the line. + if config_metadata_sections_idx + 1 >= len(parsed_sections): + curr_metadata_section += bytearray(line_content, 'utf8') + continue + + next_section_line_number = parsed_sections[ + config_metadata_sections_idx + 1 + ].line + + # If the next section begins at the current line, create a + # section with the metadata we gathered so far. + if curr_line_number >= next_section_line_number: + + # Flushing the metadata of the current section. + sections.append( + _LttngMetadataStreamSection( + parsed_sections[config_metadata_sections_idx].timestamp, + bytes(curr_metadata_section), + ) + ) + + # Move to the next section. + config_metadata_sections_idx += 1 + + # Clear old content and append current line for the next section. + curr_metadata_section.clear() + curr_metadata_section += bytearray(line_content, 'utf8') + + # Append any empty sections. + while parsed_sections[config_metadata_sections_idx].is_empty: + sections.append( + _LttngMetadataStreamSection( + parsed_sections[config_metadata_sections_idx].timestamp, None + ) + ) + config_metadata_sections_idx += 1 + else: + # Append line_content to the current metadata section. + curr_metadata_section += bytearray(line_content, 'utf8') + + # We iterated over all the lines of the metadata file. Close the current section. + sections.append( + _LttngMetadataStreamSection( + parsed_sections[config_metadata_sections_idx].timestamp, + bytes(curr_metadata_section), + ) + ) + + return sections # An LTTng trace, a sequence of LTTng data streams. class LttngTrace(collections.abc.Sequence): - def __init__(self, trace_dir, beacons): + def __init__(self, trace_dir, metadata_sections, beacons): assert os.path.isdir(trace_dir) self._path = trace_dir - self._metadata_stream = _LttngMetadataStream( - os.path.join(trace_dir, 'metadata') - ) + self._create_metadata_stream(trace_dir, metadata_sections) self._create_data_streams(trace_dir, beacons) logging.info('Built trace: path="{}"'.format(trace_dir)) @@ -841,6 +980,22 @@ class LttngTrace(collections.abc.Sequence): _LttngDataStream(data_stream_path, this_stream_beacons) ) + def _create_metadata_stream(self, trace_dir, config_metadata_sections): + metadata_path = os.path.join(trace_dir, 'metadata') + metadata_sections = [] + + if config_metadata_sections is None: + with open(metadata_path, 'rb') as metadata_file: + metadata_sections.append( + _LttngMetadataStreamSection(0, metadata_file.read()) + ) + else: + metadata_sections = _split_metadata_sections( + metadata_path, config_metadata_sections + ) + + self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections) + @property def path(self): return self._path @@ -858,10 +1013,11 @@ class LttngTrace(collections.abc.Sequence): # The state of a single data stream. class _LttngLiveViewerSessionDataStreamState: - def __init__(self, ts_state, info, data_stream): + def __init__(self, ts_state, info, data_stream, metadata_stream_id): self._ts_state = ts_state self._info = info self._data_stream = data_stream + self._metadata_stream_id = metadata_stream_id self._cur_index_entry_index = 0 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"' logging.info( @@ -902,6 +1058,14 @@ class _LttngLiveViewerSessionMetadataStreamState: self._ts_state = ts_state self._info = info self._metadata_stream = metadata_stream + self._cur_metadata_stream_section_index = 0 + if len(metadata_stream.sections) > 1: + self._next_metadata_stream_section_timestamp = metadata_stream.sections[ + 1 + ].timestamp + else: + self._next_metadata_stream_section_timestamp = None + self._is_sent = False fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"' logging.info( @@ -933,6 +1097,28 @@ class _LttngLiveViewerSessionMetadataStreamState: def is_sent(self, value): self._is_sent = value + @property + def cur_section(self): + fmt = "Get current metadata section: section-idx={}" + logging.info(fmt.format(self._cur_metadata_stream_section_index)) + if self._cur_metadata_stream_section_index == len( + self._metadata_stream.sections + ): + return + + return self._metadata_stream.sections[self._cur_metadata_stream_section_index] + + def goto_next_section(self): + self._cur_metadata_stream_section_index += 1 + if self.cur_section: + self._next_metadata_stream_section_timestamp = self.cur_section.timestamp + else: + self._next_metadata_stream_section_timestamp = None + + @property + def next_section_timestamp(self): + return self._next_metadata_stream_section_timestamp + # The state of a tracing session. class _LttngLiveViewerSessionTracingSessionState: @@ -946,6 +1132,17 @@ class _LttngLiveViewerSessionTracingSessionState: for trace in tc_descr.traces: trace_id = stream_id * 1000 + # Metadata stream -> stream info and metadata stream state + info = _LttngLiveViewerStreamInfo( + stream_id, trace_id, True, trace.metadata_stream.path, 'metadata' + ) + self._stream_infos.append(info) + self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState( + self, info, trace.metadata_stream + ) + metadata_stream_id = stream_id + stream_id += 1 + # Data streams -> stream infos and data stream states for data_stream in trace: info = _LttngLiveViewerStreamInfo( @@ -957,20 +1154,10 @@ class _LttngLiveViewerSessionTracingSessionState: ) self._stream_infos.append(info) self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState( - self, info, data_stream + self, info, data_stream, metadata_stream_id ) stream_id += 1 - # Metadata stream -> stream info and metadata stream state - info = _LttngLiveViewerStreamInfo( - stream_id, trace_id, True, trace.metadata_stream.path, 'metadata' - ) - self._stream_infos.append(info) - self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState( - self, info, trace.metadata_stream - ) - stream_id += 1 - self._is_attached = False fmt = 'Built tracing session state: id={}, name="{}"' logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name)) @@ -1004,6 +1191,16 @@ class _LttngLiveViewerSessionTracingSessionState: self._is_attached = value +def needs_new_metadata_section(metadata_stream_state, latest_timestamp): + if metadata_stream_state.next_section_timestamp is None: + return False + + if latest_timestamp >= metadata_stream_state.next_section_timestamp: + return True + else: + return False + + # An LTTng live viewer session manages a view on tracing sessions # and replies to commands accordingly. class _LttngLiveViewerSession: @@ -1115,6 +1312,7 @@ class _LttngLiveViewerSession: fmt = 'Handling "get next data stream index entry" command: stream-id={}' logging.info(fmt.format(cmd.stream_id)) stream_state = self._get_stream_state(cmd.stream_id) + metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id) if type(stream_state) is not _LttngLiveViewerSessionDataStreamState: raise UnexpectedInput( @@ -1133,6 +1331,12 @@ class _LttngLiveViewerSession: status, index_entry, False, False ) + timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry) + + if needs_new_metadata_section(metadata_stream_state, timestamp_begin): + metadata_stream_state.is_sent = False + metadata_stream_state.goto_next_section() + # The viewer only checks the `has_new_metadata` flag if the # reply's status is `OK`, so we need to provide an index here has_new_metadata = stream_state.tracing_session_state.has_new_metadata @@ -1182,21 +1386,33 @@ class _LttngLiveViewerSession: def _handle_get_metadata_stream_data_command(self, cmd): fmt = 'Handling "get metadata stream data" command: stream-id={}' logging.info(fmt.format(cmd.stream_id)) - stream_state = self._get_stream_state(cmd.stream_id) + metadata_stream_state = self._get_stream_state(cmd.stream_id) - if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState: + if ( + type(metadata_stream_state) + is not _LttngLiveViewerSessionMetadataStreamState + ): raise UnexpectedInput( 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id) ) - if stream_state.is_sent: + if metadata_stream_state.is_sent: status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes()) - stream_state.is_sent = True + metadata_stream_state.is_sent = True status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK + metadata_section = metadata_stream_state.cur_section + + # If we are sending an empty section, ready the next one right away. + if len(metadata_section.data) == 0: + metadata_stream_state.is_sent = False + metadata_stream_state.goto_next_section() + + fmt = 'Replying to "get metadata stream data" command: metadata-size={}' + logging.info(fmt.format(len(metadata_section.data))) return _LttngLiveViewerGetMetadataStreamDataContentReply( - status, stream_state.metadata_stream.data + status, metadata_section.data ) def _handle_get_new_stream_infos_command(self, cmd): @@ -1453,7 +1669,13 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix): # "path": "meow/mix", # "beacons": { # "my_stream": [ 5235787, 728375283 ] - # } + # }, + # "metadata-sections": [ + # { + # "line": 1, + # "timestamp": 0 + # } + # ] # } # ] # } @@ -1472,13 +1694,14 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix): traces = [] for trace in session['traces']: + metadata_sections = trace.get('metadata-sections') beacons = trace.get('beacons') path = trace['path'] if not os.path.isabs(path): path = os.path.join(trace_path_prefix, path) - traces.append(LttngTrace(path, beacons)) + traces.append(LttngTrace(path, metadata_sections, beacons)) sessions.append( LttngTracingSessionDescriptor( @@ -1548,7 +1771,8 @@ if __name__ == '__main__': args = parser.parse_args(args=remaining_args) try: sessions = _session_descriptors_from_path( - args.sessions_filename, args.trace_path_prefix + args.sessions_filename, + args.trace_path_prefix, ) LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size) except UnexpectedInput as exc: