Fix: bt2: _trim_docstring(): docstring can have 0 or 1 line
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
index 416f9bca1eda60c8d9362b91c7566057f230a7b2..9bff60863a39bfbb8c0f9008be060b27b48da014 100644 (file)
@@ -1,24 +1,7 @@
-# The MIT License (MIT)
+# SPDX-License-Identifier: MIT
 #
-# Copyright (c) 2019 Philippe Proulx <pproulx@efficios.com>
+# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
 #
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
 
 import argparse
 import collections.abc
@@ -30,6 +13,7 @@ import socket
 import struct
 import sys
 import tempfile
+import json
 
 
 class UnexpectedInput(RuntimeError):
@@ -457,7 +441,7 @@ class _LttngLiveViewerProtocolCodec:
                 version, tracing_session_id, offset, seek_type
             )
         elif cmd_type == 4:
-            stream_id, = self._unpack_payload('Q', data)
+            (stream_id,) = self._unpack_payload('Q', data)
             return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                 version, stream_id
             )
@@ -467,15 +451,15 @@ class _LttngLiveViewerProtocolCodec:
                 version, stream_id, offset, req_length
             )
         elif cmd_type == 6:
-            stream_id, = self._unpack_payload('Q', data)
+            (stream_id,) = self._unpack_payload('Q', data)
             return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
         elif cmd_type == 7:
-            tracing_session_id, = self._unpack_payload('Q', data)
+            (tracing_session_id,) = self._unpack_payload('Q', data)
             return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
         elif cmd_type == 8:
             return _LttngLiveViewerCreateViewerSessionCommand(version)
         elif cmd_type == 9:
-            tracing_session_id, = self._unpack_payload('Q', data)
+            (tracing_session_id,) = self._unpack_payload('Q', data)
             return _LttngLiveViewerDetachFromTracingSessionCommand(
                 version, tracing_session_id
             )
@@ -531,23 +515,39 @@ class _LttngLiveViewerProtocolCodec:
             for info in reply.stream_infos:
                 data += self._encode_stream_info(info)
         elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
+            index_format = 'QQQQQQQII'
             entry = reply.index_entry
             flags = self._get_has_new_stuff_flags(
                 reply.has_new_metadata, reply.has_new_data_stream
             )
 
