tests: change lttng_live_server.py's --sessions-filename to be a positional argument
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
584af91e 2#
0235b0db 3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
584af91e 4#
584af91e 5
aca7de76
SM
6# pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
7
584af91e 8import os
584af91e
PP
9import re
10import socket
11import struct
5995b304
SM
12import logging
13import os.path
14import argparse
584af91e 15import tempfile
aca7de76
SM
16from typing import Dict, Union, Iterable, Optional, Sequence, overload
17
18import tjson
19
20# isort: off
21from typing import Any, Callable # noqa: F401
22
23# isort: on
584af91e
PP
24
25
ee1171e5
SM
26# An entry within the index of an LTTng data stream.
27class _LttngDataStreamIndexEntry:
28 def __init__(
29 self,
aca7de76
SM
30 offset_bytes: int,
31 total_size_bits: int,
32 content_size_bits: int,
33 timestamp_begin: int,
34 timestamp_end: int,
35 events_discarded: int,
36 stream_class_id: int,
ee1171e5
SM
37 ):
38 self._offset_bytes = offset_bytes
39 self._total_size_bits = total_size_bits
40 self._content_size_bits = content_size_bits
41 self._timestamp_begin = timestamp_begin
42 self._timestamp_end = timestamp_end
43 self._events_discarded = events_discarded
44 self._stream_class_id = stream_class_id
45
46 @property
47 def offset_bytes(self):
48 return self._offset_bytes
49
50 @property
51 def total_size_bits(self):
52 return self._total_size_bits
53
54 @property
55 def total_size_bytes(self):
56 return self._total_size_bits // 8
57
58 @property
59 def content_size_bits(self):
60 return self._content_size_bits
61
62 @property
63 def content_size_bytes(self):
64 return self._content_size_bits // 8
65
66 @property
67 def timestamp_begin(self):
68 return self._timestamp_begin
69
70 @property
71 def timestamp_end(self):
72 return self._timestamp_end
73
74 @property
75 def events_discarded(self):
76 return self._events_discarded
77
78 @property
79 def stream_class_id(self):
80 return self._stream_class_id
81
82
83# An entry within the index of an LTTng data stream. While a stream beacon entry
84# is conceptually unrelated to an index, it is sent as a reply to a
85# LttngLiveViewerGetNextDataStreamIndexEntryCommand
aca7de76
SM
86class _LttngDataStreamBeaconIndexEntry:
87 def __init__(self, stream_class_id: int, timestamp: int):
ee1171e5
SM
88 self._stream_class_id = stream_class_id
89 self._timestamp = timestamp
90
91 @property
92 def timestamp(self):
93 return self._timestamp
94
95 @property
96 def stream_class_id(self):
97 return self._stream_class_id
98
99
aca7de76
SM
# Any kind of entry which a data stream index may contain: either a
# regular index entry or a beacon entry.
_LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
101
102
584af91e 103class _LttngLiveViewerCommand:
aca7de76 104 def __init__(self, version: int):
584af91e
PP
105 self._version = version
106
107 @property
108 def version(self):
109 return self._version
110
111
# "Connect" command: carries the viewer session ID and the major/minor
# numbers sent by the viewer.
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
        super().__init__(version)
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    @property
    def viewer_session_id(self) -> int:
        return self._viewer_session_id

    @property
    def major(self) -> int:
        return self._major

    @property
    def minor(self) -> int:
        return self._minor
130
131
aca7de76
SM
# Common base class of all the reply classes of this file; it has no
# state or behavior of its own.
class _LttngLiveViewerReply:
    pass
134
135
# Reply to a "connect" command: viewer session ID and major/minor
# numbers to send back to the viewer.
class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
    def __init__(self, viewer_session_id: int, major: int, minor: int):
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    @property
    def viewer_session_id(self) -> int:
        return self._viewer_session_id

    @property
    def major(self) -> int:
        return self._major

    @property
    def minor(self) -> int:
        return self._minor
153
154
# "Get tracing session infos" command: has no field other than the
# version inherited from the base command class.
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    pass
157
158
159class _LttngLiveViewerTracingSessionInfo:
160 def __init__(
161 self,
aca7de76
SM
162 tracing_session_id: int,
163 live_timer_freq: int,
164 client_count: int,
165 stream_count: int,
166 hostname: str,
167 name: str,
584af91e
PP
168 ):
169 self._tracing_session_id = tracing_session_id
170 self._live_timer_freq = live_timer_freq
171 self._client_count = client_count
172 self._stream_count = stream_count
173 self._hostname = hostname
174 self._name = name
175
176 @property
177 def tracing_session_id(self):
178 return self._tracing_session_id
179
180 @property
181 def live_timer_freq(self):
182 return self._live_timer_freq
183
184 @property
185 def client_count(self):
186 return self._client_count
187
188 @property
189 def stream_count(self):
190 return self._stream_count
191
192 @property
193 def hostname(self):
194 return self._hostname
195
196 @property
197 def name(self):
198 return self._name
199
200
aca7de76
SM
# Reply to a "get tracing session infos" command: wraps the sequence of
# tracing session infos to send to the viewer.
class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
    def __init__(
        self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
    ):
        # Kept as is (no copy is made).
        self._tracing_session_infos = tracing_session_infos

    @property
    def tracing_session_infos(self) -> Sequence[_LttngLiveViewerTracingSessionInfo]:
        return self._tracing_session_infos
210
211
# "Attach to tracing session" command: identifies the tracing session
# to attach to, along with an offset and a seek type (one of the
# `SeekType` constants).
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    # Known seek type values.
    class SeekType:
        BEGINNING = 1
        LAST = 2

    def __init__(
        self, version: int, tracing_session_id: int, offset: int, seek_type: int
    ):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id
        self._offset, self._seek_type = offset, seek_type

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id

    @property
    def offset(self) -> int:
        return self._offset

    @property
    def seek_type(self) -> int:
        return self._seek_type
236
237
238class _LttngLiveViewerStreamInfo:
aca7de76
SM
239 def __init__(
240 self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
241 ):
584af91e
PP
242 self._id = id
243 self._trace_id = trace_id
244 self._is_metadata = is_metadata
245 self._path = path
246 self._channel_name = channel_name
247
248 @property
249 def id(self):
250 return self._id
251
252 @property
253 def trace_id(self):
254 return self._trace_id
255
256 @property
257 def is_metadata(self):
258 return self._is_metadata
259
260 @property
261 def path(self):
262 return self._path
263
264 @property
265 def channel_name(self):
266 return self._channel_name
267
268
# Reply to an "attach to tracing session" command: a status (one of the
# `Status` constants) and the infos of the streams of the session.
class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
    # Known status values.
    class Status:
        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
        self._status = status

        # Kept as is (no copy is made).
        self._stream_infos = stream_infos

    @property
    def status(self) -> int:
        return self._status

    @property
    def stream_infos(self) -> Sequence[_LttngLiveViewerStreamInfo]:
        return self._stream_infos
289
290
# "Get next data stream index entry" command: identifies the stream for
# which the viewer wants the next index entry.
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int):
        super().__init__(version)
        self._stream_id = stream_id

    # ID of the targeted stream.
    @property
    def stream_id(self) -> int:
        return self._stream_id
