Tests: src.ctf.lttng-live: split metadata sections
authorFrancis Deslauriers <francis.deslauriers@efficios.com>
Tue, 14 Jul 2020 23:01:22 +0000 (19:01 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 24 Feb 2022 23:23:50 +0000 (18:23 -0500)
Background
==========
This commit adds a test case to emulate when a live session is cleared
by the user and the metadata is sent in two different batches.

Here is the scenario this new test case is testing:
  1. Live session is running and generating events,
  2. User clears the session,
  3. Relay empties its metadata stream (Relay is aware that metadata
    is available but has not received it from the consumer since the
    clear),
  4. Babeltrace requests for newly available metadata from the Relay,
  5. Relay sends a zero-sized metadata buffer and status
  `LTTNG_VIEWER_METADATA_OK`,
  6. Babeltrace interpret this as a retry,
  7. Relay receives new (and old) metadata from consumer,
  8. Babeltrace requests for newly available metadata again,
  9. Relay replies with the new metadata section.

This key steps here are 6 to 10 where the relay sends an empty metadata
buffer to signify to the viewer to try again hoping that next time it
will have received the metadata.

Approach
========
To emulate this scenario, this commit adds the concept of metadata
sections to the `lttng_live_server.py` script. A new optional
"metadata-sections" field in the JSON trace description. This list
contains sections described by a JSON object containing the line at
which the metadata section starts and a timestamp at which it must be
sent to the viewer. Empty section are represented by a "empty" string.
    e.g.
        "metadata-sections": [
          {
            "line": 1,
            "timestamp": 1294581
          },
          "empty",
          {
            "line": 111,
            "timestamp": 1295918
          },
        ]

The metadata stream object acts as a queue and is systematically (on
LTTNG_VIEWER_GET_NEXT_INDEX) checked against the current timestamp to
see if the next metadata section needs to be activated.

If the "metadata-sections" field is absent, the uses the metadata file
as a single section, as usual.

Notes
=====
You will notice that the all .expect files were modified to increment
the stream ID of all data streams. This is because we now create the
`_LttngLiveViewerSessionMetadataStreamState` object before (instead of
after) the `_LttngLiveViewerSessionDataStreamState` objects. This is
done because we need a reference from any data stream to its metadata
stream. The metadata stream id is pass to the
`_LttngLiveViewerSessionDataStreamState` __init__() method.

Signed-off-by: Francis Deslauriers <francis.deslauriers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I45c7e12cc596033bce376e45393ff6970670f1da
Reviewed-on: https://review.lttng.org/c/babeltrace/+/3775
Tested-by: jenkins <jenkins@lttng.org>
tests/data/ctf-traces/live/split_metadata/channel0_0 [new file with mode: 0644]
tests/data/ctf-traces/live/split_metadata/index/channel0_0.idx [new file with mode: 0644]
tests/data/ctf-traces/live/split_metadata/metadata [new file with mode: 0644]
tests/data/plugins/src.ctf.lttng-live/cli-base.expect
tests/data/plugins/src.ctf.lttng-live/cli-multi-domains.expect
tests/data/plugins/src.ctf.lttng-live/inactivity_discarded_packet.expect
tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py
tests/data/plugins/src.ctf.lttng-live/split_metadata.expect [new file with mode: 0644]
tests/data/plugins/src.ctf.lttng-live/split_metadata.json [new file with mode: 0644]
tests/plugins/src.ctf.lttng-live/test_live

diff --git a/tests/data/ctf-traces/live/split_metadata/channel0_0 b/tests/data/ctf-traces/live/split_metadata/channel0_0
new file mode 100644 (file)
index 0000000..c8e9ddb
Binary files /dev/null and b/tests/data/ctf-traces/live/split_metadata/channel0_0 differ
diff --git a/tests/data/ctf-traces/live/split_metadata/index/channel0_0.idx b/tests/data/ctf-traces/live/split_metadata/index/channel0_0.idx
new file mode 100644 (file)
index 0000000..399ec45
Binary files /dev/null and b/tests/data/ctf-traces/live/split_metadata/index/channel0_0.idx differ
diff --git a/tests/data/ctf-traces/live/split_metadata/metadata b/tests/data/ctf-traces/live/split_metadata/metadata
new file mode 100644 (file)
index 0000000..b61a4e1
--- /dev/null
@@ -0,0 +1,119 @@
+/* CTF 1.8 */
+
+typealias integer { size = 8; align = 8; signed = false; } := uint8_t;
+typealias integer { size = 16; align = 8; signed = false; } := uint16_t;
+typealias integer { size = 32; align = 8; signed = false; } := uint32_t;
+typealias integer { size = 64; align = 8; signed = false; } := uint64_t;
+typealias integer { size = 64; align = 8; signed = false; } := unsigned long;
+typealias integer { size = 5; align = 1; signed = false; } := uint5_t;
+typealias integer { size = 27; align = 1; signed = false; } := uint27_t;
+
+trace {
+       major = 1;
+       minor = 8;
+       uuid = "0339cd08-892d-404c-9291-64c1a8a74c81";
+       byte_order = le;
+       packet.header := struct {
+               uint32_t magic;
+               uint8_t  uuid[16];
+               uint32_t stream_id;
+               uint64_t stream_instance_id;
+       };
+};
+
+env {
+       domain = "ust";
+       tracer_name = "lttng-ust";
+       tracer_major = 2;
+       tracer_minor = 12;
+       tracer_buffering_scheme = "uid";
+       tracer_buffering_id = 1000;
+       architecture_bit_width = 64;
+       trace_name = "barney_descontie";
+       trace_creation_datetime = "20200715T174253-0400";
+       hostname = "raton";
+};
+
+clock {
+       name = "monotonic";
+       uuid = "81a04b89-9028-4d3e-a28d-5fbd53a8eb9d";
+       description = "Monotonic Clock";
+       freq = 1000000000; /* Frequency, in Hz */
+       /* clock value offset from Epoch is: offset * (1/freq) */
+       offset = 1594406328768346378;
+};
+
+typealias integer {
+       size = 27; align = 1; signed = false;
+       map = clock.monotonic.value;
+} := uint27_clock_monotonic_t;
+
+typealias integer {
+       size = 32; align = 8; signed = false;
+       map = clock.monotonic.value;
+} := uint32_clock_monotonic_t;
+
+typealias integer {
+       size = 64; align = 8; signed = false;
+       map = clock.monotonic.value;
+} := uint64_clock_monotonic_t;
+
+struct packet_context {
+       uint64_clock_monotonic_t timestamp_begin;
+       uint64_clock_monotonic_t timestamp_end;
+       uint64_t content_size;
+       uint64_t packet_size;
+       uint64_t packet_seq_num;
+       unsigned long events_discarded;
+       uint32_t cpu_id;
+};
+
+struct event_header_compact {
+       enum : uint5_t { compact = 0 ... 30, extended = 31 } id;
+       variant <id> {
+               struct {
+                       uint27_clock_monotonic_t timestamp;
+               } compact;
+               struct {
+                       uint32_t id;
+                       uint64_clock_monotonic_t timestamp;
+               } extended;
+       } v;
+} align(8);
+
+struct event_header_large {
+       enum : uint16_t { compact = 0 ... 65534, extended = 65535 } id;
+       variant <id> {
+               struct {
+                       uint32_clock_monotonic_t timestamp;
+               } compact;
+               struct {
+                       uint32_t id;
+                       uint64_clock_monotonic_t timestamp;
+               } extended;
+       } v;
+} align(8);
+
+stream {
+       id = 0;
+       event.header := struct event_header_large;
+       packet.context := struct packet_context;
+};
+
+event {
+       name = "my_app:signe_de_pia$$e";
+       id = 0;
+       stream_id = 0;
+       loglevel = 13;
+       fields := struct {
+       };
+};
+
+event {
+       name = "my_app:signe_de_pia$$e_2";
+       id = 1;
+       stream_id = 0;
+       loglevel = 13;
+       fields := struct {
+       };
+};
index 58616dc62536ff8066713e01fc123f4886801861..4a06297694a7dcc49ac54eb408a30958b0a4f45d 100644 (file)
@@ -26,7 +26,7 @@ Trace class:
 [Unknown]
 {Trace 0, Stream class ID 0, Stream ID 0}
 Stream beginning:
-  Name: stream-0
+  Name: stream-1
   Trace:
     UUID: e375faaf-5a32-401c-8680-7957c204f064
     Environment (5 entries):
@@ -43,7 +43,7 @@ Stream beginning:
 [Unknown]
 {Trace 0, Stream class ID 0, Stream ID 1}
 Stream beginning:
-  Name: stream-1
+  Name: stream-2
   Trace:
     UUID: e375faaf-5a32-401c-8680-7957c204f064
     Environment (5 entries):
@@ -60,7 +60,7 @@ Stream beginning:
 [Unknown]
 {Trace 0, Stream class ID 0, Stream ID 2}
 Stream beginning:
-  Name: stream-2
+  Name: stream-3
   Trace:
     UUID: e375faaf-5a32-401c-8680-7957c204f064
     Environment (5 entries):
@@ -77,7 +77,7 @@ Stream beginning:
 [Unknown]
 {Trace 0, Stream class ID 0, Stream ID 3}
 Stream beginning:
-  Name: stream-3
+  Name: stream-4
   Trace:
     UUID: e375faaf-5a32-401c-8680-7957c204f064
     Environment (5 entries):
index e54486c3020b23abd0c4f7eab2f30523a96d28fb..9c7db7cf55128335e99d264e4bf8d0736006c07c 100644 (file)
@@ -64,7 +64,7 @@ Trace class:
 [Unknown]
 {Trace 0, Stream class ID 2, Stream ID 0}
 Stream beginning:
-  Name: stream-5
+  Name: stream-6
   Trace:
     UUID: 0051d20e-3389-4157-9693-e7d5d8565aa0
     Environment (5 entries):
@@ -81,7 +81,7 @@ Stream beginning:
 [Unknown]
 {Trace 0, Stream class ID 2, Stream ID 1}
 Stream beginning:
-  Name: stream-6
+  Name: stream-7
   Trace:
     UUID: 0051d20e-3389-4157-9693-e7d5d8565aa0
     Environment (5 entries):
@@ -98,7 +98,7 @@ Stream beginning:
 [Unknown]
 {Trace 0, Stream class ID 2, Stream ID 2}
 Stream beginning:
-  Name: stream-7
+  Name: stream-8
   Trace:
     UUID: 0051d20e-3389-4157-9693-e7d5d8565aa0
     Environment (5 entries):
@@ -115,7 +115,7 @@ Stream beginning:
 [Unknown]
 {Trace 0, Stream class ID 2, Stream ID 3}
 Stream beginning:
-  Name: stream-8
+  Name: stream-9
   Trace:
     UUID: 0051d20e-3389-4157-9693-e7d5d8565aa0
     Environment (5 entries):
@@ -227,7 +227,7 @@ Trace class:
 [Unknown]
 {Trace 1, Stream class ID 0, Stream ID 0}
 Stream beginning:
-  Name: stream-0
+  Name: stream-1
   Trace:
     UUID: 830f2129-77d5-cb4e-90cf-b0754d8ce889
     Environment (12 entries):
@@ -251,7 +251,7 @@ Stream beginning:
 [Unknown]
 {Trace 1, Stream class ID 0, Stream ID 1}
 Stream beginning:
-  Name: stream-1
+  Name: stream-2
   Trace:
     UUID: 830f2129-77d5-cb4e-90cf-b0754d8ce889
     Environment (12 entries):
@@ -275,7 +275,7 @@ Stream beginning:
 [Unknown]
 {Trace 1, Stream class ID 0, Stream ID 2}
 Stream beginning:
-  Name: stream-2
+  Name: stream-3
   Trace:
     UUID: 830f2129-77d5-cb4e-90cf-b0754d8ce889
     Environment (12 entries):
@@ -299,7 +299,7 @@ Stream beginning:
 [Unknown]
 {Trace 1, Stream class ID 0, Stream ID 3}
 Stream beginning:
-  Name: stream-3
+  Name: stream-4
   Trace:
     UUID: 830f2129-77d5-cb4e-90cf-b0754d8ce889
     Environment (12 entries):
index ecbf57bbca26e0b3f9c7e3b5f52611951481cfc3..278818d7ae3f41e8b7b9718e232b7d026e1f8772 100644 (file)
@@ -18,7 +18,7 @@ Trace class:
 [Unknown]
 {Trace 0, Stream class ID 0, Stream ID 0}
 Stream beginning:
-  Name: stream-0
+  Name: stream-1
   Trace:
     UUID: 1c810767-575e-4c4e-afa1-5d3e15081cb9
     Stream (ID 0, Class ID 0)
index be42ac4f3c2fbc58b96cb736fde03d32af64d22a..0ffb909968a7f8a3a9f497c29be0fdb90880d599 100644 (file)
@@ -14,6 +14,7 @@ import struct
 import sys
 import tempfile
 import json
+from collections import namedtuple
 
 
 class UnexpectedInput(RuntimeError):
@@ -648,6 +649,14 @@ class _LttngDataStreamBeaconEntry:
         return self._stream_class_id
 
 
+def _get_entry_timestamp_begin(entry):
+    if type(entry) is _LttngDataStreamBeaconEntry:
+        return entry.timestamp
+    else:
+        assert type(entry) is _LttngDataStreamIndexEntry
+        return entry.timestamp_begin
+
+
 # The index of an LTTng data stream, a sequence of index entries.
 class _LttngDataStreamIndex(collections.abc.Sequence):
     def __init__(self, path, beacons):
@@ -781,32 +790,162 @@ class _LttngDataStream:
         return self._file.read(len_bytes)
 
 
+class _LttngMetadataStreamSection:
+    def __init__(self, timestamp, data):
+        self._timestamp = timestamp
+        if data is None:
+            self._data = bytes()
+        else:
+            self._data = data
+        logging.info(
+            'Built metadata stream section: ts={}, data-len={}'.format(
+                self._timestamp, len(self._data)
+            )
+        )
+
+    @property
+    def timestamp(self):
+        return self._timestamp
+
+    @property
+    def data(self):
+        return self._data
+
+
 # An LTTng metadata stream.
 class _LttngMetadataStream:
-    def __init__(self, path):
-        self._path = path
-        logging.info('Built metadata stream: path="{}"'.format(path))
+    def __init__(self, metadata_file_path, config_sections):
+        self._path = metadata_file_path
+        self._sections = config_sections
+        logging.info(
+            'Built metadata stream: path={}, section-len={}'.format(
+                self._path, len(self._sections)
+            )
+        )
 
     @property
     def path(self):
         return self._path
 
     @property
-    def data(self):
-        assert os.path.isfile(self._path)
+    def sections(self):
+        return self._sections
 
-        with open(self._path, 'rb') as f:
-            return f.read()
+
+LttngMetadataConfigSection = namedtuple(
+    'LttngMetadataConfigSection', ['line', 'timestamp', 'is_empty']
+)
+
+
+def _parse_metadata_sections_config(config_sections):
+    assert config_sections is not None
+    config_metadata_sections = []
+    append_empty_section = False
+    last_timestamp = 0
+    last_line = 0
+
+    for config_section in config_sections:
+        if config_section == 'empty':
+            # Found a empty section marker. Actually append the section at the
+            # timestamp of the next concrete section.
+            append_empty_section = True
+        else:
+            assert type(config_section) is dict
+            line = config_section.get('line')
+            ts = config_section.get('timestamp')
+
+            # Sections' timestamps and lines must both be increasing.
+            assert ts > last_timestamp
+            last_timestamp = ts
+            assert line > last_line
+            last_line = line
+
+            if append_empty_section:
+                config_metadata_sections.append(
+                    LttngMetadataConfigSection(line, ts, True)
+                )
+                append_empty_section = False
+
+            config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
+
+    return config_metadata_sections
+
+
+def _split_metadata_sections(metadata_file_path, raw_config_sections):
+    assert isinstance(raw_config_sections, collections.abc.Sequence)
+
+    parsed_sections = _parse_metadata_sections_config(raw_config_sections)
+
+    sections = []
+    with open(metadata_file_path, 'r') as metadata_file:
+        metadata_lines = [line for line in metadata_file]
+
+    config_metadata_sections_idx = 0
+    curr_metadata_section = bytearray()
+
+    for idx, line_content in enumerate(metadata_lines):
+        # Add one to the index to convert from the zero-indexing of the
+        # enumerate() function to the one-indexing used by humans when
+        # viewing a text file.
+        curr_line_number = idx + 1
+
+        # If there are no more sections, simply append the line.
+        if config_metadata_sections_idx + 1 >= len(parsed_sections):
+            curr_metadata_section += bytearray(line_content, 'utf8')
+            continue
+
+        next_section_line_number = parsed_sections[
+            config_metadata_sections_idx + 1
+        ].line
+
+        # If the next section begins at the current line, create a
+        # section with the metadata we gathered so far.
+        if curr_line_number >= next_section_line_number:
+
+            # Flushing the metadata of the current section.
+            sections.append(
+                _LttngMetadataStreamSection(
+                    parsed_sections[config_metadata_sections_idx].timestamp,
+                    bytes(curr_metadata_section),
+                )
+            )
+
+            # Move to the next section.
+            config_metadata_sections_idx += 1
+
+            # Clear old content and append current line for the next section.
+            curr_metadata_section.clear()
+            curr_metadata_section += bytearray(line_content, 'utf8')
+
+            # Append any empty sections.
+            while parsed_sections[config_metadata_sections_idx].is_empty:
+                sections.append(
+                    _LttngMetadataStreamSection(
+                        parsed_sections[config_metadata_sections_idx].timestamp, None
+                    )
+                )
+                config_metadata_sections_idx += 1
+        else:
+            # Append line_content to the current metadata section.
+            curr_metadata_section += bytearray(line_content, 'utf8')
+
+    # We iterated over all the lines of the metadata file. Close the current section.
+    sections.append(
+        _LttngMetadataStreamSection(
+            parsed_sections[config_metadata_sections_idx].timestamp,
+            bytes(curr_metadata_section),
+        )
+    )
+
+    return sections
 
 
 # An LTTng trace, a sequence of LTTng data streams.
 class LttngTrace(collections.abc.Sequence):
-    def __init__(self, trace_dir, beacons):
+    def __init__(self, trace_dir, metadata_sections, beacons):
         assert os.path.isdir(trace_dir)
         self._path = trace_dir
-        self._metadata_stream = _LttngMetadataStream(
-            os.path.join(trace_dir, 'metadata')
-        )
+        self._create_metadata_stream(trace_dir, metadata_sections)
         self._create_data_streams(trace_dir, beacons)
         logging.info('Built trace: path="{}"'.format(trace_dir))
 
@@ -841,6 +980,22 @@ class LttngTrace(collections.abc.Sequence):
                 _LttngDataStream(data_stream_path, this_stream_beacons)
             )
 