-            data = self._pack(
-                'QQQQQQQII',
-                entry.offset_bytes,
-                entry.total_size_bits,
-                entry.content_size_bits,
-                entry.timestamp_begin,
-                entry.timestamp_end,
-                entry.events_discarded,
-                entry.stream_class_id,
-                reply.status,
-                flags,
-            )
+            if type(entry) is _LttngDataStreamIndexEntry:
+                data = self._pack(
+                    index_format,
+                    entry.offset_bytes,
+                    entry.total_size_bits,
+                    entry.content_size_bits,
+                    entry.timestamp_begin,
+                    entry.timestamp_end,
+                    entry.events_discarded,
+                    entry.stream_class_id,
+                    reply.status,
+                    flags,
+                )
+            else:
+                assert type(entry) is _LttngDataStreamBeaconEntry
+                data = self._pack(
+                    index_format,
+                    0,
+                    0,
+                    0,
+                    0,
+                    entry.timestamp,
+                    0,
+                    entry.stream_class_id,
+                    reply.status,
+                    flags,
+                )
         elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
             flags = self._get_has_new_stuff_flags(
                 reply.has_new_metadata, reply.has_new_data_stream
@@ -631,11 +631,36 @@ class _LttngDataStreamIndexEntry:
         return self._stream_class_id
 
 
+# An entry within the index of an LTTng data stream. While a stream beacon entry
+# is conceptually unrelated to an index, it is sent as a reply to a
+# LttngLiveViewerGetNextDataStreamIndexEntryCommand
+class _LttngDataStreamBeaconEntry:
+    def __init__(self, stream_class_id, timestamp):
+        self._stream_class_id = stream_class_id
+        self._timestamp = timestamp
+
+    @property
+    def timestamp(self):
+        return self._timestamp
+
+    @property
+    def stream_class_id(self):
+        return self._stream_class_id
+
+
 # The index of an LTTng data stream, a sequence of index entries.
 class _LttngDataStreamIndex(collections.abc.Sequence):
-    def __init__(self, path):
+    def __init__(self, path, beacons):
         self._path = path
         self._build()
+
+        if beacons:
+            stream_class_id = self._entries[0].stream_class_id
+            beacons = [
+                _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
+            ]
+            self._add_beacons(beacons)
+
         logging.info(
             'Built data stream index entries: path="{}", count={}'.format(
                 path, len(self._entries)
@@ -674,9 +699,15 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
                     break
 
                 assert len(data) == size
-                offset_bytes, total_size_bits, content_size_bits, timestamp_begin, timestamp_end, events_discarded, stream_class_id = struct.unpack(
-                    fmt, data
-                )
+                (
+                    offset_bytes,
+                    total_size_bits,
+                    content_size_bits,
+                    timestamp_begin,
+                    timestamp_end,
+                    events_discarded,
+                    stream_class_id,
+                ) = struct.unpack(fmt, data)
 
                 self._entries.append(
                     _LttngDataStreamIndexEntry(
@@ -693,6 +724,17 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
                 # Skip anything else before the next entry
                 f.seek(index_entry_length - size, os.SEEK_CUR)
 
+    def _add_beacons(self, beacons):
+        # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
+        def sort_key(entry):
+            if type(entry) is _LttngDataStreamBeaconEntry:
+                return entry.timestamp
+            else:
+                return entry.timestamp_end
+
+        self._entries += beacons
+        self._entries.sort(key=sort_key)
+
     def __getitem__(self, index):
         return self._entries[index]
 
@@ -706,14 +748,14 @@ class _LttngDataStreamIndex(collections.abc.Sequence):
 
 # An LTTng data stream.
 class _LttngDataStream:
-    def __init__(self, path):
+    def __init__(self, path, beacons):
         self._path = path
         filename = os.path.basename(path)
         match = re.match(r'(.*)_\d+', filename)
         self._channel_name = match.group(1)
         trace_dir = os.path.dirname(path)
         index_path = os.path.join(trace_dir, 'index', filename + '.idx')
-        self._index = _LttngDataStreamIndex(index_path)
+        self._index = _LttngDataStreamIndex(index_path, beacons)
         assert os.path.isfile(path)
         self._file = open(path, 'rb')
         logging.info(
@@ -759,16 +801,16 @@ class _LttngMetadataStream:
 
 # An LTTng trace, a sequence of LTTng data streams.
 class LttngTrace(collections.abc.Sequence):
-    def __init__(self, trace_dir):
+    def __init__(self, trace_dir, beacons):
         assert os.path.isdir(trace_dir)
         self._path = trace_dir
         self._metadata_stream = _LttngMetadataStream(
             os.path.join(trace_dir, 'metadata')
         )
-        self._create_data_streams(trace_dir)
+        self._create_data_streams(trace_dir, beacons)
         logging.info('Built trace: path="{}"'.format(trace_dir))
 
-    def _create_data_streams(self, trace_dir):
+    def _create_data_streams(self, trace_dir, beacons):
         data_stream_paths = []
 
         for filename in os.listdir(trace_dir):
@@ -789,7 +831,15 @@ class LttngTrace(collections.abc.Sequence):
         self._data_streams = []
 
         for data_stream_path in data_stream_paths:
-            self._data_streams.append(_LttngDataStream(data_stream_path))
+            stream_name = os.path.basename(data_stream_path)
+            this_stream_beacons = None
+
+            if beacons is not None and stream_name in beacons:
+                this_stream_beacons = beacons[stream_name]
+
+            self._data_streams.append(
+                _LttngDataStream(data_stream_path, this_stream_beacons)
+            )
 
     @property
     def path(self):
@@ -1086,7 +1136,12 @@ class _LttngLiveViewerSession:
         # The viewer only checks the `has_new_metadata` flag if the
         # reply's status is `OK`, so we need to provide an index here
         has_new_metadata = stream_state.tracing_session_state.has_new_metadata
-        status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
+        if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
+            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
+        else:
+            assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
+            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
+
         reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
             status, stream_state.cur_index_entry, has_new_metadata, False
         )
@@ -1339,7 +1394,7 @@ class LttngLiveServer:
             print(self._server_port, end='', file=tmp_port_file)
 
         # Rename temporary file to real file
-        os.rename(tmp_port_file.name, port_filename)
+        os.replace(tmp_port_file.name, port_filename)
         logging.info(
             'Renamed port file: src-path="{}", dst-path="{}"'.format(
                 tmp_port_file.name, port_filename
@@ -1380,19 +1435,63 @@ class LttngTracingSessionDescriptor:
         return self._info
 
 
-def _tracing_session_descriptors_from_arg(string):
-    # Format is:
-    #     NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
-    parts = string.split(',')
-    name = parts[0]
-    tracing_session_id = int(parts[1])
-    hostname = parts[2]
-    live_timer_freq = int(parts[3])
-    client_count = int(parts[4])
-    traces = [LttngTrace(path) for path in parts[5:]]
-    return LttngTracingSessionDescriptor(
-        name, tracing_session_id, hostname, live_timer_freq, client_count, traces
-    )
+def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
+    # File format is:
+    #
+    #     [
+    #         {
+    #             "name": "my-session",
+    #             "id": 17,
+    #             "hostname": "myhost",
+    #             "live-timer-freq": 1000000,
+    #             "client-count": 23,
+    #             "traces": [
+    #                 {
+    #                     "path": "lol"
+    #                 },
+    #                 {
+    #                     "path": "meow/mix",
+    #                     "beacons": {
+    #                         "my_stream": [ 5235787, 728375283 ]
+    #                     }
+    #                 }
+    #             ]
+    #         }
+    #     ]
+    with open(sessions_filename, 'r') as sessions_file:
+        params = json.load(sessions_file)
+
+    sessions = []
+
+    for session in params:
+        name = session['name']
+        tracing_session_id = session['id']
+        hostname = session['hostname']
+        live_timer_freq = session['live-timer-freq']
+        client_count = session['client-count']
+        traces = []
+
+        for trace in session['traces']:
+            beacons = trace.get('beacons')
+            path = trace['path']
+
+            if not os.path.isabs(path):
+                path = os.path.join(trace_path_prefix, path)
+
+            traces.append(LttngTrace(path, beacons))
+
+        sessions.append(
+            LttngTracingSessionDescriptor(
+                name,
+                tracing_session_id,
+                hostname,
+                live_timer_freq,
+                client_count,
+                traces,
+            )
+        )
+
+    return sessions
 
 
 def _loglevel_parser(string):
@@ -1429,11 +1528,12 @@ if __name__ == '__main__':
         help='The maximum size of control data response in bytes',
     )
     parser.add_argument(
-        'sessions',
-        nargs="+",
-        metavar="SESSION",
-        type=_tracing_session_descriptors_from_arg,
-        help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
+        '--trace-path-prefix',
+        type=str,
+        help='Prefix to prepend to the trace paths of session configurations',
+    )
+    parser.add_argument(
+        '--sessions-filename', type=str, help='Path to a session configuration file',
     )
     parser.add_argument(
         '-h',
@@ -1445,9 +1545,10 @@ if __name__ == '__main__':
 
     args = parser.parse_args(args=remaining_args)
     try:
-        LttngLiveServer(
-            args.port_filename, args.sessions, args.max_query_data_response_size
+        sessions = _session_descriptors_from_path(
+            args.sessions_filename, args.trace_path_prefix
         )
+        LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size)
     except UnexpectedInput as exc:
         logging.error(str(exc))
         print(exc, file=sys.stderr)
This page took 0.02852 seconds and 4 git commands to generate.