299
300
# Reply to a "get next data stream index entry" command: a status (one
# of the `Status` constants), the index entry itself (regular or
# beacon), and two "has new stuff" flags.
class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
    # Known status values.
    class Status:
        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(
        self,
        status: int,
        index_entry: _LttngIndexEntryT,
        has_new_metadata: bool,
        has_new_data_stream: bool,
    ):
        self._status = status
        self._index_entry = index_entry

        # "Has new stuff" flags
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self) -> int:
        return self._status

    @property
    def index_entry(self) -> _LttngIndexEntryT:
        return self._index_entry

    @property
    def has_new_metadata(self) -> bool:
        return self._has_new_metadata

    @property
    def has_new_data_stream(self) -> bool:
        return self._has_new_data_stream
337
338
# "Get data stream packet data" command: identifies a stream, an offset
# within it, and the requested length.
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
        super().__init__(version)
        self._stream_id = stream_id
        self._offset, self._req_length = offset, req_length

    @property
    def stream_id(self) -> int:
        return self._stream_id

    @property
    def offset(self) -> int:
        return self._offset

    @property
    def req_length(self) -> int:
        return self._req_length
357
358
# Reply to a "get data stream packet data" command: a status (one of
# the `Status` constants), the packet data, and two "has new stuff"
# flags.
class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
    # Known status values.
    class Status:
        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(
        self,
        status: int,
        data: bytes,
        has_new_metadata: bool,
        has_new_data_stream: bool,
    ):
        self._status = status
        self._data = data

        # "Has new stuff" flags
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self) -> int:
        return self._status

    @property
    def data(self) -> bytes:
        return self._data

    @property
    def has_new_metadata(self) -> bool:
        return self._has_new_metadata

    @property
    def has_new_data_stream(self) -> bool:
        return self._has_new_data_stream
393
394
# "Get metadata stream data" command: identifies the metadata stream of
# which the viewer wants data.
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int):
        super().__init__(version)
        self._stream_id = stream_id

    # ID of the targeted stream.
    @property
    def stream_id(self) -> int:
        return self._stream_id
403
404
# Reply to a "get metadata stream data" command: a status (one of the
# `Status` constants) and the metadata bytes.
class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
    # Known status values.
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status: int, data: bytes):
        self._status, self._data = status, data

    @property
    def status(self) -> int:
        return self._status

    @property
    def data(self) -> bytes:
        return self._data
422
423
# "Get new stream infos" command: identifies the tracing session for
# which the viewer wants the new stream infos.
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, tracing_session_id: int):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    # ID of the targeted tracing session.
    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id
432
433
# Reply to a "get new stream infos" command: a status (one of the
# `Status` constants) and the infos of the streams to announce.
class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
    # Known status values.
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
        self._status = status

        # Kept as is (no copy is made).
        self._stream_infos = stream_infos

    @property
    def status(self) -> int:
        return self._status

    @property
    def stream_infos(self) -> Sequence[_LttngLiveViewerStreamInfo]:
        return self._stream_infos
452
453
# "Create viewer session" command: has no field other than the version
# inherited from the base command class.
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    pass
456
457
# Reply to a "create viewer session" command: only a status (one of the
# `Status` constants).
class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
    # Known status values.
    class Status:
        OK = 1
        ERROR = 2

    def __init__(self, status: int):
        self._status = status

    @property
    def status(self) -> int:
        return self._status
469
470
# "Detach from tracing session" command: identifies the tracing session
# from which the viewer wants to detach.
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, tracing_session_id: int):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    # ID of the targeted tracing session.
    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id
479
480
# Reply to a "detach from tracing session" command: only a status (one
# of the `Status` constants).
class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
    # Known status values.
    class Status:
        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status: int):
        self._status = status

    @property
    def status(self) -> int:
        return self._status
493
494
# An LTTng live protocol codec can convert bytes to command objects and
# reply objects to bytes.
#
# All the integers are read and written in network byte order.
class _LttngLiveViewerProtocolCodec:
    # Command header: payload size, command type, and version.
    _COMMAND_HEADER_STRUCT_FMT = "QII"
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)

    def __init__(self):
        pass

    # Unpacks the fields described by `fmt` from `data`, starting at
    # `offset`, forcing network byte order.
    def _unpack(self, fmt: str, data: bytes, offset: int = 0):
        return struct.unpack_from("!" + fmt, data, offset)

    # Unpacks the fields described by `fmt` from the payload of `data`,
    # that is, from what immediately follows the command header.
    def _unpack_payload(self, fmt: str, data: bytes):
        payload_offset = _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
        return self._unpack(fmt, data, payload_offset)

    # Decodes `data` as a single command.
    #
    # Returns the corresponding command object, or `None` if `data`
    # doesn't contain a complete command yet (header or payload).
    #
    # Raises `RuntimeError` if the command type is unknown.
    def decode(self, data: bytes):
        header_size = self._COMMAND_HEADER_SIZE_BYTES

        if len(data) < header_size:
            # Not enough data to read the command header
            return None

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
                payload_size, cmd_type, version
            )
        )

        if len(data) < header_size + payload_size:
            # Not enough data to read the whole command
            return None

        # Every branch below returns: one command class per known type.
        if cmd_type == 1:
            # The last payload field is decoded, but not used.
            viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
            return _LttngLiveViewerConnectCommand(
                version, viewer_session_id, major, minor
            )

        if cmd_type == 2:
            return _LttngLiveViewerGetTracingSessionInfosCommand(version)

        if cmd_type == 3:
            tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
            return _LttngLiveViewerAttachToTracingSessionCommand(
                version, tracing_session_id, offset, seek_type
            )

        if cmd_type == 4:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                version, stream_id
            )

        if cmd_type == 5:
            stream_id, offset, req_length = self._unpack_payload("QQI", data)
            return _LttngLiveViewerGetDataStreamPacketDataCommand(
                version, stream_id, offset, req_length
            )

        if cmd_type == 6:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)

        if cmd_type == 7:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)

        if cmd_type == 8:
            return _LttngLiveViewerCreateViewerSessionCommand(version)

        if cmd_type == 9:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerDetachFromTracingSessionCommand(
                version, tracing_session_id
            )

        raise RuntimeError("Unknown command type {}".format(cmd_type))

    # Packs `args` as described by `fmt`.
    def _pack(self, fmt: str, *args: Any):
        # Force network byte order
        network_fmt = "!" + fmt
        return struct.pack(network_fmt, *args)

    # Encodes `string` and right-pads the result with null bytes up to
    # `length` bytes.
    #
    # An encoded string which is already longer than `length` bytes is
    # returned as is (it's not truncated).
    def _encode_zero_padded_str(self, string: str, length: int):
        return string.encode().ljust(length, b"\x00")

    # Encodes a single stream info: three integers followed by the
    # zero-padded path (4096 bytes) and channel name (255 bytes).
    def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
        return b"".join(
            (
                self._pack("QQI", info.id, info.trace_id, int(info.is_metadata)),
                self._encode_zero_padded_str(info.path, 4096),
                self._encode_zero_padded_str(info.channel_name, 255),
            )
        )

    # Returns the "has new stuff" flags: bit 0 is set when there's new
    # metadata and bit 1 is set when there are new data streams.
    def _get_has_new_stuff_flags(
        self, has_new_metadata: bool, has_new_data_streams: bool
    ):
        metadata_flag = 1 if has_new_metadata else 0
        data_streams_flag = 2 if has_new_data_streams else 0
        return metadata_flag | data_streams_flag

    # Encodes the reply object `reply` to bytes.
    #
    # The exact class of `reply` selects the layout (subclasses of the
    # known reply classes are not recognized).
    #
    # Raises `ValueError` if the class of `reply` is unknown.
    def encode(
        self,
        reply: _LttngLiveViewerReply,
    ) -> bytes:
        if type(reply) is _LttngLiveViewerConnectReply:
            # NOTE(review): the meaning of the trailing constant 2 isn't
            # visible here; confirm against the LTTng live protocol.
            return self._pack(
                "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
            )

        if type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
            # Count first, then one fixed-size record per session.
            chunks = [self._pack("I", len(reply.tracing_session_infos))]

            for info in reply.tracing_session_infos:
                chunks.append(
                    self._pack(
                        "QIII",
                        info.tracing_session_id,
                        info.live_timer_freq,
                        info.client_count,
                        info.stream_count,
                    )
                )
                chunks.append(self._encode_zero_padded_str(info.hostname, 64))
                chunks.append(self._encode_zero_padded_str(info.name, 255))

            return b"".join(chunks)

        if type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
            chunks = [self._pack("II", reply.status, len(reply.stream_infos))]

            for info in reply.stream_infos:
                chunks.append(self._encode_stream_info(info))

            return b"".join(chunks)

        if type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            index_format = "QQQQQQQII"
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )

            if isinstance(entry, _LttngDataStreamIndexEntry):
                entry_fields = (
                    entry.offset_bytes,
                    entry.total_size_bits,
                    entry.content_size_bits,
                    entry.timestamp_begin,
                    entry.timestamp_end,
                    entry.events_discarded,
                    entry.stream_class_id,
                )
            else:
                # Beacon: everything is zero except the end timestamp
                # and the stream class ID.
                entry_fields = (0, 0, 0, 0, entry.timestamp, 0, entry.stream_class_id)

            return self._pack(index_format, *(entry_fields + (reply.status, flags)))

        if type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            header = self._pack("III", reply.status, len(reply.data), flags)
            return header + reply.data

        if type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
            # Length comes before the status for this one.
            header = self._pack("QI", len(reply.data), reply.status)
            return header + reply.data

        if type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
            chunks = [self._pack("II", reply.status, len(reply.stream_infos))]

            for info in reply.stream_infos:
                chunks.append(self._encode_stream_info(info))

            return b"".join(chunks)

        if type(reply) is _LttngLiveViewerCreateViewerSessionReply:
            return self._pack("I", reply.status)

        if type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
            return self._pack("I", reply.status)

        raise ValueError(
            "Unknown reply object with class `{}`".format(reply.__class__.__name__)
        )
