3a3c565e094a53caa5956ab4173fff4f125968c9
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
1 # SPDX-License-Identifier: MIT
2 #
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
4 #
5
6 # pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
7
8 import os
9 import re
10 import time
11 import socket
12 import struct
13 import logging
14 import os.path
15 import argparse
16 import tempfile
17 from typing import Dict, Union, Iterable, Optional, Sequence, overload
18
19 import tjson
20
21 # isort: off
22 from typing import Any, Callable # noqa: F401
23
24 # isort: on
25
26
27 # An entry within the index of an LTTng data stream.
28 class _LttngDataStreamIndexEntry:
29 def __init__(
30 self,
31 offset_bytes: int,
32 total_size_bits: int,
33 content_size_bits: int,
34 timestamp_begin: int,
35 timestamp_end: int,
36 events_discarded: int,
37 stream_class_id: int,
38 ):
39 self._offset_bytes = offset_bytes
40 self._total_size_bits = total_size_bits
41 self._content_size_bits = content_size_bits
42 self._timestamp_begin = timestamp_begin
43 self._timestamp_end = timestamp_end
44 self._events_discarded = events_discarded
45 self._stream_class_id = stream_class_id
46
47 @property
48 def offset_bytes(self):
49 return self._offset_bytes
50
51 @property
52 def total_size_bits(self):
53 return self._total_size_bits
54
55 @property
56 def total_size_bytes(self):
57 return self._total_size_bits // 8
58
59 @property
60 def content_size_bits(self):
61 return self._content_size_bits
62
63 @property
64 def content_size_bytes(self):
65 return self._content_size_bits // 8
66
67 @property
68 def timestamp_begin(self):
69 return self._timestamp_begin
70
71 @property
72 def timestamp_end(self):
73 return self._timestamp_end
74
75 @property
76 def events_discarded(self):
77 return self._events_discarded
78
79 @property
80 def stream_class_id(self):
81 return self._stream_class_id
82
83
84 # An entry within the index of an LTTng data stream. While a stream beacon entry
85 # is conceptually unrelated to an index, it is sent as a reply to a
86 # LttngLiveViewerGetNextDataStreamIndexEntryCommand
87 class _LttngDataStreamBeaconIndexEntry:
88 def __init__(self, stream_class_id: int, timestamp: int):
89 self._stream_class_id = stream_class_id
90 self._timestamp = timestamp
91
92 @property
93 def timestamp(self):
94 return self._timestamp
95
96 @property
97 def stream_class_id(self):
98 return self._stream_class_id
99
100
# Any entry which a data stream index may contain: a packet entry or a
# beacon entry.
_LttngIndexEntryT = Union[_LttngDataStreamBeaconIndexEntry, _LttngDataStreamIndexEntry]
102
103
104 class _LttngLiveViewerCommand:
105 def __init__(self, version: int):
106 self._version = version
107
108 @property
109 def version(self):
110 return self._version
111
112
# Connection command: carries the viewer session ID and the major and
# minor numbers sent by the viewer.
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
        super().__init__(version)
        self._major = major
        self._minor = minor
        self._viewer_session_id = viewer_session_id

    @property
    def viewer_session_id(self) -> int:
        return self._viewer_session_id

    @property
    def major(self) -> int:
        return self._major

    @property
    def minor(self) -> int:
        return self._minor
131
132
133 class _LttngLiveViewerReply:
134 pass
135
136
# Reply to a connection command: viewer session ID and major and minor
# numbers to send back to the viewer.
class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
    def __init__(self, viewer_session_id: int, major: int, minor: int):
        self._major = major
        self._minor = minor
        self._viewer_session_id = viewer_session_id

    @property
    def viewer_session_id(self) -> int:
        return self._viewer_session_id

    @property
    def major(self) -> int:
        return self._major

    @property
    def minor(self) -> int:
        return self._minor
154
155
# Command to get the tracing session infos: no parameter beyond what the
# base command holds.
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    pass
158
159
160 class _LttngLiveViewerTracingSessionInfo:
161 def __init__(
162 self,
163 tracing_session_id: int,
164 live_timer_freq: int,
165 client_count: int,
166 stream_count: int,
167 hostname: str,
168 name: str,
169 ):
170 self._tracing_session_id = tracing_session_id
171 self._live_timer_freq = live_timer_freq
172 self._client_count = client_count
173 self._stream_count = stream_count
174 self._hostname = hostname
175 self._name = name
176
177 @property
178 def tracing_session_id(self):
179 return self._tracing_session_id
180
181 @property
182 def live_timer_freq(self):
183 return self._live_timer_freq
184
185 @property
186 def client_count(self):
187 return self._client_count
188
189 @property
190 def stream_count(self):
191 return self._stream_count
192
193 @property
194 def hostname(self):
195 return self._hostname
196
197 @property
198 def name(self):
199 return self._name
200
201
# Reply to a _LttngLiveViewerGetTracingSessionInfosCommand: the
# tracing session infos to send, in order.
class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
    def __init__(
        self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
    ):
        self._tracing_session_infos = tracing_session_infos

    @property
    def tracing_session_infos(self) -> Sequence[_LttngLiveViewerTracingSessionInfo]:
        return self._tracing_session_infos
211
212
# Command to attach to a tracing session, given its ID, an offset, and
# a seek type.
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    # Known values of the `seek_type` parameter
    class SeekType:
        BEGINNING = 1
        LAST = 2

    def __init__(
        self, version: int, tracing_session_id: int, offset: int, seek_type: int
    ):
        super().__init__(version)
        self._seek_type = seek_type
        self._offset = offset
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id

    @property
    def offset(self) -> int:
        return self._offset

    @property
    def seek_type(self) -> int:
        return self._seek_type
237
238
239 class _LttngLiveViewerStreamInfo:
240 def __init__(
241 self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
242 ):
243 self._id = id
244 self._trace_id = trace_id
245 self._is_metadata = is_metadata
246 self._path = path
247 self._channel_name = channel_name
248
249 @property
250 def id(self):
251 return self._id
252
253 @property
254 def trace_id(self):
255 return self._trace_id
256
257 @property
258 def is_metadata(self):
259 return self._is_metadata
260
261 @property
262 def path(self):
263 return self._path
264
265 @property
266 def channel_name(self):
267 return self._channel_name
268
269
# Reply to a _LttngLiveViewerAttachToTracingSessionCommand: a status and
# the infos of the streams of the tracing session.
class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
    # Known values of the `status` parameter
    class Status:
        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
        self._stream_infos = stream_infos
        self._status = status

    @property
    def status(self) -> int:
        return self._status

    @property
    def stream_infos(self) -> Sequence[_LttngLiveViewerStreamInfo]:
        return self._stream_infos
290
291
# Command to get the next index entry of the data stream having a given
# ID.
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self) -> int:
        return self._stream_id
