tests: add typing annotations to lttng_live_server.py
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
1 # SPDX-License-Identifier: MIT
2 #
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
4 #
5
6 # pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
7
8 import os
9 import re
10 import sys
11 import socket
12 import struct
13 import logging
14 import os.path
15 import argparse
16 import tempfile
17 from typing import Dict, Union, Iterable, Optional, Sequence, overload
18
19 import tjson
20
21 # isort: off
22 from typing import Any, Callable # noqa: F401
23
24 # isort: on
25
26
27 class UnexpectedInput(RuntimeError):
28 pass
29
30
31 # An entry within the index of an LTTng data stream.
32 class _LttngDataStreamIndexEntry:
33 def __init__(
34 self,
35 offset_bytes: int,
36 total_size_bits: int,
37 content_size_bits: int,
38 timestamp_begin: int,
39 timestamp_end: int,
40 events_discarded: int,
41 stream_class_id: int,
42 ):
43 self._offset_bytes = offset_bytes
44 self._total_size_bits = total_size_bits
45 self._content_size_bits = content_size_bits
46 self._timestamp_begin = timestamp_begin
47 self._timestamp_end = timestamp_end
48 self._events_discarded = events_discarded
49 self._stream_class_id = stream_class_id
50
51 @property
52 def offset_bytes(self):
53 return self._offset_bytes
54
55 @property
56 def total_size_bits(self):
57 return self._total_size_bits
58
59 @property
60 def total_size_bytes(self):
61 return self._total_size_bits // 8
62
63 @property
64 def content_size_bits(self):
65 return self._content_size_bits
66
67 @property
68 def content_size_bytes(self):
69 return self._content_size_bits // 8
70
71 @property
72 def timestamp_begin(self):
73 return self._timestamp_begin
74
75 @property
76 def timestamp_end(self):
77 return self._timestamp_end
78
79 @property
80 def events_discarded(self):
81 return self._events_discarded
82
83 @property
84 def stream_class_id(self):
85 return self._stream_class_id
86
87
88 # An entry within the index of an LTTng data stream. While a stream beacon entry
89 # is conceptually unrelated to an index, it is sent as a reply to a
90 # LttngLiveViewerGetNextDataStreamIndexEntryCommand
91 class _LttngDataStreamBeaconIndexEntry:
92 def __init__(self, stream_class_id: int, timestamp: int):
93 self._stream_class_id = stream_class_id
94 self._timestamp = timestamp
95
96 @property
97 def timestamp(self):
98 return self._timestamp
99
100 @property
101 def stream_class_id(self):
102 return self._stream_class_id
103
104
105 _LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
106
107
108 class _LttngLiveViewerCommand:
109 def __init__(self, version: int):
110 self._version = version
111
112 @property
113 def version(self):
114 return self._version
115
116
117 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
118 def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
119 super().__init__(version)
120 self._viewer_session_id = viewer_session_id
121 self._major = major
122 self._minor = minor
123
124 @property
125 def viewer_session_id(self):
126 return self._viewer_session_id
127
128 @property
129 def major(self):
130 return self._major
131
132 @property
133 def minor(self):
134 return self._minor
135
136
137 class _LttngLiveViewerReply:
138 pass
139
140
141 class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
142 def __init__(self, viewer_session_id: int, major: int, minor: int):
143 self._viewer_session_id = viewer_session_id
144 self._major = major
145 self._minor = minor
146
147 @property
148 def viewer_session_id(self):
149 return self._viewer_session_id
150
151 @property
152 def major(self):
153 return self._major
154
155 @property
156 def minor(self):
157 return self._minor
158
159
160 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
161 pass
162
163
164 class _LttngLiveViewerTracingSessionInfo:
165 def __init__(
166 self,
167 tracing_session_id: int,
168 live_timer_freq: int,
169 client_count: int,
170 stream_count: int,
171 hostname: str,
172 name: str,
173 ):
174 self._tracing_session_id = tracing_session_id
175 self._live_timer_freq = live_timer_freq
176 self._client_count = client_count
177 self._stream_count = stream_count
178 self._hostname = hostname
179 self._name = name
180
181 @property
182 def tracing_session_id(self):
183 return self._tracing_session_id
184
185 @property
186 def live_timer_freq(self):
187 return self._live_timer_freq
188
189 @property
190 def client_count(self):
191 return self._client_count
192
193 @property
194 def stream_count(self):
195 return self._stream_count
196
197 @property
198 def hostname(self):
199 return self._hostname
200
201 @property
202 def name(self):
203 return self._name
204
205
206 class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
207 def __init__(
208 self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
209 ):
210 self._tracing_session_infos = tracing_session_infos
211
212 @property
213 def tracing_session_infos(self):
214 return self._tracing_session_infos
215
216
217 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
218 class SeekType:
219 BEGINNING = 1
220 LAST = 2
221
222 def __init__(
223 self, version: int, tracing_session_id: int, offset: int, seek_type: int
224 ):
225 super().__init__(version)
226 self._tracing_session_id = tracing_session_id
227 self._offset = offset
228 self._seek_type = seek_type
229
230 @property
231 def tracing_session_id(self):
232 return self._tracing_session_id
233
234 @property
235 def offset(self):
236 return self._offset
237
238 @property
239 def seek_type(self):
240 return self._seek_type
241
242
243 class _LttngLiveViewerStreamInfo:
244 def __init__(
245 self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
246 ):
247 self._id = id
248 self._trace_id = trace_id
249 self._is_metadata = is_metadata
250 self._path = path
251 self._channel_name = channel_name
252
253 @property
254 def id(self):
255 return self._id
256
257 @property
258 def trace_id(self):
259 return self._trace_id
260
261 @property
262 def is_metadata(self):
263 return self._is_metadata
264
265 @property
266 def path(self):
267 return self._path
268
269 @property
270 def channel_name(self):
271 return self._channel_name
272
273
274 class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
275 class Status:
276 OK = 1
277 ALREADY = 2
278 UNKNOWN = 3
279 NOT_LIVE = 4
280 SEEK_ERROR = 5
281 NO_SESSION = 6
282
283 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
284 self._status = status
285 self._stream_infos = stream_infos
286
287 @property
288 def status(self):
289 return self._status
290
291 @property
292 def stream_infos(self):
293 return self._stream_infos
294
295
296 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
297 def __init__(self, version: int, stream_id: int):
298 super().__init__(version)
299 self._stream_id = stream_id
300
301 @property
302 def stream_id(self):
303 return self._stream_id
304
305
306 class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
307 class Status:
308 OK = 1
309 RETRY = 2
310 HUP = 3
311 ERROR = 4
312 INACTIVE = 5
313 EOF = 6
314
315 def __init__(
316 self,
317 status: int,
318 index_entry: _LttngIndexEntryT,
319 has_new_metadata: bool,
320 has_new_data_stream: bool,
321 ):
322 self._status = status
323 self._index_entry = index_entry
324 self._has_new_metadata = has_new_metadata
325 self._has_new_data_stream = has_new_data_stream
326
327 @property
328 def status(self):
329 return self._status
330
331 @property
332 def index_entry(self):
333 return self._index_entry
334
335 @property
336 def has_new_metadata(self):
337 return self._has_new_metadata
338
339 @property
340 def has_new_data_stream(self):
341 return self._has_new_data_stream
342
343
344 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
345 def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
346 super().__init__(version)
347 self._stream_id = stream_id
348 self._offset = offset
349 self._req_length = req_length
350
351 @property
352 def stream_id(self):
353 return self._stream_id
354
355 @property
356 def offset(self):
357 return self._offset
358
359 @property
360 def req_length(self):
361 return self._req_length
362
363
364 class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
365 class Status:
366 OK = 1
367 RETRY = 2
368 ERROR = 3
369 EOF = 4
370
371 def __init__(
372 self,
373 status: int,
374 data: bytes,
375 has_new_metadata: bool,
376 has_new_data_stream: bool,
377 ):
378 self._status = status
379 self._data = data
380 self._has_new_metadata = has_new_metadata
381 self._has_new_data_stream = has_new_data_stream
382
383 @property
384 def status(self):
385 return self._status
386
387 @property
388 def data(self):
389 return self._data
390
391 @property
392 def has_new_metadata(self):
393 return self._has_new_metadata
394
395 @property
396 def has_new_data_stream(self):
397 return self._has_new_data_stream
398
399
400 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
401 def __init__(self, version: int, stream_id: int):
402 super().__init__(version)
403 self._stream_id = stream_id
404
405 @property
406 def stream_id(self):
407 return self._stream_id
408
409
410 class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
411 class Status:
412 OK = 1
413 NO_NEW = 2
414 ERROR = 3
415
416 def __init__(self, status: int, data: bytes):
417 self._status = status
418 self._data = data
419
420 @property
421 def status(self):
422 return self._status
423
424 @property
425 def data(self):
426 return self._data
427
428
429 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
430 def __init__(self, version: int, tracing_session_id: int):
431 super().__init__(version)
432 self._tracing_session_id = tracing_session_id
433
434 @property
435 def tracing_session_id(self):
436 return self._tracing_session_id
437
438
439 class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
440 class Status:
441 OK = 1
442 NO_NEW = 2
443 ERROR = 3
444 HUP = 4
445
446 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
447 self._status = status
448 self._stream_infos = stream_infos
449
450 @property
451 def status(self):
452 return self._status
453
454 @property
455 def stream_infos(self):
456 return self._stream_infos
457
458
459 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
460 pass
461
462
463 class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
464 class Status:
465 OK = 1
466 ERROR = 2
467
468 def __init__(self, status: int):
469 self._status = status
470
471 @property
472 def status(self):
473 return self._status
474
475
476 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
477 def __init__(self, version: int, tracing_session_id: int):
478 super().__init__(version)
479 self._tracing_session_id = tracing_session_id
480
481 @property
482 def tracing_session_id(self):
483 return self._tracing_session_id
484
485
486 class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
487 class Status:
488 OK = 1
489 UNKNOWN = 2
490 ERROR = 3
491
492 def __init__(self, status: int):
493 self._status = status
494
495 @property
496 def status(self):
497 return self._status
498
499
500 # An LTTng live protocol codec can convert bytes to command objects and
501 # reply objects to bytes.
502 class _LttngLiveViewerProtocolCodec:
503 _COMMAND_HEADER_STRUCT_FMT = "QII"
504 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
505
506 def __init__(self):
507 pass
508
509 def _unpack(self, fmt: str, data: bytes, offset: int = 0):
510 fmt = "!" + fmt
511 return struct.unpack_from(fmt, data, offset)
512
513 def _unpack_payload(self, fmt: str, data: bytes):
514 return self._unpack(
515 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
516 )
517
518 def decode(self, data: bytes):
519 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
520 # Not enough data to read the command header
521 return
522
523 payload_size, cmd_type, version = self._unpack(
524 self._COMMAND_HEADER_STRUCT_FMT, data
525 )
526 logging.info(
527 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
528 payload_size, cmd_type, version
529 )
530 )
531
532 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
533 # Not enough data to read the whole command
534 return
535
536 if cmd_type == 1:
537 viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
538 return _LttngLiveViewerConnectCommand(
539 version, viewer_session_id, major, minor
540 )
541 elif cmd_type == 2:
542 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
543 elif cmd_type == 3:
544 tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
545 return _LttngLiveViewerAttachToTracingSessionCommand(
546 version, tracing_session_id, offset, seek_type
547 )
548 elif cmd_type == 4:
549 (stream_id,) = self._unpack_payload("Q", data)
550 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
551 version, stream_id
552 )
553 elif cmd_type == 5:
554 stream_id, offset, req_length = self._unpack_payload("QQI", data)
555 return _LttngLiveViewerGetDataStreamPacketDataCommand(
556 version, stream_id, offset, req_length
557 )
558 elif cmd_type == 6:
559 (stream_id,) = self._unpack_payload("Q", data)
560 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
561 elif cmd_type == 7:
562 (tracing_session_id,) = self._unpack_payload("Q", data)
563 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
564 elif cmd_type == 8:
565 return _LttngLiveViewerCreateViewerSessionCommand(version)
566 elif cmd_type == 9:
567 (tracing_session_id,) = self._unpack_payload("Q", data)
568 return _LttngLiveViewerDetachFromTracingSessionCommand(
569 version, tracing_session_id
570 )
571 else:
572 raise UnexpectedInput("Unknown command type {}".format(cmd_type))
573
574 def _pack(self, fmt: str, *args: Any):
575 # Force network byte order
576 return struct.pack("!" + fmt, *args)
577
578 def _encode_zero_padded_str(self, string: str, length: int):
579 data = string.encode()
580 return data.ljust(length, b"\x00")
581
582 def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
583 data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
584 data += self._encode_zero_padded_str(info.path, 4096)
585 data += self._encode_zero_padded_str(info.channel_name, 255)
586 return data
587
588 def _get_has_new_stuff_flags(
589 self, has_new_metadata: bool, has_new_data_streams: bool
590 ):
591 flags = 0
592
593 if has_new_metadata:
594 flags |= 1
595
596 if has_new_data_streams:
597 flags |= 2
598
599 return flags
600
601 def encode(
602 self,
603 reply: _LttngLiveViewerReply,
604 ) -> bytes:
605 if type(reply) is _LttngLiveViewerConnectReply:
606 data = self._pack(
607 "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
608 )
609 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
610 data = self._pack("I", len(reply.tracing_session_infos))
611
612 for info in reply.tracing_session_infos:
613 data += self._pack(
614 "QIII",
615 info.tracing_session_id,
616 info.live_timer_freq,
617 info.client_count,
618 info.stream_count,
619 )
620 data += self._encode_zero_padded_str(info.hostname, 64)
621 data += self._encode_zero_padded_str(info.name, 255)
622 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
623 data = self._pack("II", reply.status, len(reply.stream_infos))
624
625 for info in reply.stream_infos:
626 data += self._encode_stream_info(info)
627 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
628 index_format = "QQQQQQQII"
629 entry = reply.index_entry
630 flags = self._get_has_new_stuff_flags(
631 reply.has_new_metadata, reply.has_new_data_stream
632 )
633
634 if isinstance(entry, _LttngDataStreamIndexEntry):
635 data = self._pack(
636 index_format,
637 entry.offset_bytes,
638 entry.total_size_bits,
639 entry.content_size_bits,
640 entry.timestamp_begin,
641 entry.timestamp_end,
642 entry.events_discarded,
643 entry.stream_class_id,
644 reply.status,
645 flags,
646 )
647 else:
648 data = self._pack(
649 index_format,
650 0,
651 0,
652 0,
653 0,
654 entry.timestamp,
655 0,
656 entry.stream_class_id,
657 reply.status,
658 flags,
659 )
660 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
661 flags = self._get_has_new_stuff_flags(
662 reply.has_new_metadata, reply.has_new_data_stream
663 )
664 data = self._pack("III", reply.status, len(reply.data), flags)
665 data += reply.data
666 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
667 data = self._pack("QI", len(reply.data), reply.status)
668 data += reply.data
669 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
670 data = self._pack("II", reply.status, len(reply.stream_infos))
671
672 for info in reply.stream_infos:
673 data += self._encode_stream_info(info)
674 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
675 data = self._pack("I", reply.status)
676 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
677 data = self._pack("I", reply.status)
678 else:
679 raise ValueError(
680 "Unknown reply object with class `{}`".format(reply.__class__.__name__)
681 )
682
683 return data
684
685
686 def _get_entry_timestamp_begin(
687 entry: _LttngIndexEntryT,
688 ):
689 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
690 return entry.timestamp
691 else:
692 return entry.timestamp_begin
693
694
695 # The index of an LTTng data stream, a sequence of index entries.
696 class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
697 def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
698 self._path = path
699 self._build()
700
701 if beacons:
702 stream_class_id = self._entries[0].stream_class_id
703
704 beacons_list = [] # type: list[_LttngDataStreamBeaconIndexEntry]
705 for ts in beacons.iter(tjson.IntVal):
706 beacons_list.append(
707 _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
708 )
709
710 self._add_beacons(beacons_list)
711
712 logging.info(
713 'Built data stream index entries: path="{}", count={}'.format(
714 path, len(self._entries)
715 )
716 )
717
718 def _build(self):
719 self._entries = [] # type: list[_LttngIndexEntryT]
720 assert os.path.isfile(self._path)
721
722 with open(self._path, "rb") as f:
723 # Read header first
724 fmt = ">IIII"
725 size = struct.calcsize(fmt)
726 data = f.read(size)
727 assert len(data) == size
728 magic, _, _, index_entry_length = struct.unpack(fmt, data)
729 assert magic == 0xC1F1DCC1
730
731 # Read index entries
732 fmt = ">QQQQQQQ"
733 size = struct.calcsize(fmt)
734
735 while True:
736 logging.debug(
737 'Decoding data stream index entry: path="{}", offset={}'.format(
738 self._path, f.tell()
739 )
740 )
741 data = f.read(size)
742
743 if not data:
744 # Done
745 break
746
747 assert len(data) == size
748 (
749 offset_bytes,
750 total_size_bits,
751 content_size_bits,
752 timestamp_begin,
753 timestamp_end,
754 events_discarded,
755 stream_class_id,
756 ) = struct.unpack(fmt, data)
757
758 self._entries.append(
759 _LttngDataStreamIndexEntry(
760 offset_bytes,
761 total_size_bits,
762 content_size_bits,
763 timestamp_begin,
764 timestamp_end,
765 events_discarded,
766 stream_class_id,
767 )
768 )
769
770 # Skip anything else before the next entry
771 f.seek(index_entry_length - size, os.SEEK_CUR)
772
773 def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
774 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
775 def sort_key(
776 entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
777 ) -> int:
778 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
779 return entry.timestamp
780 else:
781 return entry.timestamp_end
782
783 self._entries += beacons
784 self._entries.sort(key=sort_key)
785
786 @overload
787 def __getitem__(self, index: int) -> _LttngIndexEntryT:
788 ...
789
790 @overload
791 def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]: # noqa: F811
792 ...
793
794 def __getitem__( # noqa: F811
795 self, index: Union[int, slice]
796 ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT],]:
797 return self._entries[index]
798
799 def __len__(self):
800 return len(self._entries)
801
802 @property
803 def path(self):
804 return self._path
805
806
807 # An LTTng data stream.
808 class _LttngDataStream:
809 def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
810 self._path = path
811 filename = os.path.basename(path)
812 match = re.match(r"(.*)_\d+", filename)
813 if not match:
814 raise RuntimeError(
815 "Unexpected data stream file name pattern: {}".format(filename)
816 )
817
818 self._channel_name = match.group(1)
819 trace_dir = os.path.dirname(path)
820 index_path = os.path.join(trace_dir, "index", filename + ".idx")
821 self._index = _LttngDataStreamIndex(index_path, beacons_json)
822 assert os.path.isfile(path)
823 self._file = open(path, "rb")
824 logging.info(
825 'Built data stream: path="{}", channel-name="{}"'.format(
826 path, self._channel_name
827 )
828 )
829
830 @property
831 def path(self):
832 return self._path
833
834 @property
835 def channel_name(self):
836 return self._channel_name
837
838 @property
839 def index(self):
840 return self._index
841
842 def get_data(self, offset_bytes: int, len_bytes: int):
843 self._file.seek(offset_bytes)
844 return self._file.read(len_bytes)
845
846
847 class _LttngMetadataStreamSection:
848 def __init__(self, timestamp: int, data: Optional[bytes]):
849 self._timestamp = timestamp
850 if data is None:
851 self._data = bytes()
852 else:
853 self._data = data
854 logging.info(
855 "Built metadata stream section: ts={}, data-len={}".format(
856 self._timestamp, len(self._data)
857 )
858 )
859
860 @property
861 def timestamp(self):
862 return self._timestamp
863
864 @property
865 def data(self):
866 return self._data
867
868
869 # An LTTng metadata stream.
870 class _LttngMetadataStream:
871 def __init__(
872 self,
873 metadata_file_path: str,
874 config_sections: Sequence[_LttngMetadataStreamSection],
875 ):
876 self._path = metadata_file_path
877 self._sections = config_sections
878 logging.info(
879 "Built metadata stream: path={}, section-len={}".format(
880 self._path, len(self._sections)
881 )
882 )
883
884 @property
885 def path(self):
886 return self._path
887
888 @property
889 def sections(self):
890 return self._sections
891
892
893 class LttngMetadataConfigSection:
894 def __init__(self, line: int, timestamp: int, is_empty: bool):
895 self._line = line
896 self._timestamp = timestamp
897 self._is_empty = is_empty
898
899 @property
900 def line(self):
901 return self._line
902
903 @property
904 def timestamp(self):
905 return self._timestamp
906
907 @property
908 def is_empty(self):
909 return self._is_empty
910
911
912 def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
913 metadata_sections = [] # type: list[LttngMetadataConfigSection]
914 append_empty_section = False
915 last_timestamp = 0
916 last_line = 0
917
918 for section in metadata_sections_json:
919 if isinstance(section, tjson.StrVal):
920 if section.val == "empty":
921 # Found a empty section marker. Actually append the section at the
922 # timestamp of the next concrete section.
923 append_empty_section = True
924 else:
925 raise ValueError("Invalid string value at {}.".format(section.path))
926 elif isinstance(section, tjson.ObjVal):
927 line = section.at("line", tjson.IntVal).val
928 ts = section.at("timestamp", tjson.IntVal).val
929
930 # Sections' timestamps and lines must both be increasing.
931 assert ts > last_timestamp
932 last_timestamp = ts
933
934 assert line > last_line
935 last_line = line
936
937 if append_empty_section:
938 metadata_sections.append(LttngMetadataConfigSection(line, ts, True))
939 append_empty_section = False
940
941 metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
942 else:
943 raise TypeError(
944 "`{}`: expecting a string or object value".format(section.path)
945 )
946
947 return metadata_sections
948
949
950 def _split_metadata_sections(
951 metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
952 ):
953 metadata_sections = _parse_metadata_sections_config(metadata_sections_json)
954
955 sections = [] # type: list[_LttngMetadataStreamSection]
956 with open(metadata_file_path, "r") as metadata_file:
957 metadata_lines = [line for line in metadata_file]
958
959 metadata_section_idx = 0
960 curr_metadata_section = bytearray()
961
962 for idx, line_content in enumerate(metadata_lines):
963 # Add one to the index to convert from the zero-indexing of the
964 # enumerate() function to the one-indexing used by humans when
965 # viewing a text file.
966 curr_line_number = idx + 1
967
968 # If there are no more sections, simply append the line.
969 if metadata_section_idx + 1 >= len(metadata_sections):
970 curr_metadata_section += bytearray(line_content, "utf8")
971 continue
972
973 next_section_line_number = metadata_sections[metadata_section_idx + 1].line
974
975 # If the next section begins at the current line, create a
976 # section with the metadata we gathered so far.
977 if curr_line_number >= next_section_line_number:
978 # Flushing the metadata of the current section.
979 sections.append(
980 _LttngMetadataStreamSection(
981 metadata_sections[metadata_section_idx].timestamp,
982 bytes(curr_metadata_section),
983 )
984 )
985
986 # Move to the next section.
987 metadata_section_idx += 1
988
989 # Clear old content and append current line for the next section.
990 curr_metadata_section.clear()
991 curr_metadata_section += bytearray(line_content, "utf8")
992
993 # Append any empty sections.
994 while metadata_sections[metadata_section_idx].is_empty:
995 sections.append(
996 _LttngMetadataStreamSection(
997 metadata_sections[metadata_section_idx].timestamp, None
998 )
999 )
1000 metadata_section_idx += 1
1001 else:
1002 # Append line_content to the current metadata section.
1003 curr_metadata_section += bytearray(line_content, "utf8")
1004
1005 # We iterated over all the lines of the metadata file. Close the current section.
1006 sections.append(
1007 _LttngMetadataStreamSection(
1008 metadata_sections[metadata_section_idx].timestamp,
1009 bytes(curr_metadata_section),
1010 )
1011 )
1012
1013 return sections
1014
1015
1016 _StreamBeaconsT = Dict[str, Iterable[int]]
1017
1018
1019 # An LTTng trace, a sequence of LTTng data streams.
1020 class LttngTrace(Sequence[_LttngDataStream]):
1021 def __init__(
1022 self,
1023 trace_dir: str,
1024 metadata_sections_json: Optional[tjson.ArrayVal],
1025 beacons_json: Optional[tjson.ObjVal],
1026 ):
1027 assert os.path.isdir(trace_dir)
1028 self._path = trace_dir
1029 self._create_metadata_stream(trace_dir, metadata_sections_json)
1030 self._create_data_streams(trace_dir, beacons_json)
1031 logging.info('Built trace: path="{}"'.format(trace_dir))
1032
1033 def _create_data_streams(
1034 self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
1035 ):
1036 data_stream_paths = [] # type: list[str]
1037
1038 for filename in os.listdir(trace_dir):
1039 path = os.path.join(trace_dir, filename)
1040
1041 if not os.path.isfile(path):
1042 continue
1043
1044 if filename.startswith("."):
1045 continue
1046
1047 if filename == "metadata":
1048 continue
1049
1050 data_stream_paths.append(path)
1051
1052 data_stream_paths.sort()
1053 self._data_streams = [] # type: list[_LttngDataStream]
1054
1055 for data_stream_path in data_stream_paths:
1056 stream_name = os.path.basename(data_stream_path)
1057 this_beacons_json = None
1058 if beacons_json is not None and stream_name in beacons_json:
1059 this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
1060
1061 self._data_streams.append(
1062 _LttngDataStream(data_stream_path, this_beacons_json)
1063 )
1064
1065 def _create_metadata_stream(
1066 self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
1067 ):
1068 metadata_path = os.path.join(trace_dir, "metadata")
1069 metadata_sections = [] # type: list[_LttngMetadataStreamSection]
1070
1071 if metadata_sections_json is None:
1072 with open(metadata_path, "rb") as metadata_file:
1073 metadata_sections.append(
1074 _LttngMetadataStreamSection(0, metadata_file.read())
1075 )
1076 else:
1077 metadata_sections = _split_metadata_sections(
1078 metadata_path, metadata_sections_json
1079 )
1080
1081 self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
1082
1083 @property
1084 def path(self):
1085 return self._path
1086
1087 @property
1088 def metadata_stream(self):
1089 return self._metadata_stream
1090
1091 @overload
1092 def __getitem__(self, index: int) -> _LttngDataStream:
1093 ...
1094
1095 @overload
1096 def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]: # noqa: F811
1097 ...
1098
1099 def __getitem__( # noqa: F811
1100 self, index: Union[int, slice]
1101 ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
1102 return self._data_streams[index]
1103
1104 def __len__(self):
1105 return len(self._data_streams)
1106
1107
1108 # The state of a single data stream.
1109 class _LttngLiveViewerSessionDataStreamState:
1110 def __init__(
1111 self,
1112 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1113 info: _LttngLiveViewerStreamInfo,
1114 data_stream: _LttngDataStream,
1115 metadata_stream_id: int,
1116 ):
1117 self._ts_state = ts_state
1118 self._info = info
1119 self._data_stream = data_stream
1120 self._metadata_stream_id = metadata_stream_id
1121 self._cur_index_entry_index = 0
1122 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1123 logging.info(
1124 fmt.format(
1125 info.id,
1126 ts_state.tracing_session_descriptor.info.tracing_session_id,
1127 ts_state.tracing_session_descriptor.info.name,
1128 data_stream.path,
1129 )
1130 )
1131
1132 @property
1133 def tracing_session_state(self):
1134 return self._ts_state
1135
1136 @property
1137 def info(self):
1138 return self._info
1139
1140 @property
1141 def data_stream(self):
1142 return self._data_stream
1143
1144 @property
1145 def cur_index_entry(self):
1146 if self._cur_index_entry_index == len(self._data_stream.index):
1147 return
1148
1149 return self._data_stream.index[self._cur_index_entry_index]
1150
1151 @property
1152 def metadata_stream_id(self):
1153 return self._metadata_stream_id
1154
1155 def goto_next_index_entry(self):
1156 self._cur_index_entry_index += 1
1157
1158
1159 # The state of a single metadata stream.
1160 class _LttngLiveViewerSessionMetadataStreamState:
1161 def __init__(
1162 self,
1163 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1164 info: _LttngLiveViewerStreamInfo,
1165 metadata_stream: _LttngMetadataStream,
1166 ):
1167 self._ts_state = ts_state
1168 self._info = info
1169 self._metadata_stream = metadata_stream
1170 self._cur_metadata_stream_section_index = 0
1171 if len(metadata_stream.sections) > 1:
1172 self._next_metadata_stream_section_timestamp = metadata_stream.sections[
1173 1
1174 ].timestamp
1175 else:
1176 self._next_metadata_stream_section_timestamp = None
1177
1178 self._is_sent = False
1179 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1180 logging.info(
1181 fmt.format(
1182 info.id,
1183 ts_state.tracing_session_descriptor.info.tracing_session_id,
1184 ts_state.tracing_session_descriptor.info.name,
1185 metadata_stream.path,
1186 )
1187 )
1188
1189 @property
1190 def info(self):
1191 return self._info
1192
1193 @property
1194 def metadata_stream(self):
1195 return self._metadata_stream
1196
1197 @property
1198 def is_sent(self):
1199 return self._is_sent
1200
1201 @is_sent.setter
1202 def is_sent(self, value: bool):
1203 self._is_sent = value
1204
1205 @property
1206 def cur_section(self):
1207 fmt = "Get current metadata section: section-idx={}"
1208 logging.info(fmt.format(self._cur_metadata_stream_section_index))
1209 if self._cur_metadata_stream_section_index == len(
1210 self._metadata_stream.sections
1211 ):
1212 return
1213
1214 return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
1215
1216 def goto_next_section(self):
1217 self._cur_metadata_stream_section_index += 1
1218 if self.cur_section:
1219 self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
1220 else:
1221 self._next_metadata_stream_section_timestamp = None
1222
1223 @property
1224 def next_section_timestamp(self):
1225 return self._next_metadata_stream_section_timestamp
1226
1227
1228 # A tracing session descriptor.
1229 #
1230 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1231 # objects).
1232 class LttngTracingSessionDescriptor:
1233 def __init__(
1234 self,
1235 name: str,
1236 tracing_session_id: int,
1237 hostname: str,
1238 live_timer_freq: int,
1239 client_count: int,
1240 traces: Iterable[LttngTrace],
1241 ):
1242 for trace in traces:
1243 if name not in trace.path:
1244 fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1245 raise ValueError(fmt.format(name, trace.path))
1246
1247 self._traces = traces
1248 stream_count = sum([len(t) + 1 for t in traces])
1249 self._info = _LttngLiveViewerTracingSessionInfo(
1250 tracing_session_id,
1251 live_timer_freq,
1252 client_count,
1253 stream_count,
1254 hostname,
1255 name,
1256 )
1257
1258 @property
1259 def traces(self):
1260 return self._traces
1261
1262 @property
1263 def info(self):
1264 return self._info
1265
1266
1267 # The state of a tracing session.
1268 class _LttngLiveViewerSessionTracingSessionState:
1269 def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
1270 self._tc_descr = tc_descr
1271 self._stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
1272 self._ds_states = {} # type: dict[int, _LttngLiveViewerSessionDataStreamState]
1273 self._ms_states = (
1274 {}
1275 ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
1276 stream_id = base_stream_id
1277
1278 for trace in tc_descr.traces:
1279 trace_id = stream_id * 1000
1280
1281 # Metadata stream -> stream info and metadata stream state
1282 info = _LttngLiveViewerStreamInfo(
1283 stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
1284 )
1285 self._stream_infos.append(info)
1286 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
1287 self, info, trace.metadata_stream
1288 )
1289 metadata_stream_id = stream_id
1290 stream_id += 1
1291
1292 # Data streams -> stream infos and data stream states
1293 for data_stream in trace:
1294 info = _LttngLiveViewerStreamInfo(
1295 stream_id,
1296 trace_id,
1297 False,
1298 data_stream.path,
1299 data_stream.channel_name,
1300 )
1301 self._stream_infos.append(info)
1302 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
1303 self, info, data_stream, metadata_stream_id
1304 )
1305 stream_id += 1
1306
1307 self._is_attached = False
1308 fmt = 'Built tracing session state: id={}, name="{}"'
1309 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
1310
1311 @property
1312 def tracing_session_descriptor(self):
1313 return self._tc_descr
1314
1315 @property
1316 def data_stream_states(self):
1317 return self._ds_states
1318
1319 @property
1320 def metadata_stream_states(self):
1321 return self._ms_states
1322
1323 @property
1324 def stream_infos(self):
1325 return self._stream_infos
1326
1327 @property
1328 def has_new_metadata(self):
1329 return any([not ms.is_sent for ms in self._ms_states.values()])
1330
1331 @property
1332 def is_attached(self):
1333 return self._is_attached
1334
1335 @is_attached.setter
1336 def is_attached(self, value: bool):
1337 self._is_attached = value
1338
1339
1340 def needs_new_metadata_section(
1341 metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
1342 latest_timestamp: int,
1343 ):
1344 if metadata_stream_state.next_section_timestamp is None:
1345 return False
1346
1347 if latest_timestamp >= metadata_stream_state.next_section_timestamp:
1348 return True
1349 else:
1350 return False
1351
1352
1353 # An LTTng live viewer session manages a view on tracing sessions
1354 # and replies to commands accordingly.
1355 class _LttngLiveViewerSession:
1356 def __init__(
1357 self,
1358 viewer_session_id: int,
1359 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1360 max_query_data_response_size: Optional[int],
1361 ):
1362 self._viewer_session_id = viewer_session_id
1363 self._ts_states = (
1364 {}
1365 ) # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
1366 self._stream_states = (
1367 {}
1368 ) # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
1369 self._max_query_data_response_size = max_query_data_response_size
1370 total_stream_infos = 0
1371
1372 for ts_descr in tracing_session_descriptors:
1373 ts_state = _LttngLiveViewerSessionTracingSessionState(
1374 ts_descr, total_stream_infos
1375 )
1376 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1377 self._ts_states[ts_id] = ts_state
1378 total_stream_infos += len(ts_state.stream_infos)
1379
1380 # Update session's stream states to have the new states
1381 self._stream_states.update(ts_state.data_stream_states)
1382 self._stream_states.update(ts_state.metadata_stream_states)
1383
1384 self._command_handlers = {
1385 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1386 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1387 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1388 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1389 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1390 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1391 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1392 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
1393 } # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]
1394
1395 @property
1396 def viewer_session_id(self):
1397 return self._viewer_session_id
1398
1399 def _get_tracing_session_state(self, tracing_session_id: int):
1400 if tracing_session_id not in self._ts_states:
1401 raise UnexpectedInput(
1402 "Unknown tracing session ID {}".format(tracing_session_id)
1403 )
1404
1405 return self._ts_states[tracing_session_id]
1406
1407 def _get_data_stream_state(self, stream_id: int):
1408 if stream_id not in self._stream_states:
1409 RuntimeError("Unknown stream ID {}".format(stream_id))
1410
1411 stream = self._stream_states[stream_id]
1412 if type(stream) is not _LttngLiveViewerSessionDataStreamState:
1413 raise RuntimeError("Stream is not a data stream")
1414
1415 return stream
1416
1417 def _get_metadata_stream_state(self, stream_id: int):
1418 if stream_id not in self._stream_states:
1419 RuntimeError("Unknown stream ID {}".format(stream_id))
1420
1421 stream = self._stream_states[stream_id]
1422 if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
1423 raise RuntimeError("Stream is not a metadata stream")
1424
1425 return stream
1426
1427 def handle_command(self, cmd: _LttngLiveViewerCommand):
1428 logging.info(
1429 "Handling command in viewer session: cmd-cls-name={}".format(
1430 cmd.__class__.__name__
1431 )
1432 )
1433 cmd_type = type(cmd)
1434
1435 if cmd_type not in self._command_handlers:
1436 raise UnexpectedInput(
1437 "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
1438 )
1439
1440 return self._command_handlers[cmd_type](cmd)
1441
1442 def _handle_attach_to_tracing_session_command(
1443 self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
1444 ):
1445 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1446 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1447 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1448 info = ts_state.tracing_session_descriptor.info
1449
1450 if ts_state.is_attached:
1451 raise UnexpectedInput(
1452 "Cannot attach to tracing session `{}`: viewer is already attached".format(
1453 info.name
1454 )
1455 )
1456
1457 ts_state.is_attached = True
1458 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1459 return _LttngLiveViewerAttachToTracingSessionReply(
1460 status, ts_state.stream_infos
1461 )
1462
1463 def _handle_detach_from_tracing_session_command(
1464 self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
1465 ):
1466 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1467 logging.info(fmt.format(cmd.tracing_session_id))
1468 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1469 info = ts_state.tracing_session_descriptor.info
1470
1471 if not ts_state.is_attached:
1472 raise UnexpectedInput(
1473 "Cannot detach to tracing session `{}`: viewer is not attached".format(
1474 info.name
1475 )
1476 )
1477
1478 ts_state.is_attached = False
1479 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1480 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1481
1482 def _handle_get_next_data_stream_index_entry_command(
1483 self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
1484 ):
1485 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1486 logging.info(fmt.format(cmd.stream_id))
1487 stream_state = self._get_data_stream_state(cmd.stream_id)
1488 metadata_stream_state = self._get_metadata_stream_state(
1489 stream_state.metadata_stream_id
1490 )
1491
1492 if stream_state.cur_index_entry is None:
1493 # The viewer is done reading this stream
1494 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1495
1496 # Dummy data stream index entry to use with the `HUP` status
1497 # (the reply needs one, but the viewer ignores it)
1498 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1499
1500 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1501 status, index_entry, False, False
1502 )
1503
1504 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1505
1506 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
1507 metadata_stream_state.is_sent = False
1508 metadata_stream_state.goto_next_section()
1509
1510 # The viewer only checks the `has_new_metadata` flag if the
1511 # reply's status is `OK`, so we need to provide an index here
1512 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1513 if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
1514 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1515 else:
1516 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1517
1518 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1519 status, stream_state.cur_index_entry, has_new_metadata, False
1520 )
1521 stream_state.goto_next_index_entry()
1522 return reply
1523
1524 def _handle_get_data_stream_packet_data_command(
1525 self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
1526 ):
1527 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1528 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1529 stream_state = self._get_data_stream_state(cmd.stream_id)
1530 data_response_length = cmd.req_length
1531
1532 if stream_state.tracing_session_state.has_new_metadata:
1533 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1534 return _LttngLiveViewerGetDataStreamPacketDataReply(
1535 status, bytes(), True, False
1536 )
1537
1538 if self._max_query_data_response_size:
1539 # Enforce a server side limit on the query requested length.
1540 # To ensure that the transaction terminate take the minimum of both
1541 # value.
1542 data_response_length = min(
1543 cmd.req_length, self._max_query_data_response_size
1544 )
1545 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1546 logging.info(fmt.format(cmd.req_length, data_response_length))
1547
1548 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1549 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1550 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1551
1552 def _handle_get_metadata_stream_data_command(
1553 self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
1554 ):
1555 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1556 logging.info(fmt.format(cmd.stream_id))
1557 metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
1558
1559 if metadata_stream_state.is_sent:
1560 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1561 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1562
1563 metadata_stream_state.is_sent = True
1564 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
1565 metadata_section = metadata_stream_state.cur_section
1566 assert metadata_section is not None
1567
1568 # If we are sending an empty section, ready the next one right away.
1569 if len(metadata_section.data) == 0:
1570 metadata_stream_state.is_sent = False
1571 metadata_stream_state.goto_next_section()
1572
1573 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1574 logging.info(fmt.format(len(metadata_section.data)))
1575 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1576 status, metadata_section.data
1577 )
1578
1579 def _handle_get_new_stream_infos_command(
1580 self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
1581 ):
1582 fmt = 'Handling "get new stream infos" command: ts-id={}'
1583 logging.info(fmt.format(cmd.tracing_session_id))
1584
1585 # As of this version, all the tracing session's stream infos are
1586 # always given to the viewer when sending the "attach to tracing
1587 # session" reply, so there's nothing new here. Return the `HUP`
1588 # status as, if we're handling this command, the viewer consumed
1589 # all the existing data streams.
1590 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1591 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1592
1593 def _handle_get_tracing_session_infos_command(
1594 self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
1595 ):
1596 logging.info('Handling "get tracing session infos" command.')
1597 infos = [
1598 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1599 ]
1600 infos.sort(key=lambda info: info.name)
1601 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1602
1603 def _handle_create_viewer_session_command(
1604 self, cmd: _LttngLiveViewerCreateViewerSessionCommand
1605 ):
1606 logging.info('Handling "create viewer session" command.')
1607 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1608
1609 # This does nothing here. In the LTTng relay daemon, it
1610 # allocates the viewer session's state.
1611 return _LttngLiveViewerCreateViewerSessionReply(status)
1612
1613
1614 # An LTTng live TCP server.
1615 #
1616 # On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
1617 # on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
1618 # to a temporary port file. It renames the temporary port file to
1619 # `port_filename`.
1620 #
1621 # `tracing_session_descriptors` is a list of tracing session descriptors
1622 # (`LttngTracingSessionDescriptor`) to serve.
1623 #
1624 # This server accepts a single viewer (client).
1625 #
1626 # When the viewer closes the connection, the server's constructor
1627 # returns.
1628 class LttngLiveServer:
1629 def __init__(
1630 self,
1631 port: Optional[int],
1632 port_filename: str,
1633 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1634 max_query_data_response_size: Optional[int],
1635 ):
1636 logging.info("Server configuration:")
1637
1638 logging.info(" Port file name: `{}`".format(port_filename))
1639
1640 if max_query_data_response_size is not None:
1641 logging.info(
1642 " Maximum response data query size: `{}`".format(
1643 max_query_data_response_size
1644 )
1645 )
1646
1647 for ts_descr in tracing_session_descriptors:
1648 info = ts_descr.info
1649 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1650 logging.info(
1651 fmt.format(
1652 info.name,
1653 info.tracing_session_id,
1654 info.hostname,
1655 info.live_timer_freq,
1656 info.client_count,
1657 info.stream_count,
1658 )
1659 )
1660
1661 for trace in ts_descr.traces:
1662 logging.info(' Trace: path="{}"'.format(trace.path))
1663
1664 self._ts_descriptors = tracing_session_descriptors
1665 self._max_query_data_response_size = max_query_data_response_size
1666 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1667 self._codec = _LttngLiveViewerProtocolCodec()
1668
1669 # Port 0: OS assigns an unused port
1670 serv_addr = ("localhost", port if port is not None else 0)
1671 self._sock.bind(serv_addr)
1672 self._write_port_to_file(port_filename)
1673
1674 try:
1675 self._listen()
1676 finally:
1677 self._sock.close()
1678 logging.info("Closed connection and socket.")
1679
1680 @property
1681 def _server_port(self):
1682 return self._sock.getsockname()[1]
1683
1684 def _recv_command(self):
1685 data = bytes()
1686
1687 while True:
1688 logging.info("Waiting for viewer command.")
1689 buf = self._conn.recv(128)
1690
1691 if not buf:
1692 logging.info("Client closed connection.")
1693
1694 if data:
1695 raise UnexpectedInput(
1696 "Client closed connection after having sent {} command bytes.".format(
1697 len(data)
1698 )
1699 )
1700
1701 return
1702
1703 logging.info("Received data from viewer: length={}".format(len(buf)))
1704
1705 data += buf
1706
1707 try:
1708 cmd = self._codec.decode(data)
1709 except struct.error as exc:
1710 raise UnexpectedInput("Malformed command: {}".format(exc)) from exc
1711
1712 if cmd is not None:
1713 logging.info(
1714 "Received command from viewer: cmd-cls-name={}".format(
1715 cmd.__class__.__name__
1716 )
1717 )
1718 return cmd
1719
1720 def _send_reply(self, reply: _LttngLiveViewerReply):
1721 data = self._codec.encode(reply)
1722 logging.info(
1723 "Sending reply to viewer: reply-cls-name={}, length={}".format(
1724 reply.__class__.__name__, len(data)
1725 )
1726 )
1727 self._conn.sendall(data)
1728
1729 def _handle_connection(self):
1730 # First command must be "connect"
1731 cmd = self._recv_command()
1732
1733 if type(cmd) is not _LttngLiveViewerConnectCommand:
1734 raise UnexpectedInput(
1735 'First command is not "connect": cmd-cls-name={}'.format(
1736 cmd.__class__.__name__
1737 )
1738 )
1739
1740 # Create viewer session (arbitrary ID 23)
1741 logging.info(
1742 "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
1743 )
1744 viewer_session = _LttngLiveViewerSession(
1745 23, self._ts_descriptors, self._max_query_data_response_size
1746 )
1747
1748 # Send "connect" reply
1749 self._send_reply(
1750 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1751 )
1752
1753 # Make the viewer session handle the remaining commands
1754 while True:
1755 cmd = self._recv_command()
1756
1757 if cmd is None:
1758 # Connection closed (at an expected location within the
1759 # conversation)
1760 return
1761
1762 self._send_reply(viewer_session.handle_command(cmd))
1763
1764 def _listen(self):
1765 logging.info("Listening: port={}".format(self._server_port))
1766 # Backlog must be present for Python version < 3.5.
1767 # 128 is an arbitrary number since we expect only 1 connection anyway.
1768 self._sock.listen(128)
1769 self._conn, viewer_addr = self._sock.accept()
1770 logging.info(
1771 "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
1772 )
1773
1774 try:
1775 self._handle_connection()
1776 finally:
1777 self._conn.close()
1778
1779 def _write_port_to_file(self, port_filename: str):
1780 # Write the port number to a temporary file.
1781 with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
1782 print(self._server_port, end="", file=tmp_port_file)
1783
1784 # Rename temporary file to real file
1785 os.replace(tmp_port_file.name, port_filename)
1786 logging.info(
1787 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1788 tmp_port_file.name, port_filename
1789 )
1790 )
1791
1792
1793 def _session_descriptors_from_path(
1794 sessions_filename: str, trace_path_prefix: Optional[str]
1795 ):
1796 # File format is:
1797 #
1798 # [
1799 # {
1800 # "name": "my-session",
1801 # "id": 17,
1802 # "hostname": "myhost",
1803 # "live-timer-freq": 1000000,
1804 # "client-count": 23,
1805 # "traces": [
1806 # {
1807 # "path": "lol"
1808 # },
1809 # {
1810 # "path": "meow/mix",
1811 # "beacons": {
1812 # "my_stream": [ 5235787, 728375283 ]
1813 # },
1814 # "metadata-sections": [
1815 # {
1816 # "line": 1,
1817 # "timestamp": 0
1818 # }
1819 # ]
1820 # }
1821 # ]
1822 # }
1823 # ]
1824 with open(sessions_filename, "r") as sessions_file:
1825 sessions_json = tjson.load(sessions_file, tjson.ArrayVal)
1826
1827 sessions = [] # type: list[LttngTracingSessionDescriptor]
1828
1829 for session_json in sessions_json.iter(tjson.ObjVal):
1830 name = session_json.at("name", tjson.StrVal).val
1831 tracing_session_id = session_json.at("id", tjson.IntVal).val
1832 hostname = session_json.at("hostname", tjson.StrVal).val
1833 live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
1834 client_count = session_json.at("client-count", tjson.IntVal).val
1835 traces_json = session_json.at("traces", tjson.ArrayVal)
1836
1837 traces = [] # type: list[LttngTrace]
1838
1839 for trace_json in traces_json.iter(tjson.ObjVal):
1840 metadata_sections = (
1841 trace_json.at("metadata-sections", tjson.ArrayVal)
1842 if "metadata-sections" in trace_json
1843 else None
1844 )
1845 beacons = (
1846 trace_json.at("beacons", tjson.ObjVal)
1847 if "beacons" in trace_json
1848 else None
1849 )
1850 path = trace_json.at("path", tjson.StrVal).val
1851
1852 if not os.path.isabs(path) and trace_path_prefix:
1853 path = os.path.join(trace_path_prefix, path)
1854
1855 traces.append(
1856 LttngTrace(
1857 path,
1858 metadata_sections,
1859 beacons,
1860 )
1861 )
1862
1863 sessions.append(
1864 LttngTracingSessionDescriptor(
1865 name,
1866 tracing_session_id,
1867 hostname,
1868 live_timer_freq,
1869 client_count,
1870 traces,
1871 )
1872 )
1873
1874 return sessions
1875
1876
1877 def _loglevel_parser(string: str):
1878 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
1879 if string not in loglevels:
1880 msg = "{} is not a valid loglevel".format(string)
1881 raise argparse.ArgumentTypeError(msg)
1882 return loglevels[string]
1883
1884
1885 if __name__ == "__main__":
1886 logging.basicConfig(format="# %(asctime)-25s%(message)s")
1887 parser = argparse.ArgumentParser(
1888 description="LTTng-live protocol mocker", add_help=False
1889 )
1890 parser.add_argument(
1891 "--log-level",
1892 default="warning",
1893 choices=["info", "warning"],
1894 help="The loglevel to be used.",
1895 )
1896
1897 loglevel_namespace, remaining_args = parser.parse_known_args()
1898 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1899
1900 parser.add_argument(
1901 "--port",
1902 help="The port to bind to. If missing, use an OS-assigned port..",
1903 type=int,
1904 )
1905 parser.add_argument(
1906 "--port-filename",
1907 help="The final port file. This file is present when the server is ready to receive connection.",
1908 required=True,
1909 )
1910 parser.add_argument(
1911 "--max-query-data-response-size",
1912 type=int,
1913 help="The maximum size of control data response in bytes",
1914 )
1915 parser.add_argument(
1916 "--trace-path-prefix",
1917 type=str,
1918 help="Prefix to prepend to the trace paths of session configurations",
1919 )
1920 parser.add_argument(
1921 "--sessions-filename",
1922 type=str,
1923 help="Path to a session configuration file",
1924 )
1925 parser.add_argument(
1926 "-h",
1927 "--help",
1928 action="help",
1929 default=argparse.SUPPRESS,
1930 help="Show this help message and exit.",
1931 )
1932
1933 args = parser.parse_args(args=remaining_args)
1934 try:
1935 sessions_filename = args.sessions_filename # type: str
1936 trace_path_prefix = args.trace_path_prefix # type: str | None
1937 sessions = _session_descriptors_from_path(
1938 sessions_filename,
1939 trace_path_prefix,
1940 )
1941
1942 port = args.port # type: int | None
1943 port_filename = args.port_filename # type: str
1944 max_query_data_response_size = (
1945 args.max_query_data_response_size
1946 ) # type: int | None
1947 LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
1948 except UnexpectedInput as exc:
1949 logging.error(str(exc))
1950 print(exc, file=sys.stderr)
1951 sys.exit(1)
This page took 0.070479 seconds and 4 git commands to generate.