679
680
aca7de76
SM
# Returns the beginning timestamp of the index entry `entry`: the
# single timestamp of a beacon entry, or the `timestamp_begin` property
# of any other entry.
def _get_entry_timestamp_begin(
    entry: _LttngIndexEntryT,
):
    if not isinstance(entry, _LttngDataStreamBeaconIndexEntry):
        return entry.timestamp_begin

    return entry.timestamp
688
689
# The index of an LTTng data stream, a sequence of index entries.
#
# The entries are read from the index file `path` on construction. When
# `beacons` is provided (and truthy), one beacon entry per timestamp it
# contains is merged with the entries read from the file.
class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
    def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
        self._path = path
        self._build()

        if beacons:
            # All the beacons borrow the stream class ID of the first
            # entry read from the index file.
            stream_class_id = self._entries[0].stream_class_id
            beacon_entries = [
                _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
                for ts in beacons.iter(tjson.IntVal)
            ]  # type: list[_LttngDataStreamBeaconIndexEntry]
            self._add_beacons(beacon_entries)

        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    # Reads the index file and fills `self._entries` with one entry per
    # record found.
    def _build(self):
        self._entries = []  # type: list[_LttngIndexEntryT]
        assert os.path.isfile(self._path)

        # Both layouts are big-endian.
        header_struct = struct.Struct(">IIII")
        entry_struct = struct.Struct(">QQQQQQQ")

        with open(self._path, "rb") as f:
            # Read header first
            header_data = f.read(header_struct.size)
            assert len(header_data) == header_struct.size
            magic, _, _, index_entry_length = header_struct.unpack(header_data)
            assert magic == 0xC1F1DCC1

            # What a record contains beyond the fields decoded here.
            trailing_size = index_entry_length - entry_struct.size

            # Read index entries
            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, f.tell()
                    )
                )
                entry_data = f.read(entry_struct.size)

                if not entry_data:
                    # Done
                    break

                assert len(entry_data) == entry_struct.size

                # The decoded fields are in the order of the
                # constructor's parameters.
                self._entries.append(
                    _LttngDataStreamIndexEntry(*entry_struct.unpack(entry_data))
                )

                # Skip anything else before the next entry
                f.seek(trailing_size, os.SEEK_CUR)

    # Adds the beacon entries `beacons` to the entries, and then sorts
    # everything (stable sort) by timestamp: the single timestamp of a
    # beacon entry, the end timestamp of any other entry.
    def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
        # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
        def sort_key(
            entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
        ) -> int:
            is_beacon = isinstance(entry, _LttngDataStreamBeaconIndexEntry)
            return entry.timestamp if is_beacon else entry.timestamp_end

        self._entries.extend(beacons)
        self._entries.sort(key=sort_key)

    @overload
    def __getitem__(self, index: int) -> _LttngIndexEntryT:
        ...

    @overload
    def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]:  # noqa: F811
        ...

    # Returns the entry at `index`, or a list of entries if `index` is
    # a slice.
    def __getitem__(  # noqa: F811
        self, index: Union[int, slice]
    ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT],]:
        return self._entries[index]

    # Number of entries, beacons included.
    def __len__(self):
        return len(self._entries)

    # Path of the index file.
    @property
    def path(self):
        return self._path
800
801
# An LTTng data stream.
#
# The channel name is what precedes the last `_<digits>` group matched
# at the beginning of the file name. The index is expected in the
# `index` directory next to the data stream file, under the same file
# name with an `.idx` suffix.
#
# The data stream file is opened on construction and stays open.
class _LttngDataStream:
    def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
        self._path = path

        # Same as os.path.dirname() and os.path.basename()
        trace_dir, filename = os.path.split(path)
        name_match = re.match(r"(.*)_\d+", filename)

        if name_match is None:
            raise RuntimeError(
                "Unexpected data stream file name pattern: {}".format(filename)
            )

        self._channel_name = name_match.group(1)
        self._index = _LttngDataStreamIndex(
            os.path.join(trace_dir, "index", filename + ".idx"), beacons_json
        )
        assert os.path.isfile(path)
        self._file = open(path, "rb")
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    # Path of the data stream file.
    @property
    def path(self):
        return self._path

    # Channel name extracted from the file name.
    @property
    def channel_name(self):
        return self._channel_name

    # Index of this data stream.
    @property
    def index(self):
        return self._index

    # Returns up to `len_bytes` bytes of the data stream file, starting
    # at the offset `offset_bytes`.
    def get_data(self, offset_bytes: int, len_bytes: int):
        data_file = self._file
        data_file.seek(offset_bytes)
        return data_file.read(len_bytes)