300
301
# Reply to a _LttngLiveViewerGetNextDataStreamIndexEntryCommand: a
# status, the index entry (packet or beacon) itself, and two flags
# telling whether new metadata or new data streams are available.
class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
    # Known values of the `status` parameter
    class Status:
        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(
        self,
        status: int,
        index_entry: _LttngIndexEntryT,
        has_new_metadata: bool,
        has_new_data_stream: bool,
    ):
        self._status = status
        self._index_entry = index_entry

        # "New stuff" flags
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self) -> int:
        return self._status

    @property
    def index_entry(self) -> _LttngIndexEntryT:
        return self._index_entry

    @property
    def has_new_metadata(self) -> bool:
        return self._has_new_metadata

    @property
    def has_new_data_stream(self) -> bool:
        return self._has_new_data_stream
338
339
# Command to get `req_length` bytes of packet data at offset `offset`
# within the data stream having the ID `stream_id`.
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
        super().__init__(version)
        self._req_length = req_length
        self._offset = offset
        self._stream_id = stream_id

    @property
    def stream_id(self) -> int:
        return self._stream_id

    @property
    def offset(self) -> int:
        return self._offset

    @property
    def req_length(self) -> int:
        return self._req_length
358
359
# Reply to a _LttngLiveViewerGetDataStreamPacketDataCommand: a status,
# the packet data bytes, and two flags telling whether new metadata or
# new data streams are available.
class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
    # Known values of the `status` parameter
    class Status:
        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(
        self,
        status: int,
        data: bytes,
        has_new_metadata: bool,
        has_new_data_stream: bool,
    ):
        self._status = status
        self._data = data

        # "New stuff" flags
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self) -> int:
        return self._status

    @property
    def data(self) -> bytes:
        return self._data

    @property
    def has_new_metadata(self) -> bool:
        return self._has_new_metadata

    @property
    def has_new_data_stream(self) -> bool:
        return self._has_new_data_stream
394
395
# Command to get the data of the metadata stream having a given ID.
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self) -> int:
        return self._stream_id
404
405
# Reply to a _LttngLiveViewerGetMetadataStreamDataCommand: a status and
# the metadata bytes.
class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
    # Known values of the `status` parameter
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status: int, data: bytes):
        self._data = data
        self._status = status

    @property
    def status(self) -> int:
        return self._status

    @property
    def data(self) -> bytes:
        return self._data
423
424
# Command to get the infos of the new streams of the tracing session
# having a given ID.
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, tracing_session_id: int):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id
433
434
# Reply to a _LttngLiveViewerGetNewStreamInfosCommand: a status and the
# infos of the new streams.
class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
    # Known values of the `status` parameter
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
        self._stream_infos = stream_infos
        self._status = status

    @property
    def status(self) -> int:
        return self._status

    @property
    def stream_infos(self) -> Sequence[_LttngLiveViewerStreamInfo]:
        return self._stream_infos
453
454
# Command to create a viewer session: no parameter beyond what the base
# command holds.
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    pass
457
458
# Reply to a _LttngLiveViewerCreateViewerSessionCommand: only a status.
class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
    # Known values of the `status` parameter
    class Status:
        OK = 1
        ERROR = 2

    def __init__(self, status: int):
        self._status = status

    @property
    def status(self) -> int:
        return self._status
470
471
# Command to detach from the tracing session having a given ID.
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, tracing_session_id: int):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id
480
481
# Reply to a _LttngLiveViewerDetachFromTracingSessionCommand: only a
# status.
class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
    # Known values of the `status` parameter
    class Status:
        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status: int):
        self._status = status

    @property
    def status(self) -> int:
        return self._status
494
495
# An LTTng live protocol codec can convert bytes to command objects and
# reply objects to bytes.
#
# Everything is packed and unpacked using the network byte order (see
# _pack() and _unpack()).
class _LttngLiveViewerProtocolCodec:
    # Command header: payload size, command type, and version, in this
    # order.
    _COMMAND_HEADER_STRUCT_FMT = "QII"
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)

    def __init__(self):
        pass

    # Unpacks `data` from `offset` following `fmt`, forcing the network
    # byte order.
    def _unpack(self, fmt: str, data: bytes, offset: int = 0):
        return struct.unpack_from("!" + fmt, data, offset)

    # Unpacks what immediately follows the command header within `data`.
    def _unpack_payload(self, fmt: str, data: bytes):
        return self._unpack(
            fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
        )

    # Decodes the command found at the beginning of `data`.
    #
    # Returns the command object, or `None` if `data` doesn't contain a
    # complete command yet (header or payload still missing).
    #
    # Raises `RuntimeError` if the command type is unknown.
    def decode(self, data: bytes):
        if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
            # Not enough data to read the command header
            return None

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
                payload_size, cmd_type, version
            )
        )

        if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
            # Not enough data to read the whole command
            return None

        if cmd_type == 1:
            # The last payload field is decoded, but not used
            viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
            return _LttngLiveViewerConnectCommand(
                version, viewer_session_id, major, minor
            )

        if cmd_type == 2:
            return _LttngLiveViewerGetTracingSessionInfosCommand(version)

        if cmd_type == 3:
            tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
            return _LttngLiveViewerAttachToTracingSessionCommand(
                version, tracing_session_id, offset, seek_type
            )

        if cmd_type == 4:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                version, stream_id
            )

        if cmd_type == 5:
            stream_id, offset, req_length = self._unpack_payload("QQI", data)
            return _LttngLiveViewerGetDataStreamPacketDataCommand(
                version, stream_id, offset, req_length
            )

        if cmd_type == 6:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)

        if cmd_type == 7:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)

        if cmd_type == 8:
            return _LttngLiveViewerCreateViewerSessionCommand(version)

        if cmd_type == 9:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerDetachFromTracingSessionCommand(
                version, tracing_session_id
            )

        raise RuntimeError("Unknown command type {}".format(cmd_type))

    # Packs `args` following `fmt`, forcing the network byte order.
    def _pack(self, fmt: str, *args: Any):
        return struct.pack("!" + fmt, *args)

    # Encodes `string` and pads the result with null bytes so that it's
    # exactly `length` bytes long.
    #
    # Raises `ValueError` if the encoded string doesn't fit: the fields
    # are fixed-width, so a longer value would shift everything which
    # follows it within the reply.
    def _encode_zero_padded_str(self, string: str, length: int):
        data = string.encode()

        if len(data) > length:
            raise ValueError(
                "Encoded string `{}` is {} bytes long, but the field is {} bytes wide".format(
                    string, len(data), length
                )
            )

        return data.ljust(length, b"\x00")

    # Encodes a single stream info: IDs and metadata flag followed by
    # the fixed-width path and channel name fields.
    def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
        return b"".join(
            (
                self._pack("QQI", info.id, info.trace_id, int(info.is_metadata)),
                self._encode_zero_padded_str(info.path, 4096),
                self._encode_zero_padded_str(info.channel_name, 255),
            )
        )

    # Encodes a reply made of a status, a stream info count, and the
    # stream infos themselves.
    def _encode_stream_infos_reply(
        self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]
    ) -> bytes:
        parts = [self._pack("II", status, len(stream_infos))]
        parts.extend(self._encode_stream_info(info) for info in stream_infos)
        return b"".join(parts)

    # Returns the flags field value: bit 0 means new metadata, bit 1
    # means new data streams.
    def _get_has_new_stuff_flags(
        self, has_new_metadata: bool, has_new_data_streams: bool
    ):
        flags = 0

        if has_new_metadata:
            flags |= 1

        if has_new_data_streams:
            flags |= 2

        return flags

    # Encodes the reply object `reply` to bytes.
    #
    # Raises `ValueError` if the exact class of `reply` is unknown.
    def encode(
        self,
        reply: _LttngLiveViewerReply,
    ) -> bytes:
        if type(reply) is _LttngLiveViewerConnectReply:
            # NOTE(review): the meaning of the trailing constant `2`
            # isn't visible from here; kept as is.
            return self._pack(
                "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
            )

        if type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
            parts = [self._pack("I", len(reply.tracing_session_infos))]

            for info in reply.tracing_session_infos:
                parts.append(
                    self._pack(
                        "QIII",
                        info.tracing_session_id,
                        info.live_timer_freq,
                        info.client_count,
                        info.stream_count,
                    )
                )
                parts.append(self._encode_zero_padded_str(info.hostname, 64))
                parts.append(self._encode_zero_padded_str(info.name, 255))

            return b"".join(parts)

        if type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
            return self._encode_stream_infos_reply(reply.status, reply.stream_infos)

        if type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            index_format = "QQQQQQQII"
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )

            if isinstance(entry, _LttngDataStreamIndexEntry):
                return self._pack(
                    index_format,
                    entry.offset_bytes,
                    entry.total_size_bits,
                    entry.content_size_bits,
                    entry.timestamp_begin,
                    entry.timestamp_end,
                    entry.events_discarded,
                    entry.stream_class_id,
                    reply.status,
                    flags,
                )

            # Beacon: same layout as a packet entry, with the beacon's
            # timestamp in the end timestamp field and zeros for all
            # the other packet fields.
            return self._pack(
                index_format,
                0,
                0,
                0,
                0,
                entry.timestamp,
                0,
                entry.stream_class_id,
                reply.status,
                flags,
            )

        if type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            return self._pack("III", reply.status, len(reply.data), flags) + reply.data

        if type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
            return self._pack("QI", len(reply.data), reply.status) + reply.data

        if type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
            return self._encode_stream_infos_reply(reply.status, reply.stream_infos)

        if type(reply) is _LttngLiveViewerCreateViewerSessionReply:
            return self._pack("I", reply.status)

        if type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
            return self._pack("I", reply.status)

        raise ValueError(
            "Unknown reply object with class `{}`".format(reply.__class__.__name__)
        )