+    def _create_metadata_stream(self, trace_dir, config_metadata_sections):
+        metadata_path = os.path.join(trace_dir, 'metadata')
+        metadata_sections = []
+
+        if config_metadata_sections is None:
+            with open(metadata_path, 'rb') as metadata_file:
+                metadata_sections.append(
+                    _LttngMetadataStreamSection(0, metadata_file.read())
+                )
+        else:
+            metadata_sections = _split_metadata_sections(
+                metadata_path, config_metadata_sections
+            )
+
+        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
+
     @property
     def path(self):
         return self._path
@@ -858,10 +1013,11 @@ class LttngTrace(collections.abc.Sequence):
 
 # The state of a single data stream.
 class _LttngLiveViewerSessionDataStreamState:
-    def __init__(self, ts_state, info, data_stream):
+    def __init__(self, ts_state, info, data_stream, metadata_stream_id):
         self._ts_state = ts_state
         self._info = info
         self._data_stream = data_stream
+        self._metadata_stream_id = metadata_stream_id
         self._cur_index_entry_index = 0
         fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
         logging.info(
@@ -902,6 +1058,14 @@ class _LttngLiveViewerSessionMetadataStreamState:
         self._ts_state = ts_state
         self._info = info
         self._metadata_stream = metadata_stream
+        self._cur_metadata_stream_section_index = 0
+        if len(metadata_stream.sections) > 1:
+            self._next_metadata_stream_section_timestamp = metadata_stream.sections[
+                1
+            ].timestamp
+        else:
+            self._next_metadata_stream_section_timestamp = None
+
         self._is_sent = False
         fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
         logging.info(
@@ -933,6 +1097,28 @@ class _LttngLiveViewerSessionMetadataStreamState:
     def is_sent(self, value):
         self._is_sent = value
 
+    @property
+    def cur_section(self):
+        fmt = "Get current metadata section: section-idx={}"
+        logging.info(fmt.format(self._cur_metadata_stream_section_index))
+        if self._cur_metadata_stream_section_index == len(
+            self._metadata_stream.sections
+        ):
+            return
+
+        return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
+
+    def goto_next_section(self):
+        self._cur_metadata_stream_section_index += 1
+        if self.cur_section:
+            self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
+        else:
+            self._next_metadata_stream_section_timestamp = None
+
+    @property
+    def next_section_timestamp(self):
+        return self._next_metadata_stream_section_timestamp
+
 
 # The state of a tracing session.
 class _LttngLiveViewerSessionTracingSessionState:
@@ -946,6 +1132,17 @@ class _LttngLiveViewerSessionTracingSessionState:
         for trace in tc_descr.traces:
             trace_id = stream_id * 1000
 
+            # Metadata stream -> stream info and metadata stream state
+            info = _LttngLiveViewerStreamInfo(
+                stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
+            )
+            self._stream_infos.append(info)
+            self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
+                self, info, trace.metadata_stream
+            )
+            metadata_stream_id = stream_id
+            stream_id += 1
+
             # Data streams -> stream infos and data stream states
             for data_stream in trace:
                 info = _LttngLiveViewerStreamInfo(
@@ -957,20 +1154,10 @@ class _LttngLiveViewerSessionTracingSessionState:
                 )
                 self._stream_infos.append(info)
                 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
-                    self, info, data_stream
+                    self, info, data_stream, metadata_stream_id
                 )
                 stream_id += 1
 