840
841
78169723 842class _LttngMetadataStreamSection:
aca7de76 843 def __init__(self, timestamp: int, data: Optional[bytes]):
78169723
FD
844 self._timestamp = timestamp
845 if data is None:
846 self._data = bytes()
847 else:
848 self._data = data
849 logging.info(
f5567ea8 850 "Built metadata stream section: ts={}, data-len={}".format(
78169723
FD
851 self._timestamp, len(self._data)
852 )
853 )
854
855 @property
856 def timestamp(self):
857 return self._timestamp
858
859 @property
860 def data(self):
861 return self._data
862
863
584af91e
PP
# An LTTng metadata stream: the path of a metadata file along with the
# sections of its content.
class _LttngMetadataStream:
    def __init__(
        self,
        metadata_file_path: str,
        config_sections: Sequence[_LttngMetadataStreamSection],
    ):
        self._path = metadata_file_path

        # Kept as is (no copy is made).
        self._sections = config_sections
        logging.info(
            "Built metadata stream: path={}, section-len={}".format(
                self._path, len(self._sections)
            )
        )

    # Path of the metadata file.
    @property
    def path(self) -> str:
        return self._path

    # Sections of this metadata stream.
    @property
    def sections(self) -> Sequence[_LttngMetadataStreamSection]:
        return self._sections
584af91e 886
78169723 887
aca7de76
SM
# A metadata section as described by the configuration: the line
# (within the metadata file) and timestamp at which the section begins,
# and whether or not the section is an empty one.
class LttngMetadataConfigSection:
    def __init__(self, line: int, timestamp: int, is_empty: bool):
        self._line, self._timestamp = line, timestamp
        self._is_empty = is_empty

    @property
    def line(self) -> int:
        return self._line

    @property
    def timestamp(self) -> int:
        return self._timestamp

    @property
    def is_empty(self) -> bool:
        return self._is_empty
905
906
# Converts the JSON array `metadata_sections_json` to a list of
# `LttngMetadataConfigSection` objects.
#
# Each element of the array is either:
#
# * The string `empty`: an empty section is to be added right before
#   the next object element, using the line and timestamp of the
#   latter.
#
# * An object having the integer properties `line` and `timestamp`.
#
# Raises `ValueError` for any other string and `TypeError` for any other
# type of element.
def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
    parsed_sections = []  # type: list[LttngMetadataConfigSection]
    pending_empty_section = False
    prev_timestamp = 0
    prev_line = 0

    for item in metadata_sections_json:
        if isinstance(item, tjson.StrVal):
            if item.val != "empty":
                raise ValueError("Invalid string value at {}.".format(item.path))

            # Found an empty section marker. Actually append the section
            # at the timestamp of the next concrete section.
            pending_empty_section = True
            continue

        if not isinstance(item, tjson.ObjVal):
            raise TypeError(
                "`{}`: expecting a string or object value".format(item.path)
            )

        line = item.at("line", tjson.IntVal).val
        ts = item.at("timestamp", tjson.IntVal).val

        # Sections' timestamps and lines must both be increasing.
        assert ts > prev_timestamp
        prev_timestamp = ts

        assert line > prev_line
        prev_line = line

        if pending_empty_section:
            parsed_sections.append(LttngMetadataConfigSection(line, ts, True))
            pending_empty_section = False

        parsed_sections.append(LttngMetadataConfigSection(line, ts, False))

    return parsed_sections
78169723 943
78169723 944
aca7de76
SM
# Splits the content of the metadata file `metadata_file_path` into
# metadata stream sections, as configured by `metadata_sections_json`
# (see _parse_metadata_sections_config()).
#
# A configured section starts at its line (one-based) and runs until
# the line before the beginning of the next configured section; the
# last one runs until the end of the file. Each empty configured
# section becomes a section without any data.
#
# Returns the list of resulting `_LttngMetadataStreamSection` objects.
#
# Raises `IndexError` if the configuration contains no section at all.
def _split_metadata_sections(
    metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
):
    metadata_sections = _parse_metadata_sections_config(metadata_sections_json)

    sections = []  # type: list[_LttngMetadataStreamSection]

    # Decode explicitly as UTF-8: the lines are encoded back to UTF-8
    # below, so relying on the locale's default encoding would corrupt
    # (or fail to read) non-ASCII metadata on a non-UTF-8 locale.
    with open(metadata_file_path, "r", encoding="utf8") as metadata_file:
        metadata_lines = metadata_file.readlines()

    metadata_section_idx = 0
    curr_metadata_section = bytearray()

    for idx, line_content in enumerate(metadata_lines):
        # Add one to the index to convert from the zero-indexing of the
        # enumerate() function to the one-indexing used by humans when
        # viewing a text file.
        curr_line_number = idx + 1

        # If there's a next section and it begins at the current line,
        # create a section with the metadata we gathered so far.
        if (
            metadata_section_idx + 1 < len(metadata_sections)
            and curr_line_number >= metadata_sections[metadata_section_idx + 1].line
        ):
            # Flushing the metadata of the current section.
            sections.append(
                _LttngMetadataStreamSection(
                    metadata_sections[metadata_section_idx].timestamp,
                    bytes(curr_metadata_section),
                )
            )

            # Move to the next section and clear old content.
            metadata_section_idx += 1
            curr_metadata_section.clear()

            # Append any empty sections.
            while metadata_sections[metadata_section_idx].is_empty:
                sections.append(
                    _LttngMetadataStreamSection(
                        metadata_sections[metadata_section_idx].timestamp, None
                    )
                )
                metadata_section_idx += 1

        # In all cases, the current line belongs to the section being
        # gathered (possibly a fresh one).
        curr_metadata_section += bytearray(line_content, "utf8")

    # We iterated over all the lines of the metadata file. Close the current section.
    sections.append(
        _LttngMetadataStreamSection(
            metadata_sections[metadata_section_idx].timestamp,
            bytes(curr_metadata_section),
        )
    )

    return sections