680
681
# Returns the beginning timestamp of the index entry `entry`: the sole
# timestamp of a beacon entry, or the `timestamp_begin` property of any
# other entry.
def _get_entry_timestamp_begin(
    entry: _LttngIndexEntryT,
):
    if not isinstance(entry, _LttngDataStreamBeaconIndexEntry):
        return entry.timestamp_begin

    return entry.timestamp
689
690
# The index of an LTTng data stream, a sequence of index entries.
#
# The packet entries come from the index file `path`; the optional
# `beacons` (timestamps) become beacon entries merged amongst them.
class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
    def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
        self._path = path
        self._build()

        if beacons:
            # Every beacon gets the stream class ID of the first entry
            # read from the index file.
            stream_class_id = self._entries[0].stream_class_id
            self._add_beacons(
                [
                    _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
                    for ts in beacons.iter(tjson.IntVal)
                ]
            )

        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    # Reads the whole index file, creating one packet entry per index
    # entry found.
    def _build(self):
        self._entries = []  # type: list[_LttngIndexEntryT]
        header_struct = struct.Struct(">IIII")
        entry_struct = struct.Struct(">QQQQQQQ")

        with open(self._path, "rb") as f:
            # Read header first
            header_data = f.read(header_struct.size)
            assert len(header_data) == header_struct.size
            magic, _, _, index_entry_length = header_struct.unpack(header_data)
            assert magic == 0xC1F1DCC1

            # An index entry may be longer than what we decode: this is
            # what to skip after each one.
            trailing_size = index_entry_length - entry_struct.size

            # Read index entries
            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, f.tell()
                    )
                )
                entry_data = f.read(entry_struct.size)

                if not entry_data:
                    # Done
                    break

                assert len(entry_data) == entry_struct.size

                # The decoded fields are in the order of the
                # parameters of _LttngDataStreamIndexEntry.__init__().
                self._entries.append(
                    _LttngDataStreamIndexEntry(*entry_struct.unpack(entry_data))
                )

                # Skip anything else before the next entry
                f.seek(trailing_size, os.SEEK_CUR)

    # Merges `beacons` into the entries, keeping everything sorted by
    # (end) timestamp.
    def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
        # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
        def sort_key(
            entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
        ) -> int:
            if not isinstance(entry, _LttngDataStreamBeaconIndexEntry):
                return entry.timestamp_end

            return entry.timestamp

        self._entries.extend(beacons)
        self._entries.sort(key=sort_key)

    @overload
    def __getitem__(self, index: int) -> _LttngIndexEntryT:
        ...

    @overload
    def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]:  # noqa: F811
        ...

    def __getitem__(  # noqa: F811
        self, index: Union[int, slice]
    ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT]]:
        return self._entries[index]

    def __len__(self):
        return len(self._entries)

    @property
    def path(self):
        return self._path
800
801
# An LTTng data stream.
#
# The channel name comes from the file name of `path` (everything
# before `_` and digits), and the index is built from the file
# `index/<file name>.idx` found next to the data stream file.
class _LttngDataStream:
    def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
        self._path = path
        trace_dir, filename = os.path.split(path)
        name_match = re.match(r"(.*)_\d+", filename)

        if name_match is None:
            raise RuntimeError(
                "Unexpected data stream file name pattern: {}".format(filename)
            )

        self._channel_name = name_match.group(1)
        self._index = _LttngDataStreamIndex(
            os.path.join(trace_dir, "index", filename + ".idx"), beacons_json
        )
        assert os.path.isfile(path)

        # Kept open for the whole lifetime of this object: get_data()
        # reads from it.
        self._file = open(path, "rb")
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    @property
    def path(self):
        return self._path

    @property
    def channel_name(self):
        return self._channel_name

    @property
    def index(self):
        return self._index

    # Returns at most `len_bytes` bytes read at the offset
    # `offset_bytes` of the data stream file.
    def get_data(self, offset_bytes: int, len_bytes: int):
        stream_file = self._file
        stream_file.seek(offset_bytes)
        return stream_file.read(len_bytes)
