-# The MIT License (MIT)
+# SPDX-License-Identifier: MIT
#
-# Copyright (c) 2019 Philippe Proulx <pproulx@efficios.com>
+# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
import argparse
import collections.abc
import struct
import sys
import tempfile
+import json
class UnexpectedInput(RuntimeError):
version, tracing_session_id, offset, seek_type
)
elif cmd_type == 4:
- stream_id, = self._unpack_payload('Q', data)
+ (stream_id,) = self._unpack_payload('Q', data)
return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
version, stream_id
)
version, stream_id, offset, req_length
)
elif cmd_type == 6:
- stream_id, = self._unpack_payload('Q', data)
+ (stream_id,) = self._unpack_payload('Q', data)
return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
elif cmd_type == 7:
- tracing_session_id, = self._unpack_payload('Q', data)
+ (tracing_session_id,) = self._unpack_payload('Q', data)
return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
elif cmd_type == 8:
return _LttngLiveViewerCreateViewerSessionCommand(version)
elif cmd_type == 9:
- tracing_session_id, = self._unpack_payload('Q', data)
+ (tracing_session_id,) = self._unpack_payload('Q', data)
return _LttngLiveViewerDetachFromTracingSessionCommand(
version, tracing_session_id
)
for info in reply.stream_infos:
data += self._encode_stream_info(info)
elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
+ index_format = 'QQQQQQQII'
entry = reply.index_entry
flags = self._get_has_new_stuff_flags(
reply.has_new_metadata, reply.has_new_data_stream
)
- data = self._pack(
- 'QQQQQQQII',
- entry.offset_bytes,
- entry.total_size_bits,
- entry.content_size_bits,
- entry.timestamp_begin,
- entry.timestamp_end,
- entry.events_discarded,
- entry.stream_class_id,
- reply.status,
- flags,
- )
+ if type(entry) is _LttngDataStreamIndexEntry:
+ data = self._pack(
+ index_format,
+ entry.offset_bytes,
+ entry.total_size_bits,
+ entry.content_size_bits,
+ entry.timestamp_begin,
+ entry.timestamp_end,
+ entry.events_discarded,
+ entry.stream_class_id,
+ reply.status,
+ flags,
+ )
+ else:
+ assert type(entry) is _LttngDataStreamBeaconEntry
+ data = self._pack(
+ index_format,
+ 0,
+ 0,
+ 0,
+ 0,
+ entry.timestamp,
+ 0,
+ entry.stream_class_id,
+ reply.status,
+ flags,
+ )
elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
flags = self._get_has_new_stuff_flags(
reply.has_new_metadata, reply.has_new_data_stream
return self._stream_class_id
+# An entry within the index of an LTTng data stream. While a stream beacon entry
+# is conceptually unrelated to an index, it is sent as a reply to a
+# LttngLiveViewerGetNextDataStreamIndexEntryCommand
+class _LttngDataStreamBeaconEntry:
+ def __init__(self, stream_class_id, timestamp):
+ self._stream_class_id = stream_class_id
+ self._timestamp = timestamp
+
+ @property
+ def timestamp(self):
+ return self._timestamp
+
+ @property
+ def stream_class_id(self):
+ return self._stream_class_id
+
+
# The index of an LTTng data stream, a sequence of index entries.
class _LttngDataStreamIndex(collections.abc.Sequence):
- def __init__(self, path):
+ def __init__(self, path, beacons):
self._path = path
self._build()
+
+ if beacons:
+ stream_class_id = self._entries[0].stream_class_id
+ beacons = [
+ _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
+ ]
+ self._add_beacons(beacons)
+
logging.info(
'Built data stream index entries: path="{}", count={}'.format(
path, len(self._entries)
break
assert len(data) == size
- offset_bytes, total_size_bits, content_size_bits, timestamp_begin, timestamp_end, events_discarded, stream_class_id = struct.unpack(
- fmt, data
- )
+ (
+ offset_bytes,
+ total_size_bits,
+ content_size_bits,
+ timestamp_begin,
+ timestamp_end,
+ events_discarded,
+ stream_class_id,
+ ) = struct.unpack(fmt, data)
self._entries.append(
_LttngDataStreamIndexEntry(
# Skip anything else before the next entry
f.seek(index_entry_length - size, os.SEEK_CUR)
+ def _add_beacons(self, beacons):
+ # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
+ def sort_key(entry):
+ if type(entry) is _LttngDataStreamBeaconEntry:
+ return entry.timestamp
+ else:
+ return entry.timestamp_end
+
+ self._entries += beacons
+ self._entries.sort(key=sort_key)
+
def __getitem__(self, index):
return self._entries[index]
# An LTTng data stream.
class _LttngDataStream:
- def __init__(self, path):
+ def __init__(self, path, beacons):
self._path = path
filename = os.path.basename(path)
match = re.match(r'(.*)_\d+', filename)
self._channel_name = match.group(1)
trace_dir = os.path.dirname(path)
index_path = os.path.join(trace_dir, 'index', filename + '.idx')
- self._index = _LttngDataStreamIndex(index_path)
+ self._index = _LttngDataStreamIndex(index_path, beacons)
assert os.path.isfile(path)
self._file = open(path, 'rb')
logging.info(
# An LTTng trace, a sequence of LTTng data streams.
class LttngTrace(collections.abc.Sequence):
- def __init__(self, trace_dir):
+ def __init__(self, trace_dir, beacons):
assert os.path.isdir(trace_dir)
self._path = trace_dir
self._metadata_stream = _LttngMetadataStream(
os.path.join(trace_dir, 'metadata')
)
- self._create_data_streams(trace_dir)
+ self._create_data_streams(trace_dir, beacons)
logging.info('Built trace: path="{}"'.format(trace_dir))
- def _create_data_streams(self, trace_dir):
+ def _create_data_streams(self, trace_dir, beacons):
data_stream_paths = []
for filename in os.listdir(trace_dir):
self._data_streams = []
for data_stream_path in data_stream_paths:
- self._data_streams.append(_LttngDataStream(data_stream_path))
+ stream_name = os.path.basename(data_stream_path)
+ this_stream_beacons = None
+
+ if beacons is not None and stream_name in beacons:
+ this_stream_beacons = beacons[stream_name]
+
+ self._data_streams.append(
+ _LttngDataStream(data_stream_path, this_stream_beacons)
+ )
@property
def path(self):
# The viewer only checks the `has_new_metadata` flag if the
# reply's status is `OK`, so we need to provide an index here
has_new_metadata = stream_state.tracing_session_state.has_new_metadata
- status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
+ if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
+ status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
+ else:
+ assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
+ status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
+
reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
status, stream_state.cur_index_entry, has_new_metadata, False
)
return self._info
-def _tracing_session_descriptors_from_arg(string):
- # Format is:
- # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
- parts = string.split(',')
- name = parts[0]
- tracing_session_id = int(parts[1])
- hostname = parts[2]
- live_timer_freq = int(parts[3])
- client_count = int(parts[4])
- traces = [LttngTrace(path) for path in parts[5:]]
- return LttngTracingSessionDescriptor(
- name, tracing_session_id, hostname, live_timer_freq, client_count, traces
- )
+def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
+ # File format is:
+ #
+ # [
+ # {
+ # "name": "my-session",
+ # "id": 17,
+ # "hostname": "myhost",
+ # "live-timer-freq": 1000000,
+ # "client-count": 23,
+ # "traces": [
+ # {
+ # "path": "lol"
+ # },
+ # {
+ # "path": "meow/mix",
+ # "beacons": {
+ # "my_stream": [ 5235787, 728375283 ]
+ # }
+ # }
+ # ]
+ # }
+ # ]
+ with open(sessions_filename, 'r') as sessions_file:
+ params = json.load(sessions_file)
+
+ sessions = []
+
+ for session in params:
+ name = session['name']
+ tracing_session_id = session['id']
+ hostname = session['hostname']
+ live_timer_freq = session['live-timer-freq']
+ client_count = session['client-count']
+ traces = []
+
+ for trace in session['traces']:
+ beacons = trace.get('beacons')
+ path = trace['path']
+
+ if not os.path.isabs(path):
+ path = os.path.join(trace_path_prefix, path)
+
+ traces.append(LttngTrace(path, beacons))
+
+ sessions.append(
+ LttngTracingSessionDescriptor(
+ name,
+ tracing_session_id,
+ hostname,
+ live_timer_freq,
+ client_count,
+ traces,
+ )
+ )
+
+ return sessions
def _loglevel_parser(string):
help='The maximum size of control data response in bytes',
)
parser.add_argument(
- 'sessions',
- nargs="+",
- metavar="SESSION",
- type=_tracing_session_descriptors_from_arg,
- help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
+ '--trace-path-prefix',
+ type=str,
+ help='Prefix to prepend to the trace paths of session configurations',
+ )
+ parser.add_argument(
+ '--sessions-filename',
+ type=str,
+ help='Path to a session configuration file',
)
parser.add_argument(
'-h',
args = parser.parse_args(args=remaining_args)
try:
- LttngLiveServer(
- args.port_filename, args.sessions, args.max_query_data_response_size
+ sessions = _session_descriptors_from_path(
+ args.sessions_filename, args.trace_path_prefix
)
+ LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size)
except UnexpectedInput as exc:
logging.error(str(exc))
print(exc, file=sys.stderr)