584af91e
PP
1009
1010
aca7de76
SM
# Mapping of strings to iterables of integers.
#
# NOTE(review): presumably data stream file names to beacon timestamps;
# not referenced in the visible part of this file, so confirm (or
# remove if unused).
_StreamBeaconsT = Dict[str, Iterable[int]]
1012
1013
584af91e 1014# An LTTng trace, a sequence of LTTng data streams.
aca7de76
SM
1015class LttngTrace(Sequence[_LttngDataStream]):
1016 def __init__(
1017 self,
1018 trace_dir: str,
1019 metadata_sections_json: Optional[tjson.ArrayVal],
1020 beacons_json: Optional[tjson.ObjVal],
1021 ):
584af91e
PP
1022 assert os.path.isdir(trace_dir)
1023 self._path = trace_dir
aca7de76
SM
1024 self._create_metadata_stream(trace_dir, metadata_sections_json)
1025 self._create_data_streams(trace_dir, beacons_json)
584af91e
PP
1026 logging.info('Built trace: path="{}"'.format(trace_dir))
1027
aca7de76
SM
1028 def _create_data_streams(
1029 self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
1030 ):
1031 data_stream_paths = [] # type: list[str]
584af91e
PP
1032
1033 for filename in os.listdir(trace_dir):
1034 path = os.path.join(trace_dir, filename)
1035
1036 if not os.path.isfile(path):
1037 continue
1038
f5567ea8 1039 if filename.startswith("."):
584af91e
PP
1040 continue
1041
f5567ea8 1042 if filename == "metadata":
584af91e
PP
1043 continue
1044
1045 data_stream_paths.append(path)
1046
1047 data_stream_paths.sort()
aca7de76 1048 self._data_streams = [] # type: list[_LttngDataStream]
584af91e
PP
1049
1050 for data_stream_path in data_stream_paths:
71f56e5f 1051 stream_name = os.path.basename(data_stream_path)
aca7de76
SM
1052 this_beacons_json = None
1053 if beacons_json is not None and stream_name in beacons_json:
1054 this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
71f56e5f
JG
1055
1056 self._data_streams.append(
aca7de76 1057 _LttngDataStream(data_stream_path, this_beacons_json)
71f56e5f 1058 )
584af91e 1059
aca7de76
SM
1060 def _create_metadata_stream(
1061 self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
1062 ):
f5567ea8 1063 metadata_path = os.path.join(trace_dir, "metadata")
aca7de76 1064 metadata_sections = [] # type: list[_LttngMetadataStreamSection]
78169723 1065
aca7de76 1066 if metadata_sections_json is None:
f5567ea8 1067 with open(metadata_path, "rb") as metadata_file:
78169723
FD
1068 metadata_sections.append(
1069 _LttngMetadataStreamSection(0, metadata_file.read())
1070 )
1071 else:
1072 metadata_sections = _split_metadata_sections(
aca7de76 1073 metadata_path, metadata_sections_json
78169723
FD
1074 )
1075
1076 self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
1077
584af91e
PP
1078 @property
1079 def path(self):
1080 return self._path
1081
1082 @property
1083 def metadata_stream(self):
1084 return self._metadata_stream
1085
aca7de76
SM
# Typing overload: an integer index selects a single data stream.
@overload
def __getitem__(self, index: int) -> _LttngDataStream:
    ...

# Typing overload: a slice selects a sequence of data streams.
@overload
def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]:  # noqa: F811
    ...

def __getitem__(  # noqa: F811
    self, index: Union[int, slice]
) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
    # Delegates the subscript to the `_data_streams` list.
    data_streams = self._data_streams
    return data_streams[index]
1098
def __len__(self):
    # Number of entries in the `_data_streams` list.
    data_streams = self._data_streams
    return len(data_streams)
1101
1102
1103# The state of a single data stream.
class _LttngLiveViewerSessionDataStreamState:
    # Keeps, for one data stream, a cursor into the data stream's index
    # along with the stream info, the owning tracing session state, and
    # the ID of the related metadata stream.
    def __init__(
        self,
        ts_state: "_LttngLiveViewerSessionTracingSessionState",
        info: _LttngLiveViewerStreamInfo,
        data_stream: _LttngDataStream,
        metadata_stream_id: int,
    ):
        self._ts_state = ts_state
        self._info = info
        self._data_stream = data_stream
        self._metadata_stream_id = metadata_stream_id

        # The cursor starts on the first index entry.
        self._cur_index_entry_index = 0

        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id,
                ts_info.tracing_session_id,
                ts_info.name,
                data_stream.path,
            )
        )

    @property
    def tracing_session_state(self):
        # Tracing session state given at construction time.
        return self._ts_state

    @property
    def info(self):
        # Stream info given at construction time.
        return self._info

    @property
    def data_stream(self):
        # Data stream given at construction time.
        return self._data_stream

    @property
    def cur_index_entry(self):
        # Index entry under the cursor, or `None` once the cursor sits
        # exactly one past the last entry.
        index = self._data_stream.index
        position = self._cur_index_entry_index

        if position != len(index):
            return index[position]

        return None

    @property
    def metadata_stream_id(self):
        # Metadata stream ID given at construction time.
        return self._metadata_stream_id

    def goto_next_index_entry(self):
        # Moves the cursor one index entry forward (no bounds check).
        self._cur_index_entry_index = self._cur_index_entry_index + 1
1152
1153
1154# The state of a single metadata stream.
class _LttngLiveViewerSessionMetadataStreamState:
    # Keeps, for one metadata stream, the index of the current section,
    # the timestamp used to decide when to move to another section, and
    # whether the current section was already sent.
    def __init__(
        self,
        ts_state: "_LttngLiveViewerSessionTracingSessionState",
        info: _LttngLiveViewerStreamInfo,
        metadata_stream: _LttngMetadataStream,
    ):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream
        self._cur_metadata_stream_section_index = 0

        # With at least two sections, the second section's timestamp is
        # the initial "next section" timestamp; otherwise there is none.
        sections = metadata_stream.sections
        self._next_metadata_stream_section_timestamp = (
            sections[1].timestamp if len(sections) > 1 else None
        )

        self._is_sent = False
        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id,
                ts_info.tracing_session_id,
                ts_info.name,
                metadata_stream.path,
            )
        )

    @property
    def info(self):
        # Stream info given at construction time.
        return self._info

    @property
    def metadata_stream(self):
        # Metadata stream given at construction time.
        return self._metadata_stream

    @property
    def is_sent(self):
        # Whether the current section is flagged as already sent.
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value: bool):
        self._is_sent = value

    @property
    def cur_section(self):
        # Current section, or `None` once the section index is exactly
        # one past the last section. Every access logs the index.
        section_index = self._cur_metadata_stream_section_index
        logging.info(
            "Get current metadata section: section-idx={}".format(section_index)
        )
        sections = self._metadata_stream.sections

        if section_index != len(sections):
            return sections[section_index]

        return None

    def goto_next_section(self):
        # Advances the section index, then refreshes the "next section"
        # timestamp from the section which is now current (`None` when
        # there is no such section).
        #
        # `cur_section` is deliberately read twice when it is truthy:
        # each read logs, and this keeps the log output unchanged.
        self._cur_metadata_stream_section_index += 1
        self._next_metadata_stream_section_timestamp = (
            self.cur_section.timestamp if self.cur_section else None
        )

    @property
    def next_section_timestamp(self):
        # Timestamp compared by needs_new_metadata_section(), or `None`.
        return self._next_metadata_stream_section_timestamp
1221
584af91e 1222
ee1171e5
SM
1223# A tracing session descriptor.
1224#
1225# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1226# objects).
class LttngTracingSessionDescriptor:
    # Describes one tracing session: its traces and the tracing session
    # info object built from the other constructor arguments.
    #
    # `traces` may be any iterable of `LttngTrace` objects, including a
    # one-shot iterator: the constructor copies it into a list so that
    # the validation below, the stream count computation, and every
    # later iteration of the `traces` property all see the same traces.
    #
    # Raises `ValueError` if `name` is not a substring of the path of
    # one of the traces.
    def __init__(
        self,
        name: str,
        tracing_session_id: int,
        hostname: str,
        live_timer_freq: int,
        client_count: int,
        traces: Iterable[LttngTrace],
    ):
        # Materialize once: `traces` is iterated more than once here and
        # again by the users of the `traces` property.
        trace_list = list(traces)

        for trace in trace_list:
            if name not in trace.path:
                fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
                raise ValueError(fmt.format(name, trace.path))

        self._traces = trace_list

        # Each trace contributes its data streams (its length) plus one
        # metadata stream.
        stream_count = sum(len(trace) + 1 for trace in trace_list)
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            stream_count,
            hostname,
            name,
        )

    @property
    def traces(self):
        # List of the traces of this tracing session.
        return self._traces

    @property
    def info(self):
        # Tracing session info object built by the constructor.
        return self._info