840
841
842 class _LttngMetadataStreamSection:
843 def __init__(self, timestamp: int, data: Optional[bytes]):
844 self._timestamp = timestamp
845 if data is None:
846 self._data = bytes()
847 else:
848 self._data = data
849 logging.info(
850 "Built metadata stream section: ts={}, data-len={}".format(
851 self._timestamp, len(self._data)
852 )
853 )
854
855 @property
856 def timestamp(self):
857 return self._timestamp
858
859 @property
860 def data(self):
861 return self._data
862
863
# An LTTng metadata stream: the path of the metadata file and the
# sections into which its content was split.
class _LttngMetadataStream:
    def __init__(
        self,
        metadata_file_path: str,
        config_sections: Sequence[_LttngMetadataStreamSection],
    ):
        self._sections = config_sections
        self._path = metadata_file_path
        logging.info(
            "Built metadata stream: path={}, section-len={}".format(
                metadata_file_path, len(config_sections)
            )
        )

    @property
    def path(self):
        return self._path

    @property
    def sections(self):
        return self._sections
886
887
# A metadata section as described by the configuration: the line of the
# metadata file at which it begins, its timestamp, and whether or not
# it's an empty section.
class LttngMetadataConfigSection:
    def __init__(self, line: int, timestamp: int, is_empty: bool):
        self._is_empty = is_empty
        self._timestamp = timestamp
        self._line = line

    @property
    def line(self) -> int:
        return self._line

    @property
    def timestamp(self) -> int:
        return self._timestamp

    @property
    def is_empty(self) -> bool:
        return self._is_empty
905
906
# Converts the metadata sections configuration `metadata_sections_json`
# to a list of `LttngMetadataConfigSection` objects.
#
# Each element is either the string `empty` or an object having the
# `line` and `timestamp` integer properties. An `empty` marker becomes
# an empty section having the line and timestamp of the next object.
#
# Raises `ValueError` on any other string and `TypeError` on any other
# kind of value.
def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
    config_sections = []  # type: list[LttngMetadataConfigSection]
    empty_section_pending = False
    prev_timestamp = 0
    prev_line = 0

    for section in metadata_sections_json:
        if isinstance(section, tjson.StrVal):
            if section.val != "empty":
                raise ValueError("Invalid string value at {}.".format(section.path))

            # Found an empty section marker. Actually append the
            # section at the timestamp of the next concrete section.
            empty_section_pending = True
            continue

        if not isinstance(section, tjson.ObjVal):
            raise TypeError(
                "`{}`: expecting a string or object value".format(section.path)
            )

        line = section.at("line", tjson.IntVal).val
        ts = section.at("timestamp", tjson.IntVal).val

        # Sections' timestamps and lines must both be increasing.
        assert ts > prev_timestamp
        prev_timestamp = ts

        assert line > prev_line
        prev_line = line

        if empty_section_pending:
            config_sections.append(LttngMetadataConfigSection(line, ts, True))
            empty_section_pending = False

        config_sections.append(LttngMetadataConfigSection(line, ts, False))

    return config_sections
943
944
# Splits the content of the metadata file `metadata_file_path` into
# sections as described by `metadata_sections_json` (see
# _parse_metadata_sections_config()).
#
# A section holds the lines from its own beginning line up to, but
# excluding, the beginning line of the next section; the last section
# gets all the remaining lines. Empty sections get no data.
#
# Returns the list of resulting `_LttngMetadataStreamSection` objects.
def _split_metadata_sections(
    metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
):
    config_sections = _parse_metadata_sections_config(metadata_sections_json)
    sections = []  # type: list[_LttngMetadataStreamSection]

    # Decode explicitly as UTF-8: the lines are encoded back to UTF-8
    # below, so relying on the locale's default encoding could fail or
    # yield bytes which differ from what the file contains.
    with open(metadata_file_path, "r", encoding="utf8") as metadata_file:
        metadata_lines = metadata_file.readlines()

    section_idx = 0
    curr_section_data = bytearray()

    # Line numbers start at one, like what humans see when viewing a
    # text file.
    for line_number, line_content in enumerate(metadata_lines, start=1):
        has_next_section = section_idx + 1 < len(config_sections)

        # If the next section begins at the current line, create a
        # section with the metadata gathered so far.
        if (
            has_next_section
            and line_number >= config_sections[section_idx + 1].line
        ):
            # Flush the metadata of the current section.
            sections.append(
                _LttngMetadataStreamSection(
                    config_sections[section_idx].timestamp,
                    bytes(curr_section_data),
                )
            )

            # Move to the next section, starting from scratch.
            section_idx += 1
            curr_section_data.clear()

            # Append any empty sections.
            while config_sections[section_idx].is_empty:
                sections.append(
                    _LttngMetadataStreamSection(
                        config_sections[section_idx].timestamp, None
                    )
                )
                section_idx += 1

        # In all cases, the current line belongs to the (possibly new)
        # current section.
        curr_section_data += line_content.encode("utf8")

    # We iterated over all the lines of the metadata file. Close the
    # current section.
    sections.append(
        _LttngMetadataStreamSection(
            config_sections[section_idx].timestamp,
            bytes(curr_section_data),
        )
    )

    return sections
1009
1010
1011 _StreamBeaconsT = Dict[str, Iterable[int]]
1012
1013
# An LTTng trace, a sequence of LTTng data streams.
#
# The metadata stream comes from the `metadata` file of the trace
# directory; every other regular, non-hidden file directly within the
# trace directory is considered a data stream file.
class LttngTrace(Sequence[_LttngDataStream]):
    def __init__(
        self,
        trace_dir: str,
        metadata_sections_json: Optional[tjson.ArrayVal],
        beacons_json: Optional[tjson.ObjVal],
    ):
        self._path = trace_dir
        self._create_metadata_stream(trace_dir, metadata_sections_json)
        self._create_data_streams(trace_dir, beacons_json)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    # Creates the data streams, sorted by path, each one with its own
    # beacons if `beacons_json` has a property named after its file.
    def _create_data_streams(
        self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
    ):
        def is_data_stream_file(filename: str) -> bool:
            return (
                os.path.isfile(os.path.join(trace_dir, filename))
                and not filename.startswith(".")
                and filename != "metadata"
            )

        data_stream_paths = sorted(
            os.path.join(trace_dir, filename)
            for filename in os.listdir(trace_dir)
            if is_data_stream_file(filename)
        )
        self._data_streams = []  # type: list[_LttngDataStream]

        for data_stream_path in data_stream_paths:
            stream_name = os.path.basename(data_stream_path)

            if beacons_json is not None and stream_name in beacons_json:
                stream_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
            else:
                stream_beacons_json = None

            self._data_streams.append(
                _LttngDataStream(data_stream_path, stream_beacons_json)
            )

    # Creates the metadata stream: split following
    # `metadata_sections_json` if available, otherwise a single section
    # (timestamp 0) holding the whole metadata file.
    def _create_metadata_stream(
        self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
    ):
        metadata_path = os.path.join(trace_dir, "metadata")

        if metadata_sections_json is not None:
            metadata_sections = _split_metadata_sections(
                metadata_path, metadata_sections_json
            )
        else:
            with open(metadata_path, "rb") as metadata_file:
                metadata_sections = [
                    _LttngMetadataStreamSection(0, metadata_file.read())
                ]

        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)

    @property
    def path(self):
        return self._path

    @property
    def metadata_stream(self):
        return self._metadata_stream

    @overload
    def __getitem__(self, index: int) -> _LttngDataStream:
        ...

    @overload
    def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]:  # noqa: F811
        ...

    def __getitem__(  # noqa: F811
        self, index: Union[int, slice]
    ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
        return self._data_streams[index]

    def __len__(self):
        return len(self._data_streams)
1100
1101
# The state of a single data stream.
#
# Mostly a cursor over the index entries of the data stream: see the
# `cur_index_entry` property and goto_next_index_entry().
class _LttngLiveViewerSessionDataStreamState:
    def __init__(
        self,
        ts_state: "_LttngLiveViewerSessionTracingSessionState",
        info: _LttngLiveViewerStreamInfo,
        data_stream: _LttngDataStream,
        metadata_stream_id: int,
    ):
        self._ts_state = ts_state
        self._info = info
        self._data_stream = data_stream
        self._metadata_stream_id = metadata_stream_id

        # Start at the first index entry
        self._cur_index_entry_index = 0

        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id,
                ts_info.tracing_session_id,
                ts_info.name,
                data_stream.path,
            )
        )

    @property
    def tracing_session_state(self):
        return self._ts_state

    @property
    def info(self):
        return self._info

    @property
    def data_stream(self):
        return self._data_stream

    # Current index entry, or `None` once the cursor is right past the
    # last one.
    @property
    def cur_index_entry(self):
        index = self._data_stream.index

        if self._cur_index_entry_index == len(index):
            return None

        return index[self._cur_index_entry_index]

    @property
    def metadata_stream_id(self):
        return self._metadata_stream_id

    def goto_next_index_entry(self):
        self._cur_index_entry_index += 1
