Tests: src.ctf.lttng-live: split metadata sections
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
index be42ac4f3c2fbc58b96cb736fde03d32af64d22a..0ffb909968a7f8a3a9f497c29be0fdb90880d599 100644 (file)
@@ -14,6 +14,7 @@ import struct
 import sys
 import tempfile
 import json
 import sys
 import tempfile
 import json
+from collections import namedtuple
 
 
 class UnexpectedInput(RuntimeError):
 
 
 class UnexpectedInput(RuntimeError):
@@ -648,6 +649,14 @@ class _LttngDataStreamBeaconEntry:
         return self._stream_class_id
 
 
         return self._stream_class_id
 
 
+def _get_entry_timestamp_begin(entry):
+    if type(entry) is _LttngDataStreamBeaconEntry:
+        return entry.timestamp
+    else:
+        assert type(entry) is _LttngDataStreamIndexEntry
+        return entry.timestamp_begin
+
+
 # The index of an LTTng data stream, a sequence of index entries.
 class _LttngDataStreamIndex(collections.abc.Sequence):
     def __init__(self, path, beacons):
 # The index of an LTTng data stream, a sequence of index entries.
 class _LttngDataStreamIndex(collections.abc.Sequence):
     def __init__(self, path, beacons):
@@ -781,32 +790,162 @@ class _LttngDataStream:
         return self._file.read(len_bytes)
 
 
         return self._file.read(len_bytes)
 
 
+class _LttngMetadataStreamSection:
+    def __init__(self, timestamp, data):
+        self._timestamp = timestamp
+        if data is None:
+            self._data = bytes()
+        else:
+            self._data = data
+        logging.info(
+            'Built metadata stream section: ts={}, data-len={}'.format(
+                self._timestamp, len(self._data)
+            )
+        )
+
+    @property
+    def timestamp(self):
+        return self._timestamp
+
+    @property
+    def data(self):
+        return self._data
+
+
 # An LTTng metadata stream.
 class _LttngMetadataStream:
 # An LTTng metadata stream.
 class _LttngMetadataStream:
-    def __init__(self, path):
-        self._path = path
-        logging.info('Built metadata stream: path="{}"'.format(path))
+    def __init__(self, metadata_file_path, config_sections):
+        self._path = metadata_file_path
+        self._sections = config_sections
+        logging.info(
+            'Built metadata stream: path={}, section-len={}'.format(
+                self._path, len(self._sections)
+            )
+        )
 
     @property
     def path(self):
         return self._path
 
     @property
 
     @property
     def path(self):
         return self._path
 
     @property