-            # Metadata stream -> stream info and metadata stream state
-            info = _LttngLiveViewerStreamInfo(
-                stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
-            )
-            self._stream_infos.append(info)
-            self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
-                self, info, trace.metadata_stream
-            )
-            stream_id += 1
-
         self._is_attached = False
         fmt = 'Built tracing session state: id={}, name="{}"'
         logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
@@ -1004,6 +1191,16 @@ class _LttngLiveViewerSessionTracingSessionState:
         self._is_attached = value
 
 
+def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
+    if metadata_stream_state.next_section_timestamp is None:
+        return False
+
+    if latest_timestamp >= metadata_stream_state.next_section_timestamp:
+        return True
+    else:
+        return False
+
+
 # An LTTng live viewer session manages a view on tracing sessions
 # and replies to commands accordingly.
 class _LttngLiveViewerSession:
@@ -1115,6 +1312,7 @@ class _LttngLiveViewerSession:
         fmt = 'Handling "get next data stream index entry" command: stream-id={}'
         logging.info(fmt.format(cmd.stream_id))
         stream_state = self._get_stream_state(cmd.stream_id)
+        metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
 
         if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
             raise UnexpectedInput(
@@ -1133,6 +1331,12 @@ class _LttngLiveViewerSession:
                 status, index_entry, False, False
             )
 