1151
1152
# The state of a single metadata stream.
#
# Mostly a cursor over the sections of the metadata stream (see the
# `cur_section` property and goto_next_section()) with a flag telling
# whether or not the metadata is sent.
class _LttngLiveViewerSessionMetadataStreamState:
    def __init__(
        self,
        ts_state: "_LttngLiveViewerSessionTracingSessionState",
        info: _LttngLiveViewerStreamInfo,
        metadata_stream: _LttngMetadataStream,
    ):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream

        # Start at the first section; what's next is therefore the
        # second one, if any.
        self._cur_metadata_stream_section_index = 0
        sections = metadata_stream.sections
        self._next_metadata_stream_section_timestamp = (
            sections[1].timestamp if len(sections) > 1 else None
        )

        self._is_sent = False

        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id,
                ts_info.tracing_session_id,
                ts_info.name,
                metadata_stream.path,
            )
        )

    @property
    def info(self):
        return self._info

    @property
    def metadata_stream(self):
        return self._metadata_stream

    @property
    def is_sent(self):
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value: bool):
        self._is_sent = value

    # Current section, or `None` once the cursor is right past the last
    # one.
    #
    # Each access logs the current section index.
    @property
    def cur_section(self):
        section_index = self._cur_metadata_stream_section_index
        logging.info("Get current metadata section: section-idx={}".format(section_index))
        sections = self._metadata_stream.sections

        if section_index == len(sections):
            return None

        return sections[section_index]

    # Moves the cursor to the next section and updates the next section
    # timestamp to the timestamp of this new current section (`None` if
    # there's none).
    def goto_next_section(self):
        self._cur_metadata_stream_section_index += 1

        if not self.cur_section:
            self._next_metadata_stream_section_timestamp = None
            return

        self._next_metadata_stream_section_timestamp = self.cur_section.timestamp

    @property
    def next_section_timestamp(self):
        return self._next_metadata_stream_section_timestamp
1220
1221
# A tracing session descriptor.
#
# In the constructor, `traces` is an iterable of LTTng traces
# (`LttngTrace` objects). The path of each trace must contain the
# tracing session name `name`, otherwise the constructor raises
# `ValueError`.
#
# The `info` property is the tracing session info built from the
# constructor parameters; its stream count is, for each trace, the
# length of the trace plus one (the `+ 1` presumably accounts for the
# metadata stream of the trace).
class LttngTracingSessionDescriptor:
    def __init__(
        self,
        name: str,
        tracing_session_id: int,
        hostname: str,
        live_timer_freq: int,
        client_count: int,
        traces: Iterable[LttngTrace],
    ):
        # Materialize `traces` once: it's iterated here to validate the
        # paths and to count the streams, and again by any user of the
        # `traces` property. A one-shot iterator (a generator, for
        # example) would be exhausted after the first pass, silently
        # leading to a stream count of zero and to an empty trace list.
        trace_list = list(traces)

        for trace in trace_list:
            if name not in trace.path:
                fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
                raise ValueError(fmt.format(name, trace.path))

        self._traces = trace_list
        stream_count = sum(len(t) + 1 for t in trace_list)
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            stream_count,
            hostname,
            name,
        )

    # Traces of this tracing session (list of `LttngTrace` objects).
    @property
    def traces(self):
        return self._traces

    # Tracing session info of this descriptor.
    @property
    def info(self):
        return self._info
1259
1260
# The state of a tracing session within a viewer session.
#
# On construction, it assigns consecutive stream IDs, starting at
# `base_stream_id`, to the streams of the traces of `tc_descr`: for each
# trace, the metadata stream comes first, followed by its data streams.
# It builds one stream info and one stream state per stream.
class _LttngLiveViewerSessionTracingSessionState:
    def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
        self._tc_descr = tc_descr
        self._stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
        self._ds_states = {}  # type: dict[int, _LttngLiveViewerSessionDataStreamState]
        self._ms_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # The metadata stream takes the first ID of the trace; the
            # trace ID derives from that same ID.
            metadata_stream_id = next_stream_id
            trace_id = metadata_stream_id * 1000
            metadata_stream = trace.metadata_stream
            ms_info = _LttngLiveViewerStreamInfo(
                metadata_stream_id, trace_id, True, metadata_stream.path, "metadata"
            )
            self._stream_infos.append(ms_info)
            self._ms_states[metadata_stream_id] = (
                _LttngLiveViewerSessionMetadataStreamState(
                    self, ms_info, metadata_stream
                )
            )
            next_stream_id += 1

            # Each data stream takes the following ID and remembers the
            # ID of the metadata stream of its trace.
            for data_stream in trace:
                ds_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(ds_info)
                self._ds_states[next_stream_id] = (
                    _LttngLiveViewerSessionDataStreamState(
                        self, ds_info, data_stream, metadata_stream_id
                    )
                )
                next_stream_id += 1

        self._is_attached = False
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                tc_descr.info.tracing_session_id, tc_descr.info.name
            )
        )

    # Descriptor from which this state was built.
    @property
    def tracing_session_descriptor(self):
        return self._tc_descr

    # Data stream states, by stream ID.
    @property
    def data_stream_states(self):
        return self._ds_states

    # Metadata stream states, by stream ID.
    @property
    def metadata_stream_states(self):
        return self._ms_states

    # Stream infos of all the streams, in stream ID order.
    @property
    def stream_infos(self):
        return self._stream_infos

    # Whether or not at least one metadata stream state has its current
    # section not sent yet.
    @property
    def has_new_metadata(self):
        return not all(ms_state.is_sent for ms_state in self._ms_states.values())

    # Whether or not the viewer is attached to this tracing session.
    @property
    def is_attached(self):
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value: bool):
        self._is_attached = value