-    def data(self):
-        assert os.path.isfile(self._path)
+    def sections(self):
+        return self._sections
 
 
-        with open(self._path, 'rb') as f:
-            return f.read()
+
+LttngMetadataConfigSection = namedtuple(
+    'LttngMetadataConfigSection', ['line', 'timestamp', 'is_empty']
+)
+
+
+def _parse_metadata_sections_config(config_sections):
+    assert config_sections is not None
+    config_metadata_sections = []
+    append_empty_section = False
+    last_timestamp = 0
+    last_line = 0
+
+    for config_section in config_sections:
+        if config_section == 'empty':
+            # Found a empty section marker. Actually append the section at the
+            # timestamp of the next concrete section.
+            append_empty_section = True
+        else:
+            assert type(config_section) is dict
+            line = config_section.get('line')
+            ts = config_section.get('timestamp')
+
+            # Sections' timestamps and lines must both be increasing.
+            assert ts > last_timestamp
+            last_timestamp = ts
+            assert line > last_line
+            last_line = line
+
+            if append_empty_section:
+                config_metadata_sections.append(
+                    LttngMetadataConfigSection(line, ts, True)
+                )
+                append_empty_section = False
+
+            config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
+
+    return config_metadata_sections
+
+
+def _split_metadata_sections(metadata_file_path, raw_config_sections):
+    assert isinstance(raw_config_sections, collections.abc.Sequence)
+
+    parsed_sections = _parse_metadata_sections_config(raw_config_sections)
+
+    sections = []
+    with open(metadata_file_path, 'r') as metadata_file:
+        metadata_lines = [line for line in metadata_file]
+
+    config_metadata_sections_idx = 0
+    curr_metadata_section = bytearray()
+
+    for idx, line_content in enumerate(metadata_lines):
+        # Add one to the index to convert from the zero-indexing of the
+        # enumerate() function to the one-indexing used by humans when
+        # viewing a text file.
+        curr_line_number = idx + 1
+
+        # If there are no more sections, simply append the line.
+        if config_metadata_sections_idx + 1 >= len(parsed_sections):
+            curr_metadata_section += bytearray(line_content, 'utf8')
+            continue
+
+        next_section_line_number = parsed_sections[
+            config_metadata_sections_idx + 1
+        ].line
+
+        # If the next section begins at the current line, create a
+        # section with the metadata we gathered so far.
+        if curr_line_number >= next_section_line_number:
+
+            # Flushing the metadata of the current section.
+            sections.append(
+                _LttngMetadataStreamSection(
+                    parsed_sections[config_metadata_sections_idx].timestamp,
+                    bytes(curr_metadata_section),
+                )
+            )
+
+            # Move to the next section.
+            config_metadata_sections_idx += 1
+
+            # Clear old content and append current line for the next section.
+            curr_metadata_section.clear()
+            curr_metadata_section += bytearray(line_content, 'utf8')
+
+            # Append any empty sections.
+            while parsed_sections[config_metadata_sections_idx].is_empty:
+                sections.append(
+                    _LttngMetadataStreamSection(
+                        parsed_sections[config_metadata_sections_idx].timestamp, None
+                    )
+                )
+                config_metadata_sections_idx += 1
+        else:
+            # Append line_content to the current metadata section.
+            curr_metadata_section += bytearray(line_content, 'utf8')
+
+    # We iterated over all the lines of the metadata file. Close the current section.
+    sections.append(
+        _LttngMetadataStreamSection(
+            parsed_sections[config_metadata_sections_idx].timestamp,
+            bytes(curr_metadata_section),
+        )
+    )
+
+    return sections
 
 
 # An LTTng trace, a sequence of LTTng data streams.
 class LttngTrace(collections.abc.Sequence):
 
 
 # An LTTng trace, a sequence of LTTng data streams.
 class LttngTrace(collections.abc.Sequence):
-    def __init__(self, trace_dir, beacons):
+    def __init__(self, trace_dir, metadata_sections, beacons):
         assert os.path.isdir(trace_dir)
         self._path = trace_dir
         assert os.path.isdir(trace_dir)
         self._path = trace_dir
-        self._metadata_stream = _LttngMetadataStream(
-            os.path.join(trace_dir, 'metadata')
-        )
+        self._create_metadata_stream(trace_dir, metadata_sections)
         self._create_data_streams(trace_dir, beacons)
         logging.info('Built trace: path="{}"'.format(trace_dir))
 
         self._create_data_streams(trace_dir, beacons)
         logging.info('Built trace: path="{}"'.format(trace_dir))
 
@@ -841,6 +980,22 @@ class LttngTrace(collections.abc.Sequence):
                 _LttngDataStream(data_stream_path, this_stream_beacons)
             )
 
                 _LttngDataStream(data_stream_path, this_stream_beacons)
             )
 
+    def _create_metadata_stream(self, trace_dir, config_metadata_sections):
+        metadata_path = os.path.join(trace_dir, 'metadata')
+        metadata_sections = []
+
+        if config_metadata_sections is None:
+            with open(metadata_path, 'rb') as metadata_file:
+                metadata_sections.append(
+                    _LttngMetadataStreamSection(0, metadata_file.read())
+                )
+        else:
+            metadata_sections = _split_metadata_sections(
+                metadata_path, config_metadata_sections
+            )
+
+        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
+
     @property
     def path(self):
         return self._path
     @property
     def path(self):
         return self._path
@@ -858,10 +1013,11 @@ class LttngTrace(collections.abc.Sequence):
 
 # The state of a single data stream.
 class _LttngLiveViewerSessionDataStreamState:
 
 # The state of a single data stream.
 class _LttngLiveViewerSessionDataStreamState:
-    def __init__(self, ts_state, info, data_stream):
+    def __init__(self, ts_state, info, data_stream, metadata_stream_id):
         self._ts_state = ts_state
         self._info = info
         self._data_stream = data_stream
         self._ts_state = ts_state
         self._info = info
         self._data_stream = data_stream