1260
1261
584af91e
PP
1262# The state of a tracing session.
class _LttngLiveViewerSessionTracingSessionState:
    # Builds and holds, for one tracing session descriptor, the stream
    # infos and the per-stream states (metadata and data) of all its
    # traces, plus the "viewer is attached" flag.
    #
    # Stream IDs are assigned sequentially starting at `base_stream_id`:
    # for each trace, first its metadata stream, then its data streams.
    def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
        self._tc_descr = tc_descr
        self._stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
        self._ds_states = {}  # type: dict[int, _LttngLiveViewerSessionDataStreamState]
        self._ms_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # The trace ID derives from the ID which the trace's
            # metadata stream is about to get.
            metadata_stream_id = next_stream_id
            trace_id = metadata_stream_id * 1000
            next_stream_id += 1

            # Metadata stream -> stream info and metadata stream state
            metadata_info = _LttngLiveViewerStreamInfo(
                metadata_stream_id,
                trace_id,
                True,
                trace.metadata_stream.path,
                "metadata",
            )
            self._stream_infos.append(metadata_info)
            self._ms_states[
                metadata_stream_id
            ] = _LttngLiveViewerSessionMetadataStreamState(
                self, metadata_info, trace.metadata_stream
            )

            # Data streams -> stream infos and data stream states
            for data_stream in trace:
                data_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(data_info)
                self._ds_states[next_stream_id] = _LttngLiveViewerSessionDataStreamState(
                    self, data_info, data_stream, metadata_stream_id
                )
                next_stream_id += 1

        self._is_attached = False
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                tc_descr.info.tracing_session_id, tc_descr.info.name
            )
        )

    @property
    def tracing_session_descriptor(self):
        # Descriptor given at construction time.
        return self._tc_descr

    @property
    def data_stream_states(self):
        # Data stream states, keyed by stream ID.
        return self._ds_states

    @property
    def metadata_stream_states(self):
        # Metadata stream states, keyed by stream ID.
        return self._ms_states

    @property
    def stream_infos(self):
        # Stream infos of all the streams, in creation order.
        return self._stream_infos

    @property
    def has_new_metadata(self):
        # True if at least one metadata stream state is not flagged as
        # sent.
        return not all(ms_state.is_sent for ms_state in self._ms_states.values())

    @property
    def is_attached(self):
        # Whether a viewer is currently attached to this tracing session.
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value: bool):
        self._is_attached = value
1333
1334
aca7de76
SM
def needs_new_metadata_section(
    metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
    latest_timestamp: int,
):
    # Returns whether `latest_timestamp` reached the "next section"
    # timestamp of `metadata_stream_state`.
    #
    # Always false when the state has no such timestamp (`None`).
    next_timestamp = metadata_stream_state.next_section_timestamp
    return next_timestamp is not None and latest_timestamp >= next_timestamp