+        timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
+
+        if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
+            metadata_stream_state.is_sent = False
+            metadata_stream_state.goto_next_section()
+
         # The viewer only checks the `has_new_metadata` flag if the
         # reply's status is `OK`, so we need to provide an index here
         has_new_metadata = stream_state.tracing_session_state.has_new_metadata
@@ -1182,21 +1386,33 @@ class _LttngLiveViewerSession:
     def _handle_get_metadata_stream_data_command(self, cmd):
         fmt = 'Handling "get metadata stream data" command: stream-id={}'
         logging.info(fmt.format(cmd.stream_id))
-        stream_state = self._get_stream_state(cmd.stream_id)
+        metadata_stream_state = self._get_stream_state(cmd.stream_id)
 
-        if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
+        if (
+            type(metadata_stream_state)
+            is not _LttngLiveViewerSessionMetadataStreamState
+        ):
             raise UnexpectedInput(
                 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
             )
 
-        if stream_state.is_sent:
+        if metadata_stream_state.is_sent:
             status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
             return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
 
-        stream_state.is_sent = True
+        metadata_stream_state.is_sent = True
         status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
+        metadata_section = metadata_stream_state.cur_section
+
+        # If we are sending an empty section, ready the next one right away.
+        if len(metadata_section.data) == 0:
+            metadata_stream_state.is_sent = False
+            metadata_stream_state.goto_next_section()
+
+        fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
+        logging.info(fmt.format(len(metadata_section.data)))
         return _LttngLiveViewerGetMetadataStreamDataContentReply(
-            status, stream_state.metadata_stream.data
+            status, metadata_section.data
         )
 
     def _handle_get_new_stream_infos_command(self, cmd):