1332
1333
# Returns whether or not `latest_timestamp` reached the "next section"
# timestamp of `metadata_stream_state`.
#
# Always returns `False` when the state has no "next section" timestamp
# (`None`).
def needs_new_metadata_section(
    metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
    latest_timestamp: int,
):
    next_timestamp = metadata_stream_state.next_section_timestamp
    return next_timestamp is not None and latest_timestamp >= next_timestamp
1345
1346
# An LTTng live viewer session manages a view on tracing sessions
# and replies to commands accordingly.
#
# On construction, it builds one tracing session state per descriptor of
# `tracing_session_descriptors` and indexes all their stream states by
# stream ID.
#
# handle_command() dispatches a decoded command to the handler
# registered for its exact class and returns the reply object of this
# handler. Handlers raise `RuntimeError` on unknown tracing session or
# stream IDs and on invalid attach/detach sequences.
class _LttngLiveViewerSession:
    def __init__(
        self,
        viewer_session_id: int,
        tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
        max_query_data_response_size: Optional[int],
    ):
        self._viewer_session_id = viewer_session_id
        self._ts_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
        self._stream_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
        self._max_query_data_response_size = max_query_data_response_size

        # Running count of stream infos: it's the base stream ID of the
        # next tracing session state, which keeps stream IDs unique
        # across all the tracing sessions.
        total_stream_infos = 0

        for ts_descr in tracing_session_descriptors:
            ts_state = _LttngLiveViewerSessionTracingSessionState(
                ts_descr, total_stream_infos
            )
            ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
            self._ts_states[ts_id] = ts_state
            total_stream_infos += len(ts_state.stream_infos)

            # Update session's stream states to have the new states
            self._stream_states.update(ts_state.data_stream_states)
            self._stream_states.update(ts_state.metadata_stream_states)

        # Command class -> handler method
        self._command_handlers = {
            _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
            _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
            _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
            _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
            _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
            _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
            _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
            _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
        }  # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]

    # ID of this viewer session.
    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    # Returns the tracing session state having the ID
    # `tracing_session_id`, raising `RuntimeError` if there's none.
    def _get_tracing_session_state(self, tracing_session_id: int):
        if tracing_session_id not in self._ts_states:
            raise RuntimeError(
                "Unknown tracing session ID {}".format(tracing_session_id)
            )

        return self._ts_states[tracing_session_id]

    # Returns the data stream state having the ID `stream_id`.
    #
    # Raises `RuntimeError` if there's no stream with this ID or if this
    # stream isn't a data stream.
    def _get_data_stream_state(self, stream_id: int):
        if stream_id not in self._stream_states:
            # This exception must be raised: without `raise`, the lookup
            # below fails with a bare `KeyError` instead.
            raise RuntimeError("Unknown stream ID {}".format(stream_id))

        stream = self._stream_states[stream_id]
        if type(stream) is not _LttngLiveViewerSessionDataStreamState:
            raise RuntimeError("Stream is not a data stream")

        return stream

    # Returns the metadata stream state having the ID `stream_id`.
    #
    # Raises `RuntimeError` if there's no stream with this ID or if this
    # stream isn't a metadata stream.
    def _get_metadata_stream_state(self, stream_id: int):
        if stream_id not in self._stream_states:
            # This exception must be raised: without `raise`, the lookup
            # below fails with a bare `KeyError` instead.
            raise RuntimeError("Unknown stream ID {}".format(stream_id))

        stream = self._stream_states[stream_id]
        if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
            raise RuntimeError("Stream is not a metadata stream")

        return stream

    # Handles the command `cmd` and returns the corresponding reply.
    #
    # Raises `RuntimeError` if no handler is registered for the exact
    # class of `cmd`.
    def handle_command(self, cmd: _LttngLiveViewerCommand):
        logging.info(
            "Handling command in viewer session: cmd-cls-name={}".format(
                cmd.__class__.__name__
            )
        )
        cmd_type = type(cmd)

        if cmd_type not in self._command_handlers:
            raise RuntimeError(
                "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
            )

        return self._command_handlers[cmd_type](cmd)

    # Marks the requested tracing session as attached and replies with
    # all its stream infos.
    #
    # Raises `RuntimeError` if the viewer is already attached.
    def _handle_attach_to_tracing_session_command(
        self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
    ):
        fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
        logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if ts_state.is_attached:
            raise RuntimeError(
                "Cannot attach to tracing session `{}`: viewer is already attached".format(
                    info.name
                )
            )

        ts_state.is_attached = True
        status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
        return _LttngLiveViewerAttachToTracingSessionReply(
            status, ts_state.stream_infos
        )

    # Marks the requested tracing session as detached.
    #
    # Raises `RuntimeError` if the viewer isn't attached.
    def _handle_detach_from_tracing_session_command(
        self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
    ):
        fmt = 'Handling "detach from tracing session" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if not ts_state.is_attached:
            raise RuntimeError(
                "Cannot detach to tracing session `{}`: viewer is not attached".format(
                    info.name
                )
            )

        ts_state.is_attached = False
        status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
        return _LttngLiveViewerDetachFromTracingSessionReply(status)

    # Replies with the current index entry of the requested data stream,
    # then moves this stream to its next index entry.
    #
    # The status is `HUP` when the stream has no more index entries, `OK`
    # for a regular data stream index entry, and `INACTIVE` for any other
    # kind of entry.
    def _handle_get_next_data_stream_index_entry_command(
        self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
    ):
        fmt = 'Handling "get next data stream index entry" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        stream_state = self._get_data_stream_state(cmd.stream_id)
        metadata_stream_state = self._get_metadata_stream_state(
            stream_state.metadata_stream_id
        )

        if stream_state.cur_index_entry is None:
            # The viewer is done reading this stream
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP

            # Dummy data stream index entry to use with the `HUP` status
            # (the reply needs one, but the viewer ignores it)
            index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)

            return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
                status, index_entry, False, False
            )

        timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)

        # When this entry reaches the "next section" timestamp of the
        # metadata stream, make the following metadata section the
        # current, not yet sent, one.
        if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        # The viewer only checks the `has_new_metadata` flag if the
        # reply's status is `OK`, so we need to provide an index here
        has_new_metadata = stream_state.tracing_session_state.has_new_metadata
        if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
        else:
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE

        reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
            status, stream_state.cur_index_entry, has_new_metadata, False
        )
        stream_state.goto_next_index_entry()
        return reply

    # Replies with packet data of the requested data stream.
    #
    # Replies with the `ERROR` status, no data, and the "has new
    # metadata" flag set if the tracing session of the stream has
    # metadata which wasn't sent yet.
    def _handle_get_data_stream_packet_data_command(
        self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
    ):
        fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
        logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
        stream_state = self._get_data_stream_state(cmd.stream_id)
        data_response_length = cmd.req_length

        if stream_state.tracing_session_state.has_new_metadata:
            status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
            return _LttngLiveViewerGetDataStreamPacketDataReply(
                status, bytes(), True, False
            )

        # Note: a limit of `None` or 0 means "no limit" here.
        if self._max_query_data_response_size:
            # Enforce a server side limit on the query requested length.
            # To ensure that the transaction terminate take the minimum of both
            # value.
            data_response_length = min(
                cmd.req_length, self._max_query_data_response_size
            )
            fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
            logging.info(fmt.format(cmd.req_length, data_response_length))

        data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
        status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
        return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)

    # Replies with the data of the current section of the requested
    # metadata stream, or with the `NO_NEW` status and no data if this
    # section was already sent.
    def _handle_get_metadata_stream_data_command(
        self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
    ):
        fmt = 'Handling "get metadata stream data" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)

        if metadata_stream_state.is_sent:
            status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
            return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())

        metadata_stream_state.is_sent = True
        status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
        metadata_section = metadata_stream_state.cur_section
        assert metadata_section is not None

        # If we are sending an empty section, ready the next one right away.
        if len(metadata_section.data) == 0:
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
        logging.info(fmt.format(len(metadata_section.data)))
        return _LttngLiveViewerGetMetadataStreamDataContentReply(
            status, metadata_section.data
        )

    # Always replies with the `HUP` status and no stream infos.
    def _handle_get_new_stream_infos_command(
        self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
    ):
        fmt = 'Handling "get new stream infos" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))

        # As of this version, all the tracing session's stream infos are
        # always given to the viewer when sending the "attach to tracing
        # session" reply, so there's nothing new here. Return the `HUP`
        # status as, if we're handling this command, the viewer consumed
        # all the existing data streams.
        status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
        return _LttngLiveViewerGetNewStreamInfosReply(status, [])

    # Replies with the infos of all the tracing sessions, sorted by
    # tracing session name.
    def _handle_get_tracing_session_infos_command(
        self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
    ):
        logging.info('Handling "get tracing session infos" command.')
        infos = [
            tss.tracing_session_descriptor.info for tss in self._ts_states.values()
        ]
        infos.sort(key=lambda info: info.name)
        return _LttngLiveViewerGetTracingSessionInfosReply(infos)

    # Always replies with the `OK` status.
    def _handle_create_viewer_session_command(
        self, cmd: _LttngLiveViewerCreateViewerSessionCommand
    ):
        logging.info('Handling "create viewer session" command.')
        status = _LttngLiveViewerCreateViewerSessionReply.Status.OK

        # This does nothing here. In the LTTng relay daemon, it
        # allocates the viewer session's state.
        return _LttngLiveViewerCreateViewerSessionReply(status)