1346
1347
584af91e
PP
1348# An LTTng live viewer session manages a view on tracing sessions
1349# and replies to commands accordingly.
class _LttngLiveViewerSession:
    # Holds the state of one viewer session: one tracing session state
    # per served tracing session descriptor, and a flat mapping from
    # stream ID to stream state (data or metadata) over all of them.
    #
    # handle_command() dispatches a decoded viewer command to the
    # matching `_handle_*_command()` method and returns its reply.
    #
    # `max_query_data_response_size`, when set to a non-zero value, caps
    # the length of the data returned by a single "get data stream
    # packet data" reply.
    def __init__(
        self,
        viewer_session_id: int,
        tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
        max_query_data_response_size: Optional[int],
    ):
        self._viewer_session_id = viewer_session_id
        self._ts_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
        self._stream_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
        self._max_query_data_response_size = max_query_data_response_size

        # Base stream ID of the next tracing session state: the number
        # of stream infos created so far.
        total_stream_infos = 0

        for ts_descr in tracing_session_descriptors:
            ts_state = _LttngLiveViewerSessionTracingSessionState(
                ts_descr, total_stream_infos
            )
            ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
            self._ts_states[ts_id] = ts_state
            total_stream_infos += len(ts_state.stream_infos)

            # Update session's stream states to have the new states
            self._stream_states.update(ts_state.data_stream_states)
            self._stream_states.update(ts_state.metadata_stream_states)

        # Command class -> handler method
        self._command_handlers = {
            _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
            _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
            _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
            _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
            _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
            _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
            _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
            _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
        }  # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]

    @property
    def viewer_session_id(self):
        # Viewer session ID given at construction time.
        return self._viewer_session_id

    def _get_tracing_session_state(self, tracing_session_id: int):
        # Returns the state of the tracing session having the ID
        # `tracing_session_id`, raising `RuntimeError` if there's none.
        if tracing_session_id not in self._ts_states:
            raise RuntimeError(
                "Unknown tracing session ID {}".format(tracing_session_id)
            )

        return self._ts_states[tracing_session_id]

    def _get_data_stream_state(self, stream_id: int):
        # Returns the data stream state having the ID `stream_id`.
        #
        # Raises `RuntimeError` if the ID is unknown or if it designates
        # a stream which isn't a data stream.
        if stream_id not in self._stream_states:
            raise RuntimeError("Unknown stream ID {}".format(stream_id))

        stream = self._stream_states[stream_id]
        if not isinstance(stream, _LttngLiveViewerSessionDataStreamState):
            raise RuntimeError("Stream is not a data stream")

        return stream

    def _get_metadata_stream_state(self, stream_id: int):
        # Returns the metadata stream state having the ID `stream_id`.
        #
        # Raises `RuntimeError` if the ID is unknown or if it designates
        # a stream which isn't a metadata stream.
        if stream_id not in self._stream_states:
            raise RuntimeError("Unknown stream ID {}".format(stream_id))

        stream = self._stream_states[stream_id]
        if not isinstance(stream, _LttngLiveViewerSessionMetadataStreamState):
            raise RuntimeError("Stream is not a metadata stream")

        return stream

    def handle_command(self, cmd: _LttngLiveViewerCommand):
        # Handles the command `cmd` and returns the reply to send.
        #
        # Raises `RuntimeError` if there's no handler for the exact type
        # of `cmd`.
        logging.info(
            "Handling command in viewer session: cmd-cls-name={}".format(
                cmd.__class__.__name__
            )
        )
        cmd_type = type(cmd)

        if cmd_type not in self._command_handlers:
            raise RuntimeError(
                "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
            )

        return self._command_handlers[cmd_type](cmd)

    def _handle_attach_to_tracing_session_command(
        self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
    ):
        # Marks the tracing session as attached and replies with all its
        # stream infos; raises `RuntimeError` if already attached.
        fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
        logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if ts_state.is_attached:
            raise RuntimeError(
                "Cannot attach to tracing session `{}`: viewer is already attached".format(
                    info.name
                )
            )

        ts_state.is_attached = True
        status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
        return _LttngLiveViewerAttachToTracingSessionReply(
            status, ts_state.stream_infos
        )

    def _handle_detach_from_tracing_session_command(
        self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
    ):
        # Marks the tracing session as detached; raises `RuntimeError`
        # if it isn't currently attached.
        fmt = 'Handling "detach from tracing session" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if not ts_state.is_attached:
            raise RuntimeError(
                "Cannot detach to tracing session `{}`: viewer is not attached".format(
                    info.name
                )
            )

        ts_state.is_attached = False
        status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
        return _LttngLiveViewerDetachFromTracingSessionReply(status)

    def _handle_get_next_data_stream_index_entry_command(
        self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
    ):
        # Replies with the current index entry of the data stream, then
        # moves the data stream state to its next index entry.
        #
        # Before replying, switches the related metadata stream state to
        # its next section if the entry's beginning timestamp calls for
        # it.
        fmt = 'Handling "get next data stream index entry" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        stream_state = self._get_data_stream_state(cmd.stream_id)
        metadata_stream_state = self._get_metadata_stream_state(
            stream_state.metadata_stream_id
        )
        cur_index_entry = stream_state.cur_index_entry

        if cur_index_entry is None:
            # The viewer is done reading this stream
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP

            # Dummy data stream index entry to use with the `HUP` status
            # (the reply needs one, but the viewer ignores it)
            index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)

            return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
                status, index_entry, False, False
            )

        timestamp_begin = _get_entry_timestamp_begin(cur_index_entry)

        if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        # The viewer only checks the `has_new_metadata` flag if the
        # reply's status is `OK`, so we need to provide an index here
        has_new_metadata = stream_state.tracing_session_state.has_new_metadata

        # Any entry which isn't a plain data stream index entry yields
        # the `INACTIVE` status.
        if isinstance(cur_index_entry, _LttngDataStreamIndexEntry):
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
        else:
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE

        reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
            status, cur_index_entry, has_new_metadata, False
        )
        stream_state.goto_next_index_entry()
        return reply

    def _handle_get_data_stream_packet_data_command(
        self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
    ):
        # Replies with data of the data stream starting at `cmd.offset`,
        # or with the `ERROR` status and the "new metadata" flag set if
        # the tracing session has metadata which wasn't sent yet.
        fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
        logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
        stream_state = self._get_data_stream_state(cmd.stream_id)
        data_response_length = cmd.req_length

        if stream_state.tracing_session_state.has_new_metadata:
            status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
            return _LttngLiveViewerGetDataStreamPacketDataReply(
                status, bytes(), True, False
            )

        if self._max_query_data_response_size:
            # Enforce a server side limit on the query requested length.
            # To ensure that the transaction terminates, take the
            # minimum of both values.
            data_response_length = min(
                cmd.req_length, self._max_query_data_response_size
            )
            fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
            logging.info(fmt.format(cmd.req_length, data_response_length))

        data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
        status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
        return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)

    def _handle_get_metadata_stream_data_command(
        self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
    ):
        # Replies with the data of the current metadata section, or with
        # the `NO_NEW` status if that section was already sent.
        fmt = 'Handling "get metadata stream data" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)

        if metadata_stream_state.is_sent:
            status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
            return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())

        metadata_stream_state.is_sent = True
        status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
        metadata_section = metadata_stream_state.cur_section
        assert metadata_section is not None

        # If we are sending an empty section, ready the next one right away.
        if len(metadata_section.data) == 0:
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
        logging.info(fmt.format(len(metadata_section.data)))
        return _LttngLiveViewerGetMetadataStreamDataContentReply(
            status, metadata_section.data
        )

    def _handle_get_new_stream_infos_command(
        self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
    ):
        fmt = 'Handling "get new stream infos" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))

        # As of this version, all the tracing session's stream infos are
        # always given to the viewer when sending the "attach to tracing
        # session" reply, so there's nothing new here. Return the `HUP`
        # status as, if we're handling this command, the viewer consumed
        # all the existing data streams.
        status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
        return _LttngLiveViewerGetNewStreamInfosReply(status, [])

    def _handle_get_tracing_session_infos_command(
        self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
    ):
        # Replies with the infos of all the tracing sessions, sorted by
        # tracing session name.
        logging.info('Handling "get tracing session infos" command.')
        infos = [
            tss.tracing_session_descriptor.info for tss in self._ts_states.values()
        ]
        infos.sort(key=lambda info: info.name)
        return _LttngLiveViewerGetTracingSessionInfosReply(infos)

    def _handle_create_viewer_session_command(
        self, cmd: _LttngLiveViewerCreateViewerSessionCommand
    ):
        logging.info('Handling "create viewer session" command.')
        status = _LttngLiveViewerCreateViewerSessionReply.Status.OK

        # This does nothing here. In the LTTng relay daemon, it
        # allocates the viewer session's state.
        return _LttngLiveViewerCreateViewerSessionReply(status)
