Tests: src.ctf.lttng-live: add support for beacon injections
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 5 Jun 2020 19:55:54 +0000 (15:55 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Sat, 6 Jun 2020 02:25:20 +0000 (22:25 -0400)
Add a 'beacons' field to the sessions configuration accepted
by lttng_live_server.py. 'beacons' is expected to be a list
of clock snapshots (in cycles) at which live inactivity beacons
should be sent to a live client.

From a protocol standpoint, an inactivity beacon is a "special" type
of index entry. They are inserted in the index list (_entries)
so that they can simply be sent in response to a
LttngLiveViewerGetNextDataStreamIndexEntryCommand in the same
way typical index entries are processed.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I1f86cf369a5115af6787e9a9d52469d16f1ab3aa
Reviewed-on: https://review.lttng.org/c/babeltrace/+/3618
Tested-by: jenkins <jenkins@lttng.org>
Reviewed-by: Philippe Proulx <eeppeliteloop@gmail.com>
tests/data/plugins/src.ctf.lttng-live/lttng_live_server.py

index 6b674666fd17a743abcaf40741c09820c83bccf8..9bff60863a39bfbb8c0f9008be060b27b48da014 100644 (file)
@@ -515,23 +515,39 @@ class _LttngLiveViewerProtocolCodec:
             for info in reply.stream_infos:
                 data += self._encode_stream_info(info)
         elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
             for info in reply.stream_infos:
                 data += self._encode_stream_info(info)
         elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
+            index_format = 'QQQQQQQII'
             entry = reply.index_entry
             flags = self._get_has_new_stuff_flags(
                 reply.has_new_metadata, reply.has_new_data_stream
             )
 
             entry = reply.index_entry
             flags = self._get_has_new_stuff_flags(
                 reply.has_new_metadata, reply.has_new_data_stream
             )
 
-            data = self._pack(
-                'QQQQQQQII',
-                entry.offset_bytes,
-                entry.total_size_bits,
-                entry.content_size_bits,
-                entry.timestamp_begin,
-                entry.timestamp_end,
-                entry.events_discarded,
-                entry.stream_class_id,
-                reply.status,
-                flags,
-            )
+            if type(entry) is _LttngDataStreamIndexEntry:
+                data = self._pack(
+                    index_format,
+                    entry.offset_bytes,
+                    entry.total_size_bits,
+                    entry.content_size_bits,
+                    entry.timestamp_begin,
+                    entry.timestamp_end,
+                    entry.events_discarded,
+                    entry.stream_class_id,
+                    reply.status,
+                    flags,
+                )
+            else:
+                assert type(entry) is _LttngDataStreamBeaconEntry
+                data = self._pack(
+                    index_format,
+                    0,
+                    0,
+                    0,
+                    0,
+                    entry.timestamp,
+                    0,
+                    entry.stream_class_id,
+                    reply.status,
+                    flags,
+                )
         elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
             flags = self._get_has_new_stuff_flags(
                 reply.has_new_metadata, reply.has_new_data_stream
         elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
             flags = self._get_has_new_stuff_flags(
                 reply.has_new_metadata, reply.has_new_data_stream
@@ -615,11 +631,36 @@ class _LttngDataStreamIndexEntry:
         return self._stream_class_id
 
 
         return self._stream_class_id
 
 
+# An entry within the index of an LTTng data stream. While a stream beacon entry
+# is conceptually unrelated to an index, it is sent as a reply to a
+# LttngLiveViewerGetNextDataStreamIndexEntryCommand
+class _LttngDataStreamBeaconEntry:
+    def __init__(self, stream_class_id, timestamp):
+        self._stream_class_id = stream_class_id
+        self._timestamp = timestamp
+
+    @property
+    def timestamp(self):
+        return self._timestamp
+
+    @property
+    def stream_class_id(self):
+        return self._stream_class_id
+
+
 # The index of an LTTng data stream, a sequence of index entries.
 class _LttngDataStreamIndex(collections.abc.Sequence):
 # The index of an LTTng data stream, a sequence of index entries.
 class _LttngDataStreamIndex(collections.abc.Sequence):
-    def __init__(self, path):
+    def __init__(self, path, beacons):
         self._path = path
         self._build()
         self._path = path
         self._build()
+
+        if beacons:
+            stream_class_id = self._entries[0].stream_class_id
+            beacons = [
+                _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
+            ]
+            self._add_beacons(beacons)
+
         logging.info(
             'Built data stream index entries: path="{}", count={}'.format(
                 path, len(self._entries)
         logging.info(
             'Built data stream index entries: path="{}", count={}'.format(
                 path, len(self._entries)
@@ -683,6 +724,17 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
                 # Skip anything else before the next entry
                 f.seek(index_entry_length - size, os.SEEK_CUR)
 
                 # Skip anything else before the next entry
                 f.seek(index_entry_length - size, os.SEEK_CUR)
 
+    def _add_beacons(self, beacons):
+        # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
+        def sort_key(entry):
+            if type(entry) is _LttngDataStreamBeaconEntry:
+                return entry.timestamp
+            else:
+                return entry.timestamp_end
+
+        self._entries += beacons
+        self._entries.sort(key=sort_key)
+
     def __getitem__(self, index):
         return self._entries[index]
 
     def __getitem__(self, index):
         return self._entries[index]
 
@@ -696,14 +748,14 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
 
 # An LTTng data stream.
 class _LttngDataStream:
 
 # An LTTng data stream.
 class _LttngDataStream:
-    def __init__(self, path):
+    def __init__(self, path, beacons):
         self._path = path
         filename = os.path.basename(path)
         match = re.match(r'(.*)_\d+', filename)
         self._channel_name = match.group(1)
         trace_dir = os.path.dirname(path)
         index_path = os.path.join(trace_dir, 'index', filename + '.idx')
         self._path = path
         filename = os.path.basename(path)
         match = re.match(r'(.*)_\d+', filename)
         self._channel_name = match.group(1)
         trace_dir = os.path.dirname(path)
         index_path = os.path.join(trace_dir, 'index', filename + '.idx')
-        self._index = _LttngDataStreamIndex(index_path)
+        self._index = _LttngDataStreamIndex(index_path, beacons)
         assert os.path.isfile(path)
         self._file = open(path, 'rb')
         logging.info(
         assert os.path.isfile(path)
         self._file = open(path, 'rb')
         logging.info(
@@ -749,16 +801,16 @@ class _LttngMetadataStream:
 
 # An LTTng trace, a sequence of LTTng data streams.
 class LttngTrace(collections.abc.Sequence):
 
 # An LTTng trace, a sequence of LTTng data streams.
 class LttngTrace(collections.abc.Sequence):
-    def __init__(self, trace_dir):
+    def __init__(self, trace_dir, beacons):
         assert os.path.isdir(trace_dir)
         self._path = trace_dir
         self._metadata_stream = _LttngMetadataStream(
             os.path.join(trace_dir, 'metadata')
         )
         assert os.path.isdir(trace_dir)
         self._path = trace_dir
         self._metadata_stream = _LttngMetadataStream(
             os.path.join(trace_dir, 'metadata')
         )
-        self._create_data_streams(trace_dir)
+        self._create_data_streams(trace_dir, beacons)
         logging.info('Built trace: path="{}"'.format(trace_dir))
 
         logging.info('Built trace: path="{}"'.format(trace_dir))
 
-    def _create_data_streams(self, trace_dir):
+    def _create_data_streams(self, trace_dir, beacons):
         data_stream_paths = []
 
         for filename in os.listdir(trace_dir):
         data_stream_paths = []
 
         for filename in os.listdir(trace_dir):
@@ -779,7 +831,15 @@ class LttngTrace(collections.abc.Sequence):
         self._data_streams = []
 
         for data_stream_path in data_stream_paths:
         self._data_streams = []
 
         for data_stream_path in data_stream_paths:
-            self._data_streams.append(_LttngDataStream(data_stream_path))
+            stream_name = os.path.basename(data_stream_path)
+            this_stream_beacons = None
+
+            if beacons is not None and stream_name in beacons:
+                this_stream_beacons = beacons[stream_name]
+
+            self._data_streams.append(
+                _LttngDataStream(data_stream_path, this_stream_beacons)
+            )
 
     @property
     def path(self):
 
     @property
     def path(self):
@@ -1076,7 +1136,12 @@ class _LttngLiveViewerSession:
         # The viewer only checks the `has_new_metadata` flag if the
         # reply's status is `OK`, so we need to provide an index here
         has_new_metadata = stream_state.tracing_session_state.has_new_metadata
         # The viewer only checks the `has_new_metadata` flag if the
         # reply's status is `OK`, so we need to provide an index here
         has_new_metadata = stream_state.tracing_session_state.has_new_metadata
-        status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
+        if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
+            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
+        else:
+            assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
+            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
+
         reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
             status, stream_state.cur_index_entry, has_new_metadata, False
         )
         reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
             status, stream_state.cur_index_entry, has_new_metadata, False
         )
@@ -1385,7 +1450,10 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
     #                     "path": "lol"
     #                 },
     #                 {
     #                     "path": "lol"
     #                 },
     #                 {
-    #                     "path": "meow/mix"
+    #                     "path": "meow/mix",
+    #                     "beacons": {
+    #                         "my_stream": [ 5235787, 728375283 ]
+    #                     }
     #                 }
     #             ]
     #         }
     #                 }
     #             ]
     #         }
@@ -1404,12 +1472,13 @@ def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
         traces = []
 
         for trace in session['traces']:
         traces = []
 
         for trace in session['traces']:
+            beacons = trace.get('beacons')
             path = trace['path']
 
             if not os.path.isabs(path):
                 path = os.path.join(trace_path_prefix, path)
 
             path = trace['path']
 
             if not os.path.isabs(path):
                 path = os.path.join(trace_path_prefix, path)
 
-            traces.append(LttngTrace(path))
+            traces.append(LttngTrace(path, beacons))
 
         sessions.append(
             LttngTracingSessionDescriptor(
 
         sessions.append(
             LttngTracingSessionDescriptor(
This page took 0.028623 seconds and 4 git commands to generate.