@@ -1453,7 +1669,13 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
     #                     "path": "meow/mix",
     #                     "beacons": {
     #                         "my_stream": [ 5235787, 728375283 ]
-    #                     }
+    #                     },
+    #                     "metadata-sections": [
+    #                           {
+    #                                "line": 1,
+    #                                "timestamp": 0
+    #                           }
+    #                      ]
     #                 }
     #             ]
     #         }
@@ -1472,13 +1694,14 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
         traces = []
 
         for trace in session['traces']:
+            metadata_sections = trace.get('metadata-sections')
             beacons = trace.get('beacons')
             path = trace['path']
 
             if not os.path.isabs(path):
                 path = os.path.join(trace_path_prefix, path)
 
-            traces.append(LttngTrace(path, beacons))
+            traces.append(LttngTrace(path, metadata_sections, beacons))
 
         sessions.append(
             LttngTracingSessionDescriptor(
@@ -1548,7 +1771,8 @@ if __name__ == '__main__':
     args = parser.parse_args(args=remaining_args)
     try:
         sessions = _session_descriptors_from_path(
-            args.sessions_filename, args.trace_path_prefix
+            args.sessions_filename,
+            args.trace_path_prefix,
         )
         LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size)
     except UnexpectedInput as exc:
diff --git a/tests/data/plugins/src.ctf.lttng-live/split_metadata.expect b/tests/data/plugins/src.ctf.lttng-live/split_metadata.expect
new file mode 100644 (file)
index 0000000..17b6ccb
--- /dev/null
@@ -0,0 +1,84 @@
+Trace class:
+  Stream class (ID 0):
+    Supports packets: Yes
+    Packets have beginning default clock snapshot: Yes
+    Packets have end default clock snapshot: Yes
+    Supports discarded events: Yes
+    Discarded events have default clock snapshots: Yes
+    Supports discarded packets: Yes
+    Discarded packets have default clock snapshots: Yes
+    Default clock class:
+      Name: monotonic
+      Description: Monotonic Clock
+      Frequency (Hz): 1,000,000,000
+      Precision (cycles): 0
+      Offset (s): 1,594,406,328
+      Offset (cycles): 768,346,378
+      Origin is Unix epoch: Yes
+      UUID: 81a04b89-9028-4d3e-a28d-5fbd53a8eb9d
+    Packet context field class: Structure (1 member):
+      cpu_id: Unsigned integer (32-bit, Base 10)
+    Event class `my_app:signe_de_pia$$e` (ID 0):
+      Log level: Debug (line)
+      Payload field class: Structure (0 members)
+    Event class `my_app:signe_de_pia$$e_2` (ID 1):
+      Log level: Debug (line)
+      Payload field class: Structure (0 members)
+
+[Unknown]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Stream beginning:
+  Name: stream-1
+  Trace:
+    UUID: 0339cd08-892d-404c-9291-64c1a8a74c81
+    Environment (10 entries):
+      architecture_bit_width: 64
+      domain: ust
+      hostname: raton
+      trace_creation_datetime: 20200715T174253-0400
+      trace_name: barney_descontie
+      tracer_buffering_id: 1000
+      tracer_buffering_scheme: uid
+      tracer_major: 2
+      tracer_minor: 12
+      tracer_name: lttng-ust
+    Stream (ID 0, Class ID 0)
+
+[443,073,474,574,097 cycles, 1,594,849,402,242,920,475 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Packet beginning:
+  Context:
+    cpu_id: 0
+
+[443,073,484,867,537 cycles, 1,594,849,402,253,213,915 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Event `my_app:signe_de_pia$$e` (Class ID 0):
+  Payload: Empty
+
+[443,076,225,270,435 cycles, 1,594,849,404,993,616,813 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Packet end
+
+[443,087,407,631,276 cycles, 1,594,849,416,175,977,654 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Packet beginning:
+  Context:
+    cpu_id: 0
+
+[443,087,407,631,276 cycles, 1,594,849,416,175,977,654 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Event `my_app:signe_de_pia$$e` (Class ID 0):
+  Payload: Empty
+
+[443,087,407,643,172 cycles, 1,594,849,416,175,989,550 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Event `my_app:signe_de_pia$$e_2` (Class ID 1):
+  Payload: Empty
+
+[443,089,152,508,997 cycles, 1,594,849,417,920,855,375 ns from origin]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Packet end
+
+[Unknown]
+{Trace 0, Stream class ID 0, Stream ID 0}
+Stream end
diff --git a/tests/data/plugins/src.ctf.lttng-live/split_metadata.json b/tests/data/plugins/src.ctf.lttng-live/split_metadata.json
new file mode 100644 (file)
index 0000000..9b5edf2
--- /dev/null
@@ -0,0 +1,27 @@
+[
+    {
+       "name": "split_metadata",
+        "id": 0,
+        "hostname": "hostname",
+        "live-timer-freq": 1,
+        "client-count": 0,
+        "traces": [
+            {
+                "path": "live/split_metadata/",
+                "beacons": {
+                },
+                "metadata-sections": [
+                  {
+                    "line": 1,
+                    "timestamp": 443073474574097
+                  },
+                  "empty",
+                  {
+                    "line": 112,
+                    "timestamp": 443087407631276
+                  }
+                ]
+            }
+        ]
+    }
+]
index deee081a2f02fdfb5919f6feb476f52792bb13a7..50cf3dfa7d9789eea620c1cb0e9311d8b5dfc712 100755 (executable)
@@ -367,7 +367,33 @@ test_inactivity_discarded_packet() {
        rm -f "$expected_stderr"
 }
 
-plan_tests 14
+test_split_metadata() {
+       # Consume a metadata stream sent in two parts. This testcase tests the
+       # behaviour of Babeltrace when the tracing session was cleared (lttng
+       # clear) but the metadata is not yet available to the relay. In such
+       # cases, when asked for metadata, the relay will return the
+       # `LTTNG_VIEWER_METADATA_OK` status and a data length of 0. The viewer
+       # need to consider such case as a request to retry fetching metadata.
+       #
+       # This testcase emulates such behaviour by adding empty metadata
+       # packets.
+
+       local test_text="CLI attach and fetch from single-domain session - Receive metadata in two sections separated by a empty section"
+       local cli_args_template="-i lttng-live net://localhost:@PORT@/host/hostname/split_metadata -c sink.text.details"
+       local sessions_file="$test_data_dir/split_metadata.json"
+       local server_args="--sessions-filename '$sessions_file'"
+       local expected_stdout="${test_data_dir}/split_metadata.expect"
+       local expected_stderr
+
+       # Empty file for stderr expected
+       expected_stderr="$(mktemp -t test_live_split_metadata_stderr_expected.XXXXXX)"
+
+       run_test "$test_text" "$cli_args_template" "$server_args" "$expected_stdout" "$expected_stderr"
+
+       rm -f "$expected_stderr"
+}
+
+plan_tests 16
 
 test_list_sessions
 test_base
@@ -375,3 +401,4 @@ test_multi_domains
 test_rate_limited
 test_compare_to_ctf_fs
 test_inactivity_discarded_packet
+test_split_metadata
This page took 0.052467 seconds and 4 git commands to generate.