1607
1608
1609# An LTTng live TCP server.
1610#
e51141d3
SM
# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
# on an OS-assigned TCP port otherwise. If `port_filename` is not `None`, it
# writes the decimal TCP port number to a temporary port file, and then renames
# the temporary port file to `port_filename`.
584af91e
PP
1615#
1616# `tracing_session_descriptors` is a list of tracing session descriptors
1617# (`LttngTracingSessionDescriptor`) to serve.
1618#
1619# This server accepts a single viewer (client).
1620#
1621# When the viewer closes the connection, the server's constructor
1622# returns.
class LttngLiveServer:
    # Serves `tracing_session_descriptors` to a single viewer over TCP.
    #
    # The constructor does all the work: it binds to `localhost` (on
    # `port`, or on an OS-assigned port if `port` is `None`), writes the
    # port number to `port_filename` if not `None`, accepts one viewer,
    # handles its commands, and returns once the viewer closes the
    # connection. The listening socket is closed on return, whether or
    # not an exception occurred.
    #
    # `max_query_data_response_size` is passed as is to the viewer
    # session.
    def __init__(
        self,
        port: Optional[int],
        port_filename: Optional[str],
        tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
        max_query_data_response_size: Optional[int],
    ):
        # The descriptors are iterated below for logging and again when
        # the viewer connects: keep a list so that a one-shot iterable
        # works too.
        ts_descriptors = list(tracing_session_descriptors)

        logging.info("Server configuration:")

        logging.info(" Port file name: `{}`".format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                " Maximum response data query size: `{}`".format(
                    max_query_data_response_size
                )
            )

        for ts_descr in ts_descriptors:
            info = ts_descr.info
            fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info(' Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = ts_descriptors
        self._max_query_data_response_size = max_query_data_response_size
        self._codec = _LttngLiveViewerProtocolCodec()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Everything which can fail once the socket exists runs within
        # the `try` block so that the socket is always closed.
        try:
            # Port 0: OS assigns an unused port
            serv_addr = ("localhost", port if port is not None else 0)
            self._sock.bind(serv_addr)

            if port_filename is not None:
                self._write_port_to_file(port_filename)

            print("Listening on port {}".format(self._server_port))
            self._listen()
        finally:
            self._sock.close()
            logging.info("Closed connection and socket.")

    @property
    def _server_port(self):
        # TCP port to which the listening socket is bound.
        return self._sock.getsockname()[1]

    def _recv_command(self):
        # Reads from the connection until the codec decodes a complete
        # command, and returns it.
        #
        # Returns `None` if the viewer closes the connection before
        # sending any byte of a new command. Raises `RuntimeError` if
        # the viewer closes the connection in the middle of a command,
        # or if the received bytes can't be decoded.
        data = bytes()

        while True:
            logging.info("Waiting for viewer command.")
            buf = self._conn.recv(128)

            if not buf:
                logging.info("Client closed connection.")

                if data:
                    raise RuntimeError(
                        "Client closed connection after having sent {} command bytes.".format(
                            len(data)
                        )
                    )

                return

            logging.info("Received data from viewer: length={}".format(len(buf)))

            data += buf

            try:
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise RuntimeError("Malformed command: {}".format(exc)) from exc

            if cmd is not None:
                logging.info(
                    "Received command from viewer: cmd-cls-name={}".format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    def _send_reply(self, reply: _LttngLiveViewerReply):
        # Encodes `reply` and sends all of its bytes to the viewer.
        data = self._codec.encode(reply)
        logging.info(
            "Sending reply to viewer: reply-cls-name={}, length={}".format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    def _handle_connection(self):
        # Handles the whole conversation with the connected viewer:
        # the initial "connect" exchange, then every following command
        # until the viewer closes the connection.
        #
        # Raises `RuntimeError` if the first command isn't "connect".

        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise RuntimeError(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    def _listen(self):
        # Accepts a single viewer connection, handles it, and closes it.
        logging.info("Listening: port={}".format(self._server_port))
        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    def _write_port_to_file(self, port_filename: str):
        # Writes the decimal server port to `port_filename` so that the
        # file only ever appears with its complete content.
        #
        # The temporary file lives in the destination directory: the
        # final os.replace() may fail if the source and the destination
        # are on different file systems, which can happen with the
        # default temporary directory.
        port_dir = os.path.dirname(os.path.abspath(port_filename))

        # Write the port number to a temporary file.
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, dir=port_dir
        ) as tmp_port_file:
            print(self._server_port, end="", file=tmp_port_file)

        # Rename temporary file to real file
        os.replace(tmp_port_file.name, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1789 )
1790
1791
aca7de76
SM
def _session_descriptors_from_path(
    sessions_filename: str, trace_path_prefix: Optional[str]
):
    # Loads the JSON file `sessions_filename` and returns the list of
    # tracing session descriptors which it describes.
    #
    # A relative trace path is joined to `trace_path_prefix` when the
    # latter is set (and not empty).
    #
    # File format is:
    #
    #     [
    #         {
    #             "name": "my-session",
    #             "id": 17,
    #             "hostname": "myhost",
    #             "live-timer-freq": 1000000,
    #             "client-count": 23,
    #             "traces": [
    #                 {
    #                     "path": "lol"
    #                 },
    #                 {
    #                     "path": "meow/mix",
    #                     "beacons": {
    #                         "my_stream": [ 5235787, 728375283 ]
    #                     },
    #                     "metadata-sections": [
    #                         {
    #                             "line": 1,
    #                             "timestamp": 0
    #                         }
    #                     ]
    #                 }
    #             ]
    #         }
    #     ]
    with open(sessions_filename, "r") as sessions_file:
        root_json = tjson.load(sessions_file, tjson.ArrayVal)

    descriptors = []  # type: list[LttngTracingSessionDescriptor]

    for session_json in root_json.iter(tjson.ObjVal):
        # Mandatory session properties
        name = session_json.at("name", tjson.StrVal).val
        tracing_session_id = session_json.at("id", tjson.IntVal).val
        hostname = session_json.at("hostname", tjson.StrVal).val
        live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
        client_count = session_json.at("client-count", tjson.IntVal).val
        traces_json = session_json.at("traces", tjson.ArrayVal)

        traces = []  # type: list[LttngTrace]

        for trace_json in traces_json.iter(tjson.ObjVal):
            # Optional trace properties
            metadata_sections = None
            if "metadata-sections" in trace_json:
                metadata_sections = trace_json.at("metadata-sections", tjson.ArrayVal)

            beacons = None
            if "beacons" in trace_json:
                beacons = trace_json.at("beacons", tjson.ObjVal)

            # Mandatory trace path
            path = trace_json.at("path", tjson.StrVal).val

            if trace_path_prefix and not os.path.isabs(path):
                path = os.path.join(trace_path_prefix, path)

            traces.append(LttngTrace(path, metadata_sections, beacons))

        descriptor = LttngTracingSessionDescriptor(
            name,
            tracing_session_id,
            hostname,
            live_timer_freq,
            client_count,
            traces,
        )
        descriptors.append(descriptor)

    return descriptors
584af91e
PP
1874
1875
def _loglevel_parser(string: str):
    # Converts the log level name `string` (`info` or `warning`) to the
    # corresponding `logging` level constant.
    #
    # Raises `argparse.ArgumentTypeError` for any other name.
    try:
        return {"info": logging.INFO, "warning": logging.WARNING}[string]
    except KeyError:
        raise argparse.ArgumentTypeError(
            "{} is not a valid loglevel".format(string)
        ) from None
1882
1883
f5567ea8
FD
if __name__ == "__main__":
    logging.basicConfig(format="# %(asctime)-25s%(message)s")
    parser = argparse.ArgumentParser(
        description="LTTng-live protocol mocker", add_help=False
    )

    # First pass: only `--log-level` is known, so that the logging level
    # is effective before the remaining arguments are parsed. Help is
    # disabled for now so that `-h` is left for the second pass.
    parser.add_argument(
        "--log-level",
        default="warning",
        choices=["info", "warning"],
        help="The loglevel to be used.",
    )

    early_args, pending_argv = parser.parse_known_args()
    logging.getLogger().setLevel(_loglevel_parser(early_args.log_level))

    # Second pass: declare everything else, then parse what the first
    # pass did not consume.
    parser.add_argument(
        "--port",
        help="The port to bind to. If missing, use an OS-assigned port..",
        type=int,
    )
    parser.add_argument(
        "--port-filename",
        help="The final port file. This file is present when the server is ready to receive connection.",
    )
    parser.add_argument(
        "--max-query-data-response-size",
        type=int,
        help="The maximum size of control data response in bytes",
    )
    parser.add_argument(
        "--trace-path-prefix",
        type=str,
        help="Prefix to prepend to the trace paths of session configurations",
    )
    parser.add_argument(
        "sessions_filename",
        type=str,
        help="Path to a session configuration file",
        metavar="sessions-filename",
    )
    parser.add_argument(
        "-h",
        "--help",
        action="help",
        default=argparse.SUPPRESS,
        help="Show this help message and exit.",
    )

    cli_args = parser.parse_args(args=pending_argv)

    # Typed copies of the parsed values (the namespace attributes are
    # untyped).
    sessions_filename = cli_args.sessions_filename  # type: str
    trace_path_prefix = cli_args.trace_path_prefix  # type: str | None
    port = cli_args.port  # type: int | None
    port_filename = cli_args.port_filename  # type: str | None
    max_query_data_response_size = (
        cli_args.max_query_data_response_size
    )  # type: int | None

    sessions = _session_descriptors_from_path(sessions_filename, trace_path_prefix)

    # The constructor runs the server until the viewer disconnects.
    LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
This page took 0.132678 seconds and 4 git commands to generate.