dacc178c756ae444d433aa686b0d1e3e795b374b
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
1 # SPDX-License-Identifier: MIT
2 #
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
4 #
5
6 # pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
7
8 import os
9 import re
10 import time
11 import socket
12 import struct
13 import logging
14 import os.path
15 import argparse
16 import tempfile
17 from abc import ABC, abstractmethod
18 from typing import Dict, Union, Iterable, Optional, Sequence, overload
19
20 import tjson
21
22 # isort: off
23 from typing import Any, Callable # noqa: F401
24
25 # isort: on
26
27
28 # An entry within the index of an LTTng data stream.
29 class _LttngDataStreamIndexEntry:
30 def __init__(
31 self,
32 offset_bytes: int,
33 total_size_bits: int,
34 content_size_bits: int,
35 timestamp_begin: int,
36 timestamp_end: int,
37 events_discarded: int,
38 stream_class_id: int,
39 ):
40 self._offset_bytes = offset_bytes
41 self._total_size_bits = total_size_bits
42 self._content_size_bits = content_size_bits
43 self._timestamp_begin = timestamp_begin
44 self._timestamp_end = timestamp_end
45 self._events_discarded = events_discarded
46 self._stream_class_id = stream_class_id
47
48 @property
49 def offset_bytes(self):
50 return self._offset_bytes
51
52 @property
53 def total_size_bits(self):
54 return self._total_size_bits
55
56 @property
57 def total_size_bytes(self):
58 return self._total_size_bits // 8
59
60 @property
61 def content_size_bits(self):
62 return self._content_size_bits
63
64 @property
65 def content_size_bytes(self):
66 return self._content_size_bits // 8
67
68 @property
69 def timestamp_begin(self):
70 return self._timestamp_begin
71
72 @property
73 def timestamp_end(self):
74 return self._timestamp_end
75
76 @property
77 def events_discarded(self):
78 return self._events_discarded
79
80 @property
81 def stream_class_id(self):
82 return self._stream_class_id
83
84
85 # An entry within the index of an LTTng data stream. While a stream beacon entry
86 # is conceptually unrelated to an index, it is sent as a reply to a
87 # LttngLiveViewerGetNextDataStreamIndexEntryCommand
88 class _LttngDataStreamBeaconIndexEntry:
89 def __init__(self, stream_class_id: int, timestamp: int):
90 self._stream_class_id = stream_class_id
91 self._timestamp = timestamp
92
93 @property
94 def timestamp(self):
95 return self._timestamp
96
97 @property
98 def stream_class_id(self):
99 return self._stream_class_id
100
101
102 _LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
103
104
105 class _LttngLiveViewerCommand:
106 def __init__(self, version: int):
107 self._version = version
108
109 @property
110 def version(self):
111 return self._version
112
113
114 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
115 def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
116 super().__init__(version)
117 self._viewer_session_id = viewer_session_id
118 self._major = major
119 self._minor = minor
120
121 @property
122 def viewer_session_id(self):
123 return self._viewer_session_id
124
125 @property
126 def major(self):
127 return self._major
128
129 @property
130 def minor(self):
131 return self._minor
132
133
134 class _LttngLiveViewerReply:
135 pass
136
137
138 class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
139 def __init__(self, viewer_session_id: int, major: int, minor: int):
140 self._viewer_session_id = viewer_session_id
141 self._major = major
142 self._minor = minor
143
144 @property
145 def viewer_session_id(self):
146 return self._viewer_session_id
147
148 @property
149 def major(self):
150 return self._major
151
152 @property
153 def minor(self):
154 return self._minor
155
156
157 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
158 pass
159
160
161 class _LttngLiveViewerTracingSessionInfo:
162 def __init__(
163 self,
164 tracing_session_id: int,
165 live_timer_freq: int,
166 client_count: int,
167 stream_count: int,
168 hostname: str,
169 name: str,
170 ):
171 self._tracing_session_id = tracing_session_id
172 self._live_timer_freq = live_timer_freq
173 self._client_count = client_count
174 self._stream_count = stream_count
175 self._hostname = hostname
176 self._name = name
177
178 @property
179 def tracing_session_id(self):
180 return self._tracing_session_id
181
182 @property
183 def live_timer_freq(self):
184 return self._live_timer_freq
185
186 @property
187 def client_count(self):
188 return self._client_count
189
190 @property
191 def stream_count(self):
192 return self._stream_count
193
194 @property
195 def hostname(self):
196 return self._hostname
197
198 @property
199 def name(self):
200 return self._name
201
202
203 class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
204 def __init__(
205 self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
206 ):
207 self._tracing_session_infos = tracing_session_infos
208
209 @property
210 def tracing_session_infos(self):
211 return self._tracing_session_infos
212
213
214 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
215 class SeekType:
216 BEGINNING = 1
217 LAST = 2
218
219 def __init__(
220 self, version: int, tracing_session_id: int, offset: int, seek_type: int
221 ):
222 super().__init__(version)
223 self._tracing_session_id = tracing_session_id
224 self._offset = offset
225 self._seek_type = seek_type
226
227 @property
228 def tracing_session_id(self):
229 return self._tracing_session_id
230
231 @property
232 def offset(self):
233 return self._offset
234
235 @property
236 def seek_type(self):
237 return self._seek_type
238
239
240 class _LttngLiveViewerStreamInfo:
241 def __init__(
242 self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
243 ):
244 self._id = id
245 self._trace_id = trace_id
246 self._is_metadata = is_metadata
247 self._path = path
248 self._channel_name = channel_name
249
250 @property
251 def id(self):
252 return self._id
253
254 @property
255 def trace_id(self):
256 return self._trace_id
257
258 @property
259 def is_metadata(self):
260 return self._is_metadata
261
262 @property
263 def path(self):
264 return self._path
265
266 @property
267 def channel_name(self):
268 return self._channel_name
269
270
271 class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
272 class Status:
273 OK = 1
274 ALREADY = 2
275 UNKNOWN = 3
276 NOT_LIVE = 4
277 SEEK_ERROR = 5
278 NO_SESSION = 6
279
280 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
281 self._status = status
282 self._stream_infos = stream_infos
283
284 @property
285 def status(self):
286 return self._status
287
288 @property
289 def stream_infos(self):
290 return self._stream_infos
291
292
293 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
294 def __init__(self, version: int, stream_id: int):
295 super().__init__(version)
296 self._stream_id = stream_id
297
298 @property
299 def stream_id(self):
300 return self._stream_id
301
302
303 class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
304 class Status:
305 OK = 1
306 RETRY = 2
307 HUP = 3
308 ERROR = 4
309 INACTIVE = 5
310 EOF = 6
311
312 def __init__(
313 self,
314 status: int,
315 index_entry: _LttngIndexEntryT,
316 has_new_metadata: bool,
317 has_new_data_stream: bool,
318 ):
319 self._status = status
320 self._index_entry = index_entry
321 self._has_new_metadata = has_new_metadata
322 self._has_new_data_stream = has_new_data_stream
323
324 @property
325 def status(self):
326 return self._status
327
328 @property
329 def index_entry(self):
330 return self._index_entry
331
332 @property
333 def has_new_metadata(self):
334 return self._has_new_metadata
335
336 @property
337 def has_new_data_stream(self):
338 return self._has_new_data_stream
339
340
341 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
342 def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
343 super().__init__(version)
344 self._stream_id = stream_id
345 self._offset = offset
346 self._req_length = req_length
347
348 @property
349 def stream_id(self):
350 return self._stream_id
351
352 @property
353 def offset(self):
354 return self._offset
355
356 @property
357 def req_length(self):
358 return self._req_length
359
360
361 class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
362 class Status:
363 OK = 1
364 RETRY = 2
365 ERROR = 3
366 EOF = 4
367
368 def __init__(
369 self,
370 status: int,
371 data: bytes,
372 has_new_metadata: bool,
373 has_new_data_stream: bool,
374 ):
375 self._status = status
376 self._data = data
377 self._has_new_metadata = has_new_metadata
378 self._has_new_data_stream = has_new_data_stream
379
380 @property
381 def status(self):
382 return self._status
383
384 @property
385 def data(self):
386 return self._data
387
388 @property
389 def has_new_metadata(self):
390 return self._has_new_metadata
391
392 @property
393 def has_new_data_stream(self):
394 return self._has_new_data_stream
395
396
397 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
398 def __init__(self, version: int, stream_id: int):
399 super().__init__(version)
400 self._stream_id = stream_id
401
402 @property
403 def stream_id(self):
404 return self._stream_id
405
406
407 class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
408 class Status:
409 OK = 1
410 NO_NEW = 2
411 ERROR = 3
412
413 def __init__(self, status: int, data: bytes):
414 self._status = status
415 self._data = data
416
417 @property
418 def status(self):
419 return self._status
420
421 @property
422 def data(self):
423 return self._data
424
425
426 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
427 def __init__(self, version: int, tracing_session_id: int):
428 super().__init__(version)
429 self._tracing_session_id = tracing_session_id
430
431 @property
432 def tracing_session_id(self):
433 return self._tracing_session_id
434
435
436 class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
437 class Status:
438 OK = 1
439 NO_NEW = 2
440 ERROR = 3
441 HUP = 4
442
443 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
444 self._status = status
445 self._stream_infos = stream_infos
446
447 @property
448 def status(self):
449 return self._status
450
451 @property
452 def stream_infos(self):
453 return self._stream_infos
454
455
456 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
457 pass
458
459
460 class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
461 class Status:
462 OK = 1
463 ERROR = 2
464
465 def __init__(self, status: int):
466 self._status = status
467
468 @property
469 def status(self):
470 return self._status
471
472
473 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
474 def __init__(self, version: int, tracing_session_id: int):
475 super().__init__(version)
476 self._tracing_session_id = tracing_session_id
477
478 @property
479 def tracing_session_id(self):
480 return self._tracing_session_id
481
482
483 class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
484 class Status:
485 OK = 1
486 UNKNOWN = 2
487 ERROR = 3
488
489 def __init__(self, status: int):
490 self._status = status
491
492 @property
493 def status(self):
494 return self._status
495
496
497 # An LTTng live protocol codec can convert bytes to command objects and
498 # reply objects to bytes.
499 class _LttngLiveViewerProtocolCodec:
500 _COMMAND_HEADER_STRUCT_FMT = "QII"
501 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
502
503 def __init__(self):
504 pass
505
506 def _unpack(self, fmt: str, data: bytes, offset: int = 0):
507 fmt = "!" + fmt
508 return struct.unpack_from(fmt, data, offset)
509
510 def _unpack_payload(self, fmt: str, data: bytes):
511 return self._unpack(
512 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
513 )
514
515 def decode(self, data: bytes):
516 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
517 # Not enough data to read the command header
518 return
519
520 payload_size, cmd_type, version = self._unpack(
521 self._COMMAND_HEADER_STRUCT_FMT, data
522 )
523 logging.info(
524 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
525 payload_size, cmd_type, version
526 )
527 )
528
529 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
530 # Not enough data to read the whole command
531 return
532
533 if cmd_type == 1:
534 viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
535 return _LttngLiveViewerConnectCommand(
536 version, viewer_session_id, major, minor
537 )
538 elif cmd_type == 2:
539 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
540 elif cmd_type == 3:
541 tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
542 return _LttngLiveViewerAttachToTracingSessionCommand(
543 version, tracing_session_id, offset, seek_type
544 )
545 elif cmd_type == 4:
546 (stream_id,) = self._unpack_payload("Q", data)
547 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
548 version, stream_id
549 )
550 elif cmd_type == 5:
551 stream_id, offset, req_length = self._unpack_payload("QQI", data)
552 return _LttngLiveViewerGetDataStreamPacketDataCommand(
553 version, stream_id, offset, req_length
554 )
555 elif cmd_type == 6:
556 (stream_id,) = self._unpack_payload("Q", data)
557 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
558 elif cmd_type == 7:
559 (tracing_session_id,) = self._unpack_payload("Q", data)
560 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
561 elif cmd_type == 8:
562 return _LttngLiveViewerCreateViewerSessionCommand(version)
563 elif cmd_type == 9:
564 (tracing_session_id,) = self._unpack_payload("Q", data)
565 return _LttngLiveViewerDetachFromTracingSessionCommand(
566 version, tracing_session_id
567 )
568 else:
569 raise RuntimeError("Unknown command type {}".format(cmd_type))
570
571 def _pack(self, fmt: str, *args: Any):
572 # Force network byte order
573 return struct.pack("!" + fmt, *args)
574
575 def _encode_zero_padded_str(self, string: str, length: int):
576 data = string.encode()
577 return data.ljust(length, b"\x00")
578
579 def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
580 data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
581 data += self._encode_zero_padded_str(info.path, 4096)
582 data += self._encode_zero_padded_str(info.channel_name, 255)
583 return data
584
585 def _get_has_new_stuff_flags(
586 self, has_new_metadata: bool, has_new_data_streams: bool
587 ):
588 flags = 0
589
590 if has_new_metadata:
591 flags |= 1
592
593 if has_new_data_streams:
594 flags |= 2
595
596 return flags
597
598 def encode(
599 self,
600 reply: _LttngLiveViewerReply,
601 ) -> bytes:
602 if type(reply) is _LttngLiveViewerConnectReply:
603 data = self._pack(
604 "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
605 )
606 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
607 data = self._pack("I", len(reply.tracing_session_infos))
608
609 for info in reply.tracing_session_infos:
610 data += self._pack(
611 "QIII",
612 info.tracing_session_id,
613 info.live_timer_freq,
614 info.client_count,
615 info.stream_count,
616 )
617 data += self._encode_zero_padded_str(info.hostname, 64)
618 data += self._encode_zero_padded_str(info.name, 255)
619 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
620 data = self._pack("II", reply.status, len(reply.stream_infos))
621
622 for info in reply.stream_infos:
623 data += self._encode_stream_info(info)
624 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
625 index_format = "QQQQQQQII"
626 entry = reply.index_entry
627 flags = self._get_has_new_stuff_flags(
628 reply.has_new_metadata, reply.has_new_data_stream
629 )
630
631 if isinstance(entry, _LttngDataStreamIndexEntry):
632 data = self._pack(
633 index_format,
634 entry.offset_bytes,
635 entry.total_size_bits,
636 entry.content_size_bits,
637 entry.timestamp_begin,
638 entry.timestamp_end,
639 entry.events_discarded,
640 entry.stream_class_id,
641 reply.status,
642 flags,
643 )
644 else:
645 data = self._pack(
646 index_format,
647 0,
648 0,
649 0,
650 0,
651 entry.timestamp,
652 0,
653 entry.stream_class_id,
654 reply.status,
655 flags,
656 )
657 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
658 flags = self._get_has_new_stuff_flags(
659 reply.has_new_metadata, reply.has_new_data_stream
660 )
661 data = self._pack("III", reply.status, len(reply.data), flags)
662 data += reply.data
663 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
664 data = self._pack("QI", len(reply.data), reply.status)
665 data += reply.data
666 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
667 data = self._pack("II", reply.status, len(reply.stream_infos))
668
669 for info in reply.stream_infos:
670 data += self._encode_stream_info(info)
671 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
672 data = self._pack("I", reply.status)
673 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
674 data = self._pack("I", reply.status)
675 else:
676 raise ValueError(
677 "Unknown reply object with class `{}`".format(reply.__class__.__name__)
678 )
679
680 return data
681
682
683 def _get_entry_timestamp_begin(
684 entry: _LttngIndexEntryT,
685 ):
686 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
687 return entry.timestamp
688 else:
689 return entry.timestamp_begin
690
691
692 # The index of an LTTng data stream, a sequence of index entries.
693 class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
694 def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
695 self._path = path
696 self._build()
697
698 if beacons:
699 stream_class_id = self._entries[0].stream_class_id
700
701 beacons_list = [] # type: list[_LttngDataStreamBeaconIndexEntry]
702 for ts in beacons.iter(tjson.IntVal):
703 beacons_list.append(
704 _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
705 )
706
707 self._add_beacons(beacons_list)
708
709 logging.info(
710 'Built data stream index entries: path="{}", count={}'.format(
711 path, len(self._entries)
712 )
713 )
714
715 def _build(self):
716 self._entries = [] # type: list[_LttngIndexEntryT]
717
718 with open(self._path, "rb") as f:
719 # Read header first
720 fmt = ">IIII"
721 size = struct.calcsize(fmt)
722 data = f.read(size)
723 assert len(data) == size
724 magic, _, _, index_entry_length = struct.unpack(fmt, data)
725 assert magic == 0xC1F1DCC1
726
727 # Read index entries
728 fmt = ">QQQQQQQ"
729 size = struct.calcsize(fmt)
730
731 while True:
732 logging.debug(
733 'Decoding data stream index entry: path="{}", offset={}'.format(
734 self._path, f.tell()
735 )
736 )
737 data = f.read(size)
738
739 if not data:
740 # Done
741 break
742
743 assert len(data) == size
744 (
745 offset_bytes,
746 total_size_bits,
747 content_size_bits,
748 timestamp_begin,
749 timestamp_end,
750 events_discarded,
751 stream_class_id,
752 ) = struct.unpack(fmt, data)
753
754 self._entries.append(
755 _LttngDataStreamIndexEntry(
756 offset_bytes,
757 total_size_bits,
758 content_size_bits,
759 timestamp_begin,
760 timestamp_end,
761 events_discarded,
762 stream_class_id,
763 )
764 )
765
766 # Skip anything else before the next entry
767 f.seek(index_entry_length - size, os.SEEK_CUR)
768
769 def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
770 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
771 def sort_key(
772 entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
773 ) -> int:
774 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
775 return entry.timestamp
776 else:
777 return entry.timestamp_end
778
779 self._entries += beacons
780 self._entries.sort(key=sort_key)
781
782 @overload
783 def __getitem__(self, index: int) -> _LttngIndexEntryT:
784 ...
785
786 @overload
787 def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]: # noqa: F811
788 ...
789
790 def __getitem__( # noqa: F811
791 self, index: Union[int, slice]
792 ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT],]:
793 return self._entries[index]
794
795 def __len__(self):
796 return len(self._entries)
797
798 @property
799 def path(self):
800 return self._path
801
802
803 # Any LTTng stream (metadata or data).
804 class _LttngStream(ABC):
805 @abstractmethod
806 def __init__(self, creation_timestamp: int):
807 self._creation_timestamp = creation_timestamp
808
809 @property
810 def creation_timestamp(self):
811 return self._creation_timestamp
812
813
814 # An LTTng data stream.
815 class _LttngDataStream(_LttngStream):
816 def __init__(
817 self, path: str, beacons_json: Optional[tjson.ArrayVal], creation_timestamp: int
818 ):
819 super().__init__(creation_timestamp)
820 self._path = path
821 filename = os.path.basename(path)
822 match = re.match(r"(.*)_\d+", filename)
823 if not match:
824 raise RuntimeError(
825 "Unexpected data stream file name pattern: {}".format(filename)
826 )
827
828 self._channel_name = match.group(1)
829 trace_dir = os.path.dirname(path)
830 index_path = os.path.join(trace_dir, "index", filename + ".idx")
831 self._index = _LttngDataStreamIndex(index_path, beacons_json)
832 assert os.path.isfile(path)
833 self._file = open(path, "rb")
834 logging.info(
835 'Built data stream: path="{}", channel-name="{}"'.format(
836 path, self._channel_name
837 )
838 )
839
840 @property
841 def path(self):
842 return self._path
843
844 @property
845 def channel_name(self):
846 return self._channel_name
847
848 @property
849 def index(self):
850 return self._index
851
852 def get_data(self, offset_bytes: int, len_bytes: int):
853 self._file.seek(offset_bytes)
854 return self._file.read(len_bytes)
855
856
857 class _LttngMetadataStreamSection:
858 def __init__(self, timestamp: int, data: Optional[bytes]):
859 self._timestamp = timestamp
860 if data is None:
861 self._data = bytes()
862 else:
863 self._data = data
864 logging.info(
865 "Built metadata stream section: ts={}, data-len={}".format(
866 self._timestamp, len(self._data)
867 )
868 )
869
870 @property
871 def timestamp(self):
872 return self._timestamp
873
874 @property
875 def data(self):
876 return self._data
877
878
879 # An LTTng metadata stream.
880 class _LttngMetadataStream(_LttngStream):
881 def __init__(
882 self,
883 metadata_file_path: str,
884 config_sections: Sequence[_LttngMetadataStreamSection],
885 creation_timestamp: int,
886 ):
887 super().__init__(creation_timestamp)
888 self._path = metadata_file_path
889 self._sections = config_sections
890 logging.info(
891 "Built metadata stream: path={}, section-len={}".format(
892 self._path, len(self._sections)
893 )
894 )
895
896 @property
897 def path(self):
898 return self._path
899
900 @property
901 def sections(self):
902 return self._sections
903
904
905 class LttngMetadataConfigSection:
906 def __init__(self, line: int, timestamp: int, is_empty: bool):
907 self._line = line
908 self._timestamp = timestamp
909 self._is_empty = is_empty
910
911 @property
912 def line(self):
913 return self._line
914
915 @property
916 def timestamp(self):
917 return self._timestamp
918
919 @property
920 def is_empty(self):
921 return self._is_empty
922
923
924 def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
925 metadata_sections = [] # type: list[LttngMetadataConfigSection]
926 append_empty_section = False
927 last_timestamp = 0
928 last_line = 0
929
930 for section in metadata_sections_json:
931 if isinstance(section, tjson.StrVal):
932 if section.val == "empty":
933 # Found an empty section marker. Actually append the
934 # section at the timestamp of the next concrete section.
935 append_empty_section = True
936 else:
937 raise ValueError("Invalid string value at {}.".format(section.path))
938 elif isinstance(section, tjson.ObjVal):
939 line = section.at("line", tjson.IntVal).val
940 ts = section.at("timestamp", tjson.IntVal).val
941
942 # Sections' timestamps and lines must both be increasing.
943 assert ts > last_timestamp
944 last_timestamp = ts
945
946 assert line > last_line
947 last_line = line
948
949 if append_empty_section:
950 metadata_sections.append(LttngMetadataConfigSection(line, ts, True))
951 append_empty_section = False
952
953 metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
954 else:
955 raise TypeError(
956 "`{}`: expecting a string or object value".format(section.path)
957 )
958
959 return metadata_sections
960
961
962 def _split_metadata_sections(
963 metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
964 ):
965 metadata_sections = _parse_metadata_sections_config(metadata_sections_json)
966
967 sections = [] # type: list[_LttngMetadataStreamSection]
968 with open(metadata_file_path, "r") as metadata_file:
969 metadata_lines = [line for line in metadata_file]
970
971 metadata_section_idx = 0
972 curr_metadata_section = bytearray()
973
974 for idx, line_content in enumerate(metadata_lines):
975 # Add one to the index to convert from the zero-indexing of the
976 # enumerate() function to the one-indexing used by humans when
977 # viewing a text file.
978 curr_line_number = idx + 1
979
980 # If there are no more sections, simply append the line.
981 if metadata_section_idx + 1 >= len(metadata_sections):
982 curr_metadata_section += bytearray(line_content, "utf8")
983 continue
984
985 next_section_line_number = metadata_sections[metadata_section_idx + 1].line
986
987 # If the next section begins at the current line, create a
988 # section with the metadata we gathered so far.
989 if curr_line_number >= next_section_line_number:
990 # Flushing the metadata of the current section.
991 sections.append(
992 _LttngMetadataStreamSection(
993 metadata_sections[metadata_section_idx].timestamp,
994 bytes(curr_metadata_section),
995 )
996 )
997
998 # Move to the next section.
999 metadata_section_idx += 1
1000
1001 # Clear old content and append current line for the next section.
1002 curr_metadata_section.clear()
1003 curr_metadata_section += bytearray(line_content, "utf8")
1004
1005 # Append any empty sections.
1006 while metadata_sections[metadata_section_idx].is_empty:
1007 sections.append(
1008 _LttngMetadataStreamSection(
1009 metadata_sections[metadata_section_idx].timestamp, None
1010 )
1011 )
1012 metadata_section_idx += 1
1013 else:
1014 # Append line_content to the current metadata section.
1015 curr_metadata_section += bytearray(line_content, "utf8")
1016
1017 # We iterated over all the lines of the metadata file. Close the current section.
1018 sections.append(
1019 _LttngMetadataStreamSection(
1020 metadata_sections[metadata_section_idx].timestamp,
1021 bytes(curr_metadata_section),
1022 )
1023 )
1024
1025 return sections
1026
1027
1028 _StreamBeaconsT = Dict[str, Iterable[int]]
1029
1030
1031 # An LTTng trace, a sequence of LTTng data streams.
1032 class LttngTrace(Sequence[_LttngDataStream]):
1033 def __init__(
1034 self,
1035 trace_dir: str,
1036 metadata_sections_json: Optional[tjson.ArrayVal],
1037 beacons_json: Optional[tjson.ObjVal],
1038 creation_timestamp: int,
1039 ):
1040 self._path = trace_dir
1041 self._creation_timestamp = creation_timestamp
1042 self._create_metadata_stream(trace_dir, metadata_sections_json)
1043 self._create_data_streams(trace_dir, beacons_json)
1044 logging.info('Built trace: path="{}"'.format(trace_dir))
1045
1046 def _create_data_streams(
1047 self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
1048 ):
1049 data_stream_paths = [] # type: list[str]
1050
1051 for filename in os.listdir(trace_dir):
1052 path = os.path.join(trace_dir, filename)
1053
1054 if not os.path.isfile(path):
1055 continue
1056
1057 if filename.startswith("."):
1058 continue
1059
1060 if filename == "metadata":
1061 continue
1062
1063 data_stream_paths.append(path)
1064
1065 data_stream_paths.sort()
1066 self._data_streams = [] # type: list[_LttngDataStream]
1067
1068 for data_stream_path in data_stream_paths:
1069 stream_name = os.path.basename(data_stream_path)
1070 this_beacons_json = None
1071 if beacons_json is not None and stream_name in beacons_json:
1072 this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
1073
1074 self._data_streams.append(
1075 _LttngDataStream(
1076 data_stream_path, this_beacons_json, self._creation_timestamp
1077 )
1078 )
1079
1080 def _create_metadata_stream(
1081 self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
1082 ):
1083 metadata_path = os.path.join(trace_dir, "metadata")
1084 metadata_sections = [] # type: list[_LttngMetadataStreamSection]
1085
1086 if metadata_sections_json is None:
1087 with open(metadata_path, "rb") as metadata_file:
1088 metadata_sections.append(
1089 _LttngMetadataStreamSection(0, metadata_file.read())
1090 )
1091 else:
1092 metadata_sections = _split_metadata_sections(
1093 metadata_path, metadata_sections_json
1094 )
1095
1096 self._metadata_stream = _LttngMetadataStream(
1097 metadata_path, metadata_sections, self.creation_timestamp
1098 )
1099
1100 @property
1101 def path(self):
1102 return self._path
1103
1104 @property
1105 def metadata_stream(self):
1106 return self._metadata_stream
1107
1108 @property
1109 def creation_timestamp(self):
1110 return self._creation_timestamp
1111
1112 @overload
1113 def __getitem__(self, index: int) -> _LttngDataStream:
1114 ...
1115
1116 @overload
1117 def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]: # noqa: F811
1118 ...
1119
1120 def __getitem__( # noqa: F811
1121 self, index: Union[int, slice]
1122 ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
1123 return self._data_streams[index]
1124
1125 def __len__(self):
1126 return len(self._data_streams)
1127
1128
1129 # Stream (metadata or data) state specific to the LTTng live protocol.
1130 class _LttngLiveViewerSessionStreamState:
1131 @abstractmethod
1132 def __init__(self):
1133 # A stream is considered "announced" when it has been returned
1134 # to the LTTng live client in response to a "get new stream
1135 # infos" (`_LttngLiveViewerGetNewStreamInfosCommand`) command.
1136 self._announced = False # type: bool
1137 pass
1138
1139 @property
1140 def is_announced(self):
1141 return self._announced
1142
1143 def mark_as_announced(self):
1144 self._announced = True
1145
1146 @property
1147 @abstractmethod
1148 def stream(self) -> _LttngStream:
1149 pass
1150
1151
1152 # The state of a single data stream.
1153 class _LttngLiveViewerSessionDataStreamState(_LttngLiveViewerSessionStreamState):
1154 def __init__(
1155 self,
1156 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1157 info: _LttngLiveViewerStreamInfo,
1158 data_stream: _LttngDataStream,
1159 metadata_stream_id: int,
1160 ):
1161 super().__init__()
1162 self._ts_state = ts_state
1163 self._info = info
1164 self._data_stream = data_stream
1165 self._metadata_stream_id = metadata_stream_id
1166 self._cur_index_entry_index = 0
1167 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1168 logging.info(
1169 fmt.format(
1170 info.id,
1171 ts_state.tracing_session_descriptor.info.tracing_session_id,
1172 ts_state.tracing_session_descriptor.info.name,
1173 data_stream.path,
1174 )
1175 )
1176
1177 @property
1178 def tracing_session_state(self):
1179 return self._ts_state
1180
1181 @property
1182 def info(self):
1183 return self._info
1184
1185 @property
1186 def stream(self):
1187 return self._data_stream
1188
1189 @property
1190 def cur_index_entry(self):
1191 if self._cur_index_entry_index == len(self._data_stream.index):
1192 return
1193
1194 return self._data_stream.index[self._cur_index_entry_index]
1195
1196 @property
1197 def metadata_stream_id(self):
1198 return self._metadata_stream_id
1199
1200 def goto_next_index_entry(self):
1201 self._cur_index_entry_index += 1
1202
1203
1204 # The state of a single metadata stream.
1205 class _LttngLiveViewerSessionMetadataStreamState(_LttngLiveViewerSessionStreamState):
1206 def __init__(
1207 self,
1208 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1209 info: _LttngLiveViewerStreamInfo,
1210 metadata_stream: _LttngMetadataStream,
1211 ):
1212 super().__init__()
1213 self._ts_state = ts_state
1214 self._info = info
1215 self._metadata_stream = metadata_stream
1216 self._cur_metadata_stream_section_index = 0
1217 if len(metadata_stream.sections) > 1:
1218 self._next_metadata_stream_section_timestamp = metadata_stream.sections[
1219 1
1220 ].timestamp
1221 else:
1222 self._next_metadata_stream_section_timestamp = None
1223
1224 self._all_data_is_sent = False
1225 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1226 logging.info(
1227 fmt.format(
1228 info.id,
1229 ts_state.tracing_session_descriptor.info.tracing_session_id,
1230 ts_state.tracing_session_descriptor.info.name,
1231 metadata_stream.path,
1232 )
1233 )
1234
1235 @property
1236 def info(self):
1237 return self._info
1238
1239 @property
1240 def stream(self):
1241 return self._metadata_stream
1242
1243 @property
1244 def all_data_is_sent(self):
1245 return self._all_data_is_sent
1246
1247 @all_data_is_sent.setter
1248 def all_data_is_sent(self, value: bool):
1249 self._all_data_is_sent = value
1250
1251 @property
1252 def cur_section(self):
1253 fmt = "Get current metadata section: section-idx={}"
1254 logging.info(fmt.format(self._cur_metadata_stream_section_index))
1255 if self._cur_metadata_stream_section_index == len(
1256 self._metadata_stream.sections
1257 ):
1258 return
1259
1260 return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
1261
1262 def goto_next_section(self):
1263 self._cur_metadata_stream_section_index += 1
1264 if self.cur_section:
1265 self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
1266 else:
1267 self._next_metadata_stream_section_timestamp = None
1268
1269 @property
1270 def next_section_timestamp(self):
1271 return self._next_metadata_stream_section_timestamp
1272
1273
1274 # A tracing session descriptor.
1275 #
1276 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1277 # objects).
1278 class LttngTracingSessionDescriptor:
1279 def __init__(
1280 self,
1281 name: str,
1282 tracing_session_id: int,
1283 hostname: str,
1284 live_timer_freq: int,
1285 client_count: int,
1286 traces: Iterable[LttngTrace],
1287 ):
1288 for trace in traces:
1289 if name not in trace.path:
1290 fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1291 raise ValueError(fmt.format(name, trace.path))
1292
1293 self._traces = traces
1294 stream_count = sum([len(t) + 1 for t in traces])
1295 self._info = _LttngLiveViewerTracingSessionInfo(
1296 tracing_session_id,
1297 live_timer_freq,
1298 client_count,
1299 stream_count,
1300 hostname,
1301 name,
1302 )
1303
1304 @property
1305 def traces(self):
1306 return self._traces
1307
1308 @property
1309 def info(self):
1310 return self._info
1311
1312
1313 # The state of a tracing session.
1314 class _LttngLiveViewerSessionTracingSessionState:
1315 def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
1316 self._tc_descr = tc_descr
1317 self._client_visible_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
1318 self._ds_states = {} # type: dict[int, _LttngLiveViewerSessionDataStreamState]
1319 self._ms_states = (
1320 {}
1321 ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
1322 self._last_delivered_index_timestamp = 0
1323 self._last_allocated_stream_id = base_stream_id
1324
1325 for trace in tc_descr.traces:
1326 trace_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
1327 trace_id = self._last_allocated_stream_id * 1000
1328
1329 # Metadata stream -> stream info and metadata stream state
1330 info = _LttngLiveViewerStreamInfo(
1331 self._last_allocated_stream_id,
1332 trace_id,
1333 True,
1334 trace.metadata_stream.path,
1335 "metadata",
1336 )
1337
1338 trace_stream_infos.append(info)
1339 self._ms_states[
1340 self._last_allocated_stream_id
1341 ] = _LttngLiveViewerSessionMetadataStreamState(
1342 self, info, trace.metadata_stream
1343 )
1344 metadata_stream_id = self._last_allocated_stream_id
1345 self._last_allocated_stream_id += 1
1346
1347 # Data streams -> stream infos and data stream states
1348 for data_stream in trace:
1349 info = _LttngLiveViewerStreamInfo(
1350 self._last_allocated_stream_id,
1351 trace_id,
1352 False,
1353 data_stream.path,
1354 data_stream.channel_name,
1355 )
1356 trace_stream_infos.append(info)
1357 self._ds_states[
1358 self._last_allocated_stream_id
1359 ] = _LttngLiveViewerSessionDataStreamState(
1360 self, info, data_stream, metadata_stream_id
1361 )
1362 self._last_allocated_stream_id += 1
1363
1364 if trace.creation_timestamp == 0:
1365 # Only announce streams for traces that are created at
1366 # the origin.
1367 #
1368 # The rest of the streams will be discovered by the
1369 # client as indexes are received with a "has new data
1370 # streams" flag set in the reply.
1371 self._client_visible_stream_infos.extend(trace_stream_infos)
1372
1373 self._is_attached = False
1374 fmt = 'Built tracing session state: id={}, name="{}"'
1375 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
1376
1377 @property
1378 def tracing_session_descriptor(self):
1379 return self._tc_descr
1380
1381 @property
1382 def data_stream_states(self):
1383 return self._ds_states
1384
1385 @property
1386 def metadata_stream_states(self):
1387 return self._ms_states
1388
1389 @property
1390 def client_visible_stream_infos(self):
1391 return self._client_visible_stream_infos
1392
1393 @property
1394 def has_new_metadata(self):
1395 return any(
1396 [
1397 ms.is_announced and not ms.all_data_is_sent
1398 for ms in self._ms_states.values()
1399 ]
1400 )
1401
1402 @property
1403 def is_attached(self):
1404 return self._is_attached
1405
1406 @is_attached.setter
1407 def is_attached(self, value: bool):
1408 self._is_attached = value
1409
1410
1411 def needs_new_metadata_section(
1412 metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
1413 latest_timestamp: int,
1414 ):
1415 if metadata_stream_state.next_section_timestamp is None:
1416 return False
1417
1418 if latest_timestamp >= metadata_stream_state.next_section_timestamp:
1419 return True
1420 else:
1421 return False
1422
1423
1424 # An LTTng live viewer session manages a view on tracing sessions
1425 # and replies to commands accordingly.
1426 class _LttngLiveViewerSession:
1427 def __init__(
1428 self,
1429 viewer_session_id: int,
1430 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1431 max_query_data_response_size: Optional[int],
1432 ):
1433 self._viewer_session_id = viewer_session_id
1434 self._ts_states = (
1435 {}
1436 ) # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
1437 self._stream_states = (
1438 {}
1439 ) # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
1440 self._max_query_data_response_size = max_query_data_response_size
1441 total_stream_infos = 0
1442
1443 for ts_descr in tracing_session_descriptors:
1444 ts_state = _LttngLiveViewerSessionTracingSessionState(
1445 ts_descr, total_stream_infos
1446 )
1447 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1448 self._ts_states[ts_id] = ts_state
1449 total_stream_infos += len(ts_state.client_visible_stream_infos)
1450
1451 # Update session's stream states to have the new states
1452 self._stream_states.update(ts_state.data_stream_states)
1453 self._stream_states.update(ts_state.metadata_stream_states)
1454
1455 self._command_handlers = {
1456 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1457 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1458 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1459 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1460 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1461 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1462 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1463 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
1464 } # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]
1465
1466 @property
1467 def viewer_session_id(self):
1468 return self._viewer_session_id
1469
1470 def _get_tracing_session_state(self, tracing_session_id: int):
1471 if tracing_session_id not in self._ts_states:
1472 raise RuntimeError(
1473 "Unknown tracing session ID {}".format(tracing_session_id)
1474 )
1475
1476 return self._ts_states[tracing_session_id]
1477
1478 def _get_data_stream_state(self, stream_id: int):
1479 if stream_id not in self._stream_states:
1480 RuntimeError("Unknown stream ID {}".format(stream_id))
1481
1482 stream = self._stream_states[stream_id]
1483 if type(stream) is not _LttngLiveViewerSessionDataStreamState:
1484 raise RuntimeError("Stream is not a data stream")
1485
1486 return stream
1487
1488 def _get_metadata_stream_state(self, stream_id: int):
1489 if stream_id not in self._stream_states:
1490 RuntimeError("Unknown stream ID {}".format(stream_id))
1491
1492 stream = self._stream_states[stream_id]
1493 if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
1494 raise RuntimeError("Stream is not a metadata stream")
1495
1496 return stream
1497
1498 def handle_command(self, cmd: _LttngLiveViewerCommand):
1499 logging.info(
1500 "Handling command in viewer session: cmd-cls-name={}".format(
1501 cmd.__class__.__name__
1502 )
1503 )
1504 cmd_type = type(cmd)
1505
1506 if cmd_type not in self._command_handlers:
1507 raise RuntimeError(
1508 "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
1509 )
1510
1511 return self._command_handlers[cmd_type](cmd)
1512
1513 def _handle_attach_to_tracing_session_command(
1514 self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
1515 ):
1516 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1517 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1518 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1519 info = ts_state.tracing_session_descriptor.info
1520
1521 if ts_state.is_attached:
1522 raise RuntimeError(
1523 "Cannot attach to tracing session `{}`: viewer is already attached".format(
1524 info.name
1525 )
1526 )
1527
1528 ts_state.is_attached = True
1529 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1530 stream_infos_to_announce = ts_state.client_visible_stream_infos
1531
1532 # Mark stream infos transmitted as part of the reply as
1533 # announced.
1534 for si in stream_infos_to_announce:
1535 if si.is_metadata:
1536 self._get_metadata_stream_state(si.id).mark_as_announced()
1537 else:
1538 self._get_data_stream_state(si.id).mark_as_announced()
1539
1540 return _LttngLiveViewerAttachToTracingSessionReply(
1541 status, stream_infos_to_announce
1542 )
1543
1544 def _handle_detach_from_tracing_session_command(
1545 self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
1546 ):
1547 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1548 logging.info(fmt.format(cmd.tracing_session_id))
1549 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1550 info = ts_state.tracing_session_descriptor.info
1551
1552 if not ts_state.is_attached:
1553 raise RuntimeError(
1554 "Cannot detach to tracing session `{}`: viewer is not attached".format(
1555 info.name
1556 )
1557 )
1558
1559 ts_state.is_attached = False
1560 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1561 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1562
1563 @staticmethod
1564 def _stream_is_ready(
1565 stream_state: _LttngLiveViewerSessionStreamState, creation_timestamp: int
1566 ):
1567 return (
1568 not stream_state.is_announced
1569 and stream_state.stream.creation_timestamp <= creation_timestamp
1570 )
1571
1572 def _needs_new_streams(self, current_timestamp: int):
1573 return any(
1574 self._stream_is_ready(ss, current_timestamp)
1575 for ss in self._stream_states.values()
1576 )
1577
1578 def _handle_get_next_data_stream_index_entry_command(
1579 self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
1580 ):
1581 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1582 logging.info(fmt.format(cmd.stream_id))
1583 stream_state = self._get_data_stream_state(cmd.stream_id)
1584 metadata_stream_state = self._get_metadata_stream_state(
1585 stream_state.metadata_stream_id
1586 )
1587
1588 if stream_state.cur_index_entry is None:
1589 # The viewer is done reading this stream
1590 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1591
1592 # Dummy data stream index entry to use with the `HUP` status
1593 # (the reply needs one, but the viewer ignores it)
1594 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1595
1596 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1597 status, index_entry, False, False
1598 )
1599
1600 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1601
1602 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
1603 metadata_stream_state.all_data_is_sent = False
1604 metadata_stream_state.goto_next_section()
1605
1606 # The viewer only checks the `has_new_metadata` flag if the
1607 # reply's status is `OK`, so we need to provide an index here
1608 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1609 if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
1610 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1611 else:
1612 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1613
1614 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1615 status,
1616 stream_state.cur_index_entry,
1617 has_new_metadata,
1618 self._needs_new_streams(timestamp_begin),
1619 )
1620 self._last_delivered_index_timestamp_begin = timestamp_begin
1621 stream_state.goto_next_index_entry()
1622 return reply
1623
1624 def _handle_get_data_stream_packet_data_command(
1625 self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
1626 ):
1627 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1628 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1629 stream_state = self._get_data_stream_state(cmd.stream_id)
1630 data_response_length = cmd.req_length
1631
1632 if stream_state.tracing_session_state.has_new_metadata:
1633 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1634 return _LttngLiveViewerGetDataStreamPacketDataReply(
1635 status, bytes(), True, False
1636 )
1637
1638 if self._max_query_data_response_size:
1639 # Enforce a server side limit on the query requested length.
1640 # To ensure that the transaction terminate take the minimum of both
1641 # value.
1642 data_response_length = min(
1643 cmd.req_length, self._max_query_data_response_size
1644 )
1645 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1646 logging.info(fmt.format(cmd.req_length, data_response_length))
1647
1648 data = stream_state.stream.get_data(cmd.offset, data_response_length)
1649 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1650 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1651
1652 def _handle_get_metadata_stream_data_command(
1653 self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
1654 ):
1655 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1656 logging.info(fmt.format(cmd.stream_id))
1657 metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
1658
1659 if metadata_stream_state.all_data_is_sent:
1660 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1661 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1662
1663 metadata_stream_state.all_data_is_sent = True
1664 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
1665 metadata_section = metadata_stream_state.cur_section
1666 assert metadata_section is not None
1667
1668 # If we are sending an empty section, ready the next one right away.
1669 if len(metadata_section.data) == 0:
1670 metadata_stream_state.all_data_is_sent = False
1671 metadata_stream_state.goto_next_section()
1672
1673 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1674 logging.info(fmt.format(len(metadata_section.data)))
1675 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1676 status, metadata_section.data
1677 )
1678
1679 def _get_stream_infos_ready_for_announcement(self):
1680 ready_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
1681
1682 for ss in self._stream_states.values():
1683 if self._stream_is_ready(ss, ss.stream.creation_timestamp):
1684 ready_stream_infos.append(ss.info)
1685
1686 return ready_stream_infos
1687
1688 # A stream is considered finished if it has been announced and all
1689 # of its index entries have been provided to the client.
1690 def _all_streams_finished(self):
1691 return all(
1692 isinstance(stream_state, _LttngLiveViewerSessionMetadataStreamState)
1693 or (stream_state.cur_index_entry is None and stream_state.is_announced)
1694 for stream_state in self._stream_states.values()
1695 )
1696
1697 def _handle_get_new_stream_infos_command(
1698 self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
1699 ):
1700 fmt = 'Handling "get new stream infos" command: ts-id={}'
1701 logging.info(fmt.format(cmd.tracing_session_id))
1702 newly_announced_stream_infos = self._get_stream_infos_ready_for_announcement()
1703
1704 # Mark stream infos transmitted as part of the reply as
1705 # announced.
1706 for si in newly_announced_stream_infos:
1707 if si.is_metadata:
1708 self._get_metadata_stream_state(si.id).mark_as_announced()
1709 else:
1710 self._get_data_stream_state(si.id).mark_as_announced()
1711
1712 status = _LttngLiveViewerGetNewStreamInfosReply.Status.OK
1713
1714 if len(newly_announced_stream_infos) == 0:
1715 # If all streams have been transmitted and no new traces are
1716 # scheduled for creation, hang up to signal that the tracing
1717 # session is "done".
1718 status = (
1719 _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1720 if self._all_streams_finished()
1721 else _LttngLiveViewerGetNewStreamInfosReply.Status.NO_NEW
1722 )
1723
1724 return _LttngLiveViewerGetNewStreamInfosReply(
1725 status, newly_announced_stream_infos
1726 )
1727
1728 def _handle_get_tracing_session_infos_command(
1729 self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
1730 ):
1731 logging.info('Handling "get tracing session infos" command.')
1732 infos = [
1733 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1734 ]
1735 infos.sort(key=lambda info: info.name)
1736 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1737
1738 def _handle_create_viewer_session_command(
1739 self, cmd: _LttngLiveViewerCreateViewerSessionCommand
1740 ):
1741 logging.info('Handling "create viewer session" command.')
1742 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1743
1744 # This does nothing here. In the LTTng relay daemon, it
1745 # allocates the viewer session's state.
1746 return _LttngLiveViewerCreateViewerSessionReply(status)
1747
1748
1749 # An LTTng live TCP server.
1750 #
1751 # On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
1752 # on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
1753 # to a temporary port file. It renames the temporary port file to
1754 # `port_filename`.
1755 #
1756 # `tracing_session_descriptors` is a list of tracing session descriptors
1757 # (`LttngTracingSessionDescriptor`) to serve.
1758 #
1759 # This server accepts a single viewer (client).
1760 #
1761 # When the viewer closes the connection, the server's constructor
1762 # returns.
1763 class LttngLiveServer:
1764 def __init__(
1765 self,
1766 port: Optional[int],
1767 port_filename: Optional[str],
1768 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1769 max_query_data_response_size: Optional[int],
1770 ):
1771 logging.info("Server configuration:")
1772
1773 logging.info(" Port file name: `{}`".format(port_filename))
1774
1775 if max_query_data_response_size is not None:
1776 logging.info(
1777 " Maximum response data query size: `{}`".format(
1778 max_query_data_response_size
1779 )
1780 )
1781
1782 for ts_descr in tracing_session_descriptors:
1783 info = ts_descr.info
1784 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1785 logging.info(
1786 fmt.format(
1787 info.name,
1788 info.tracing_session_id,
1789 info.hostname,
1790 info.live_timer_freq,
1791 info.client_count,
1792 info.stream_count,
1793 )
1794 )
1795
1796 for trace in ts_descr.traces:
1797 logging.info(' Trace: path="{}"'.format(trace.path))
1798
1799 self._ts_descriptors = tracing_session_descriptors
1800 self._max_query_data_response_size = max_query_data_response_size
1801 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1802 self._codec = _LttngLiveViewerProtocolCodec()
1803
1804 # Port 0: OS assigns an unused port
1805 serv_addr = ("localhost", port if port is not None else 0)
1806 self._sock.bind(serv_addr)
1807
1808 if port_filename is not None:
1809 self._write_port_to_file(port_filename)
1810
1811 print("Listening on port {}".format(self._server_port))
1812
1813 for ts_descr in tracing_session_descriptors:
1814 info = ts_descr.info
1815 print(
1816 "net://localhost:{}/host/{}/{}".format(
1817 self._server_port, info.hostname, info.name
1818 )
1819 )
1820
1821 try:
1822 self._listen()
1823 finally:
1824 self._sock.close()
1825 logging.info("Closed connection and socket.")
1826
1827 @property
1828 def _server_port(self):
1829 return self._sock.getsockname()[1]
1830
1831 def _recv_command(self):
1832 data = bytes()
1833
1834 while True:
1835 logging.info("Waiting for viewer command.")
1836 buf = self._conn.recv(128)
1837
1838 if not buf:
1839 logging.info("Client closed connection.")
1840
1841 if data:
1842 raise RuntimeError(
1843 "Client closed connection after having sent {} command bytes.".format(
1844 len(data)
1845 )
1846 )
1847
1848 return
1849
1850 logging.info("Received data from viewer: length={}".format(len(buf)))
1851
1852 data += buf
1853
1854 try:
1855 cmd = self._codec.decode(data)
1856 except struct.error as exc:
1857 raise RuntimeError("Malformed command: {}".format(exc)) from exc
1858
1859 if cmd is not None:
1860 logging.info(
1861 "Received command from viewer: cmd-cls-name={}".format(
1862 cmd.__class__.__name__
1863 )
1864 )
1865 return cmd
1866
1867 def _send_reply(self, reply: _LttngLiveViewerReply):
1868 data = self._codec.encode(reply)
1869 logging.info(
1870 "Sending reply to viewer: reply-cls-name={}, length={}".format(
1871 reply.__class__.__name__, len(data)
1872 )
1873 )
1874 self._conn.sendall(data)
1875
1876 def _handle_connection(self):
1877 # First command must be "connect"
1878 cmd = self._recv_command()
1879
1880 if type(cmd) is not _LttngLiveViewerConnectCommand:
1881 raise RuntimeError(
1882 'First command is not "connect": cmd-cls-name={}'.format(
1883 cmd.__class__.__name__
1884 )
1885 )
1886
1887 # Create viewer session (arbitrary ID 23)
1888 logging.info(
1889 "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
1890 )
1891 viewer_session = _LttngLiveViewerSession(
1892 23, self._ts_descriptors, self._max_query_data_response_size
1893 )
1894
1895 # Send "connect" reply
1896 self._send_reply(
1897 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1898 )
1899
1900 # Make the viewer session handle the remaining commands
1901 while True:
1902 cmd = self._recv_command()
1903
1904 if cmd is None:
1905 # Connection closed (at an expected location within the
1906 # conversation)
1907 return
1908
1909 self._send_reply(viewer_session.handle_command(cmd))
1910
1911 def _listen(self):
1912 logging.info("Listening: port={}".format(self._server_port))
1913 # Backlog must be present for Python version < 3.5.
1914 # 128 is an arbitrary number since we expect only 1 connection anyway.
1915 self._sock.listen(128)
1916 self._conn, viewer_addr = self._sock.accept()
1917 logging.info(
1918 "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
1919 )
1920
1921 try:
1922 self._handle_connection()
1923 finally:
1924 self._conn.close()
1925
1926 def _write_port_to_file(self, port_filename: str):
1927 # Write the port number to a temporary file.
1928 with tempfile.NamedTemporaryFile(
1929 mode="w", delete=False, dir=os.path.dirname(port_filename)
1930 ) as tmp_port_file:
1931 print(self._server_port, end="", file=tmp_port_file)
1932
1933 # Rename temporary file to real file.
1934 #
1935 # For unknown reasons, on Windows, moving the port file from its
1936 # temporary location to its final location (where the user of
1937 # the server expects it to appear) may raise a `PermissionError`
1938 # exception.
1939 #
1940 # We suppose it's possible that something in the Windows kernel
1941 # hasn't completely finished using the file when we try to move
1942 # it.
1943 #
1944 # Use a wait-and-retry scheme as a (bad) workaround.
1945 num_attempts = 5
1946 retry_delay_s = 1
1947
1948 for attempt in reversed(range(num_attempts)):
1949 try:
1950 os.replace(tmp_port_file.name, port_filename)
1951 logging.info(
1952 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1953 tmp_port_file.name, port_filename
1954 )
1955 )
1956 return
1957 except PermissionError:
1958 logging.info(
1959 'Permission error while attempting to rename port file; retrying in {} second: src-path="{}", dst-path="{}"'.format(
1960 retry_delay_s, tmp_port_file.name, port_filename
1961 )
1962 )
1963
1964 if attempt == 0:
1965 raise
1966
1967 time.sleep(retry_delay_s)
1968
1969
1970 def _session_descriptors_from_path(
1971 sessions_filename: str, trace_path_prefix: Optional[str]
1972 ):
1973 # File format is:
1974 #
1975 # [
1976 # {
1977 # "name": "my-session",
1978 # "id": 17,
1979 # "hostname": "myhost",
1980 # "live-timer-freq": 1000000,
1981 # "client-count": 23,
1982 # "traces": [
1983 # {
1984 # "path": "lol",
1985 # creation-timestamp: 12948
1986 # },
1987 # {
1988 # "path": "meow/mix",
1989 # "beacons": {
1990 # "my_stream": [ 5235787, 728375283 ]
1991 # },
1992 # "metadata-sections": [
1993 # {
1994 # "line": 1,
1995 # "timestamp": 0
1996 # }
1997 # ]
1998 # }
1999 # ]
2000 # }
2001 # ]
2002 with open(sessions_filename, "r") as sessions_file:
2003 sessions_json = tjson.load(sessions_file, tjson.ArrayVal)
2004
2005 sessions = [] # type: list[LttngTracingSessionDescriptor]
2006
2007 for session_json in sessions_json.iter(tjson.ObjVal):
2008 name = session_json.at("name", tjson.StrVal).val
2009 tracing_session_id = session_json.at("id", tjson.IntVal).val
2010 hostname = session_json.at("hostname", tjson.StrVal).val
2011 live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
2012 client_count = session_json.at("client-count", tjson.IntVal).val
2013 traces_json = session_json.at("traces", tjson.ArrayVal)
2014
2015 traces = [] # type: list[LttngTrace]
2016
2017 for trace_json in traces_json.iter(tjson.ObjVal):
2018 metadata_sections = (
2019 trace_json.at("metadata-sections", tjson.ArrayVal)
2020 if "metadata-sections" in trace_json
2021 else None
2022 )
2023 beacons = (
2024 trace_json.at("beacons", tjson.ObjVal)
2025 if "beacons" in trace_json
2026 else None
2027 )
2028 path = trace_json.at("path", tjson.StrVal).val
2029 creation_timestamp = (
2030 trace_json.at("creation-timestamp", tjson.IntVal).val
2031 if "creation-timestamp" in trace_json
2032 else 0
2033 ) # type: int
2034
2035 if not os.path.isabs(path) and trace_path_prefix:
2036 path = os.path.join(trace_path_prefix, path)
2037
2038 traces.append(
2039 LttngTrace(path, metadata_sections, beacons, creation_timestamp)
2040 )
2041
2042 sessions.append(
2043 LttngTracingSessionDescriptor(
2044 name,
2045 tracing_session_id,
2046 hostname,
2047 live_timer_freq,
2048 client_count,
2049 traces,
2050 )
2051 )
2052
2053 return sessions
2054
2055
2056 def _loglevel_parser(string: str):
2057 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
2058 if string not in loglevels:
2059 msg = "{} is not a valid loglevel".format(string)
2060 raise argparse.ArgumentTypeError(msg)
2061 return loglevels[string]
2062
2063
2064 if __name__ == "__main__":
2065 logging.basicConfig(format="# %(asctime)-25s%(message)s")
2066 parser = argparse.ArgumentParser(
2067 description="LTTng-live protocol mocker", add_help=False
2068 )
2069 parser.add_argument(
2070 "--log-level",
2071 default="warning",
2072 choices=["info", "warning"],
2073 help="The loglevel to be used.",
2074 )
2075
2076 loglevel_namespace, remaining_args = parser.parse_known_args()
2077 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
2078
2079 parser.add_argument(
2080 "--port",
2081 help="The port to bind to. If missing, use an OS-assigned port..",
2082 type=int,
2083 )
2084 parser.add_argument(
2085 "--port-filename",
2086 help="The final port file. This file is present when the server is ready to receive connection.",
2087 )
2088 parser.add_argument(
2089 "--max-query-data-response-size",
2090 type=int,
2091 help="The maximum size of control data response in bytes",
2092 )
2093 parser.add_argument(
2094 "--trace-path-prefix",
2095 type=str,
2096 help="Prefix to prepend to the trace paths of session configurations",
2097 )
2098 parser.add_argument(
2099 "sessions_filename",
2100 type=str,
2101 help="Path to a session configuration file",
2102 metavar="sessions-filename",
2103 )
2104 parser.add_argument(
2105 "-h",
2106 "--help",
2107 action="help",
2108 default=argparse.SUPPRESS,
2109 help="Show this help message and exit.",
2110 )
2111
2112 args = parser.parse_args(args=remaining_args)
2113 sessions_filename = args.sessions_filename # type: str
2114 trace_path_prefix = args.trace_path_prefix # type: str | None
2115 sessions = _session_descriptors_from_path(
2116 sessions_filename,
2117 trace_path_prefix,
2118 )
2119
2120 port = args.port # type: int | None
2121 port_filename = args.port_filename # type: str | None
2122 max_query_data_response_size = args.max_query_data_response_size # type: int | None
2123 LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
This page took 0.07217 seconds and 5 git commands to generate.