+        self._metadata_stream_id = metadata_stream_id
         self._cur_index_entry_index = 0
         fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
         logging.info(
         self._cur_index_entry_index = 0
         fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
         logging.info(
@@ -902,6 +1058,14 @@ class _LttngLiveViewerSessionMetadataStreamState:
         self._ts_state = ts_state
         self._info = info
         self._metadata_stream = metadata_stream
         self._ts_state = ts_state
         self._info = info
         self._metadata_stream = metadata_stream
+        self._cur_metadata_stream_section_index = 0
+        if len(metadata_stream.sections) > 1:
+            self._next_metadata_stream_section_timestamp = metadata_stream.sections[
+                1
+            ].timestamp
+        else:
+            self._next_metadata_stream_section_timestamp = None
+
         self._is_sent = False
         fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
         logging.info(
         self._is_sent = False
         fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
         logging.info(
@@ -933,6 +1097,28 @@ class _LttngLiveViewerSessionMetadataStreamState:
     def is_sent(self, value):
         self._is_sent = value
 
     def is_sent(self, value):
         self._is_sent = value
 
+    @property
+    def cur_section(self):
+        fmt = "Get current metadata section: section-idx={}"
+        logging.info(fmt.format(self._cur_metadata_stream_section_index))
+        if self._cur_metadata_stream_section_index == len(
+            self._metadata_stream.sections
+        ):
+            return
+
+        return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
+
+    def goto_next_section(self):
+        self._cur_metadata_stream_section_index += 1
+        if self.cur_section:
+            self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
+        else:
+            self._next_metadata_stream_section_timestamp = None
+
+    @property
+    def next_section_timestamp(self):
+        return self._next_metadata_stream_section_timestamp
+
 
 # The state of a tracing session.
 class _LttngLiveViewerSessionTracingSessionState:
 
 # The state of a tracing session.
 class _LttngLiveViewerSessionTracingSessionState:
@@ -946,6 +1132,17 @@ class _LttngLiveViewerSessionTracingSessionState:
         for trace in tc_descr.traces:
             trace_id = stream_id * 1000
 
         for trace in tc_descr.traces:
             trace_id = stream_id * 1000
 
+            # Metadata stream -> stream info and metadata stream state
+            info = _LttngLiveViewerStreamInfo(
+                stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
+            )
+            self._stream_infos.append(info)
+            self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
+                self, info, trace.metadata_stream
+            )
+            metadata_stream_id = stream_id
+            stream_id += 1
+
             # Data streams -> stream infos and data stream states
             for data_stream in trace:
                 info = _LttngLiveViewerStreamInfo(
             # Data streams -> stream infos and data stream states
             for data_stream in trace:
                 info = _LttngLiveViewerStreamInfo(
@@ -957,20 +1154,10 @@ class _LttngLiveViewerSessionTracingSessionState:
                 )
                 self._stream_infos.append(info)
                 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
                 )
                 self._stream_infos.append(info)
                 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
-                    self, info, data_stream
+                    self, info, data_stream, metadata_stream_id
                 )
                 stream_id += 1
 
                 )
                 stream_id += 1
 
-            # Metadata stream -> stream info and metadata stream state
-            info = _LttngLiveViewerStreamInfo(
-                stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
-            )
-            self._stream_infos.append(info)
-            self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
-                self, info, trace.metadata_stream
-            )
-            stream_id += 1
-
         self._is_attached = False
         fmt = 'Built tracing session state: id={}, name="{}"'
         logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
         self._is_attached = False
         fmt = 'Built tracing session state: id={}, name="{}"'
         logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
@@ -1004,6 +1191,16 @@ class _LttngLiveViewerSessionTracingSessionState:
         self._is_attached = value
 
 
         self._is_attached = value
 
 
+def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
+    if metadata_stream_state.next_section_timestamp is None:
+        return False
+
+    if latest_timestamp >= metadata_stream_state.next_section_timestamp:
+        return True
+    else:
+        return False
+
+
 # An LTTng live viewer session manages a view on tracing sessions
 # and replies to commands accordingly.
 class _LttngLiveViewerSession:
 # An LTTng live viewer session manages a view on tracing sessions
 # and replies to commands accordingly.
 class _LttngLiveViewerSession:
@@ -1115,6 +1312,7 @@ class _LttngLiveViewerSession:
         fmt = 'Handling "get next data stream index entry" command: stream-id={}'
         logging.info(fmt.format(cmd.stream_id))
         stream_state = self._get_stream_state(cmd.stream_id)
         fmt = 'Handling "get next data stream index entry" command: stream-id={}'
         logging.info(fmt.format(cmd.stream_id))
         stream_state = self._get_stream_state(cmd.stream_id)