1606
1607
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
# on an OS-assigned TCP port otherwise. If `port_filename` is not
# `None`, it writes the decimal TCP port number to a temporary port file
# and renames this temporary port file to `port_filename`.
#
# `tracing_session_descriptors` is an iterable of tracing session
# descriptors (`LttngTracingSessionDescriptor`) to serve.
#
# `max_query_data_response_size`, if not `None`, is handed to the viewer
# session to limit the size of the packet data responses.
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns. The listening socket is closed whenever the constructor
# returns or raises.
class LttngLiveServer:
    def __init__(
        self,
        port: Optional[int],
        port_filename: Optional[str],
        tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
        max_query_data_response_size: Optional[int],
    ):
        # Materialize the descriptors once: they are iterated twice
        # below and once more when building the viewer session, which a
        # one-shot iterator wouldn't survive.
        ts_descriptors = list(tracing_session_descriptors)

        logging.info("Server configuration:")

        logging.info(" Port file name: `{}`".format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                " Maximum response data query size: `{}`".format(
                    max_query_data_response_size
                )
            )

        for ts_descr in ts_descriptors:
            info = ts_descr.info
            fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info(' Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = ts_descriptors
        self._max_query_data_response_size = max_query_data_response_size
        self._codec = _LttngLiveViewerProtocolCodec()

        # Create the socket last and enter the `try` block immediately
        # so that any failure from this point (bind error, port file
        # error, protocol error) still closes it.
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            # Port 0: OS assigns an unused port
            serv_addr = ("localhost", port if port is not None else 0)
            self._sock.bind(serv_addr)

            if port_filename is not None:
                self._write_port_to_file(port_filename)

            print("Listening on port {}".format(self._server_port))

            for ts_descr in ts_descriptors:
                info = ts_descr.info
                print(
                    "net://localhost:{}/host/{}/{}".format(
                        self._server_port, info.hostname, info.name
                    )
                )

            self._listen()
        finally:
            self._sock.close()
            logging.info("Closed connection and socket.")

    # TCP port to which the server socket is bound.
    @property
    def _server_port(self):
        return self._sock.getsockname()[1]

    # Receives data from the viewer until the codec decodes a complete
    # command, and returns this command.
    #
    # Returns `None` if the viewer closes the connection before sending
    # any byte of a new command.
    #
    # Raises `RuntimeError` if the viewer closes the connection in the
    # middle of a command, or if the codec raises `struct.error` while
    # decoding.
    def _recv_command(self):
        data = bytes()

        while True:
            logging.info("Waiting for viewer command.")
            buf = self._conn.recv(128)

            if not buf:
                logging.info("Client closed connection.")

                if data:
                    raise RuntimeError(
                        "Client closed connection after having sent {} command bytes.".format(
                            len(data)
                        )
                    )

                return

            logging.info("Received data from viewer: length={}".format(len(buf)))

            data += buf

            try:
                # `None` here means "keep receiving" (presumably: not
                # enough bytes yet for a complete command).
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise RuntimeError("Malformed command: {}".format(exc)) from exc

            if cmd is not None:
                logging.info(
                    "Received command from viewer: cmd-cls-name={}".format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    # Encodes `reply` and sends it, completely, to the viewer.
    def _send_reply(self, reply: _LttngLiveViewerReply):
        data = self._codec.encode(reply)
        logging.info(
            "Sending reply to viewer: reply-cls-name={}, length={}".format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    # Handles the whole conversation with the connected viewer: expects
    # a "connect" command first, creates a viewer session, and then makes
    # this viewer session handle each following command until the viewer
    # closes the connection.
    #
    # Raises `RuntimeError` if the first command isn't "connect".
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise RuntimeError(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    # Listens, accepts a single viewer connection, handles it, and
    # closes it.
    def _listen(self):
        logging.info("Listening: port={}".format(self._server_port))
        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Writes the decimal server port number to a temporary file created
    # in the directory of `port_filename`, and then renames this
    # temporary file to `port_filename`.
    #
    # Re-raises `PermissionError` if renaming still fails on the last
    # attempt.
    def _write_port_to_file(self, port_filename: str):
        # Write the port number to a temporary file.
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, dir=os.path.dirname(port_filename)
        ) as tmp_port_file:
            print(self._server_port, end="", file=tmp_port_file)

        # Rename temporary file to real file.
        #
        # For unknown reasons, on Windows, moving the port file from its
        # temporary location to its final location (where the user of
        # the server expects it to appear) may raise a `PermissionError`
        # exception.
        #
        # We suppose it's possible that something in the Windows kernel
        # hasn't completely finished using the file when we try to move
        # it.
        #
        # Use a wait-and-retry scheme as a (bad) workaround.
        num_attempts = 5
        retry_delay_s = 1

        # `attempt` counts down to 0, which marks the last attempt.
        for attempt in reversed(range(num_attempts)):
            try:
                os.replace(tmp_port_file.name, port_filename)
                logging.info(
                    'Renamed port file: src-path="{}", dst-path="{}"'.format(
                        tmp_port_file.name, port_filename
                    )
                )
                return
            except PermissionError:
                logging.info(
                    'Permission error while attempting to rename port file; retrying in {} second: src-path="{}", dst-path="{}"'.format(
                        retry_delay_s, tmp_port_file.name, port_filename
                    )
                )

                if attempt == 0:
                    raise

                time.sleep(retry_delay_s)
1827
1828
# Loads the session configuration file `sessions_filename` and returns
# the corresponding list of tracing session descriptors
# (`LttngTracingSessionDescriptor`).
#
# A relative trace path is joined to `trace_path_prefix` when the latter
# is set (not `None` and not empty).
def _session_descriptors_from_path(
    sessions_filename: str, trace_path_prefix: Optional[str]
):
    # Expected file format:
    #
    #     [
    #         {
    #             "name": "my-session",
    #             "id": 17,
    #             "hostname": "myhost",
    #             "live-timer-freq": 1000000,
    #             "client-count": 23,
    #             "traces": [
    #                 {
    #                     "path": "lol"
    #                 },
    #                 {
    #                     "path": "meow/mix",
    #                     "beacons": {
    #                         "my_stream": [ 5235787, 728375283 ]
    #                     },
    #                     "metadata-sections": [
    #                         {
    #                             "line": 1,
    #                             "timestamp": 0
    #                         }
    #                     ]
    #                 }
    #             ]
    #         }
    #     ]
    with open(sessions_filename, "r") as sessions_file:
        root_json = tjson.load(sessions_file, tjson.ArrayVal)

    descriptors = []  # type: list[LttngTracingSessionDescriptor]

    for session_obj in root_json.iter(tjson.ObjVal):
        # Mandatory session properties
        session_name = session_obj.at("name", tjson.StrVal).val
        session_id = session_obj.at("id", tjson.IntVal).val
        session_hostname = session_obj.at("hostname", tjson.StrVal).val
        timer_freq = session_obj.at("live-timer-freq", tjson.IntVal).val
        clients = session_obj.at("client-count", tjson.IntVal).val
        session_traces = []  # type: list[LttngTrace]

        for trace_obj in session_obj.at("traces", tjson.ArrayVal).iter(tjson.ObjVal):
            # Optional trace properties: `None` when missing
            sections_json = None
            if "metadata-sections" in trace_obj:
                sections_json = trace_obj.at("metadata-sections", tjson.ArrayVal)

            beacons_json = None
            if "beacons" in trace_obj:
                beacons_json = trace_obj.at("beacons", tjson.ObjVal)

            trace_path = trace_obj.at("path", tjson.StrVal).val

            if trace_path_prefix and not os.path.isabs(trace_path):
                trace_path = os.path.join(trace_path_prefix, trace_path)

            session_traces.append(LttngTrace(trace_path, sections_json, beacons_json))

        descriptors.append(
            LttngTracingSessionDescriptor(
                session_name,
                session_id,
                session_hostname,
                timer_freq,
                clients,
                session_traces,
            )
        )

    return descriptors
1911
1912
# Converts the log level name `string` (`info` or `warning`) to the
# corresponding `logging` level constant.
#
# Raises `argparse.ArgumentTypeError` for any other name.
def _loglevel_parser(string: str):
    level = {"info": logging.INFO, "warning": logging.WARNING}.get(string)

    if level is None:
        raise argparse.ArgumentTypeError("{} is not a valid loglevel".format(string))

    return level
1919
1920
# Script entry point: parses the command line, loads the session
# configuration file, and runs the server until the viewer disconnects.
if __name__ == "__main__":
    logging.basicConfig(format="# %(asctime)-25s%(message)s")
    arg_parser = argparse.ArgumentParser(
        add_help=False, description="LTTng-live protocol mocker"
    )

    # First pass: only `--log-level` is registered, so that the logging
    # level is set before parsing anything else. The unrecognized
    # arguments are kept for the second pass.
    arg_parser.add_argument(
        "--log-level",
        choices=["info", "warning"],
        default="warning",
        help="The loglevel to be used.",
    )
    early_namespace, leftover_args = arg_parser.parse_known_args()
    root_logger = logging.getLogger()
    root_logger.setLevel(_loglevel_parser(early_namespace.log_level))

    # Second pass: register the remaining arguments. The parser was
    # created with `add_help=False`; the help option is registered by
    # hand, last (presumably so that the first pass never handles it).
    arg_parser.add_argument(
        "--port",
        type=int,
        help="The port to bind to. If missing, use an OS-assigned port..",
    )
    arg_parser.add_argument(
        "--port-filename",
        help="The final port file. This file is present when the server is ready to receive connection.",
    )
    arg_parser.add_argument(
        "--max-query-data-response-size",
        help="The maximum size of control data response in bytes",
        type=int,
    )
    arg_parser.add_argument(
        "--trace-path-prefix",
        help="Prefix to prepend to the trace paths of session configurations",
        type=str,
    )
    arg_parser.add_argument(
        "sessions_filename",
        metavar="sessions-filename",
        help="Path to a session configuration file",
        type=str,
    )
    arg_parser.add_argument(
        "-h",
        "--help",
        action="help",
        help="Show this help message and exit.",
        default=argparse.SUPPRESS,
    )
    cli_args = arg_parser.parse_args(args=leftover_args)

    # Typed views on the parsed arguments
    sessions_path = cli_args.sessions_filename  # type: str
    path_prefix = cli_args.trace_path_prefix  # type: str | None
    bind_port = cli_args.port  # type: int | None
    port_file_path = cli_args.port_filename  # type: str | None
    max_response_size = cli_args.max_query_data_response_size  # type: int | None

    # Constructing the server runs it: the constructor only returns once
    # the viewer closed the connection.
    LttngLiveServer(
        bind_port,
        port_file_path,
        _session_descriptors_from_path(sessions_path, path_prefix),
        max_response_size,
    )
This page took 0.068769 seconds and 3 git commands to generate.