+        metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
 
         if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
             raise UnexpectedInput(
 
         if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
             raise UnexpectedInput(
@@ -1133,6 +1331,12 @@ class _LttngLiveViewerSession:
                 status, index_entry, False, False
             )
 
                 status, index_entry, False, False
             )
 
+        timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
+
+        if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
+            metadata_stream_state.is_sent = False
+            metadata_stream_state.goto_next_section()
+
         # The viewer only checks the `has_new_metadata` flag if the
         # reply's status is `OK`, so we need to provide an index here
         has_new_metadata = stream_state.tracing_session_state.has_new_metadata
         # The viewer only checks the `has_new_metadata` flag if the
         # reply's status is `OK`, so we need to provide an index here
         has_new_metadata = stream_state.tracing_session_state.has_new_metadata
@@ -1182,21 +1386,33 @@ class _LttngLiveViewerSession:
     def _handle_get_metadata_stream_data_command(self, cmd):
         fmt = 'Handling "get metadata stream data" command: stream-id={}'
         logging.info(fmt.format(cmd.stream_id))
     def _handle_get_metadata_stream_data_command(self, cmd):
         fmt = 'Handling "get metadata stream data" command: stream-id={}'
         logging.info(fmt.format(cmd.stream_id))
-        stream_state = self._get_stream_state(cmd.stream_id)
+        metadata_stream_state = self._get_stream_state(cmd.stream_id)
 
 
-        if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
+        if (
+            type(metadata_stream_state)
+            is not _LttngLiveViewerSessionMetadataStreamState
+        ):
             raise UnexpectedInput(
                 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
             )
 
             raise UnexpectedInput(
                 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
             )
 
-        if stream_state.is_sent:
+        if metadata_stream_state.is_sent:
             status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
             return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
 
             status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
             return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
 
-        stream_state.is_sent = True
+        metadata_stream_state.is_sent = True
         status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
         status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
+        metadata_section = metadata_stream_state.cur_section
+
+        # If we are sending an empty section, ready the next one right away.
+        if len(metadata_section.data) == 0:
+            metadata_stream_state.is_sent = False
+            metadata_stream_state.goto_next_section()
+
+        fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
+        logging.info(fmt.format(len(metadata_section.data)))
         return _LttngLiveViewerGetMetadataStreamDataContentReply(
         return _LttngLiveViewerGetMetadataStreamDataContentReply(
-            status, stream_state.metadata_stream.data
+            status, metadata_section.data
         )
 
     def _handle_get_new_stream_infos_command(self, cmd):
         )
 
     def _handle_get_new_stream_infos_command(self, cmd):
@@ -1453,7 +1669,13 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
     #                     "path": "meow/mix",
     #                     "beacons": {
     #                         "my_stream": [ 5235787, 728375283 ]
     #                     "path": "meow/mix",
     #                     "beacons": {
     #                         "my_stream": [ 5235787, 728375283 ]
-    #                     }
+    #                     },
+    #                     "metadata-sections": [
+    #                           {
+    #                                "line": 1,
+    #                                "timestamp": 0
+    #                           }
+    #                      ]
     #                 }
     #             ]
     #         }
     #                 }
     #             ]
     #         }
@@ -1472,13 +1694,14 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
         traces = []
 
         for trace in session['traces']:
         traces = []
 
         for trace in session['traces']:
+            metadata_sections = trace.get('metadata-sections')
             beacons = trace.get('beacons')
             path = trace['path']
 
             if not os.path.isabs(path):
                 path = os.path.join(trace_path_prefix, path)
 
             beacons = trace.get('beacons')
             path = trace['path']
 
             if not os.path.isabs(path):
                 path = os.path.join(trace_path_prefix, path)
 
-            traces.append(LttngTrace(path, beacons))
+            traces.append(LttngTrace(path, metadata_sections, beacons))
 
         sessions.append(
             LttngTracingSessionDescriptor(
 
         sessions.append(
             LttngTracingSessionDescriptor(
@@ -1548,7 +1771,8 @@ if __name__ == '__main__':
     args = parser.parse_args(args=remaining_args)
     try:
         sessions = _session_descriptors_from_path(
     args = parser.parse_args(args=remaining_args)
     try:
         sessions = _session_descriptors_from_path(
-            args.sessions_filename, args.trace_path_prefix
+            args.sessions_filename,
+            args.trace_path_prefix,
         )
         LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size)
     except UnexpectedInput as exc:
         )
         LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size)
     except UnexpectedInput as exc:
This page took 0.027326 seconds and 4 git commands to generate.