cleanup: tests: remove unused files from lf / crlf traces
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
1 # SPDX-License-Identifier: MIT
2 #
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
4 #
5
6 # pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
7
8 import os
9 import re
10 import socket
11 import struct
12 import logging
13 import os.path
14 import argparse
15 import tempfile
16 from typing import Dict, Union, Iterable, Optional, Sequence, overload
17
18 import tjson
19
20 # isort: off
21 from typing import Any, Callable # noqa: F401
22
23 # isort: on
24
25
# A packet entry within the index of an LTTng data stream.
#
# This is a read-only record: sizes are kept in bits, and the `*_bytes`
# properties derive whole byte counts with a floor division by 8.
class _LttngDataStreamIndexEntry:
    def __init__(
        self,
        offset_bytes: int,
        total_size_bits: int,
        content_size_bits: int,
        timestamp_begin: int,
        timestamp_end: int,
        events_discarded: int,
        stream_class_id: int,
    ):
        # Location and size
        self._offset_bytes = offset_bytes
        self._total_size_bits = total_size_bits
        self._content_size_bits = content_size_bits

        # Time range
        self._timestamp_begin = timestamp_begin
        self._timestamp_end = timestamp_end

        # Remaining index fields
        self._events_discarded = events_discarded
        self._stream_class_id = stream_class_id

    @property
    def offset_bytes(self) -> int:
        return self._offset_bytes

    @property
    def total_size_bits(self) -> int:
        return self._total_size_bits

    @property
    def total_size_bytes(self) -> int:
        # Floor division: a partial trailing byte is not counted.
        return self._total_size_bits // 8

    @property
    def content_size_bits(self) -> int:
        return self._content_size_bits

    @property
    def content_size_bytes(self) -> int:
        # Floor division: a partial trailing byte is not counted.
        return self._content_size_bits // 8

    @property
    def timestamp_begin(self) -> int:
        return self._timestamp_begin

    @property
    def timestamp_end(self) -> int:
        return self._timestamp_end

    @property
    def events_discarded(self) -> int:
        return self._events_discarded

    @property
    def stream_class_id(self) -> int:
        return self._stream_class_id
81
82
# A beacon within the index of an LTTng data stream.
#
# While a stream beacon is conceptually unrelated to an index, it is sent
# as a reply to a `_LttngLiveViewerGetNextDataStreamIndexEntryCommand`,
# which is why it is modelled as a kind of index entry. It only holds a
# stream class ID and a single timestamp.
class _LttngDataStreamBeaconIndexEntry:
    def __init__(self, stream_class_id: int, timestamp: int):
        self._timestamp = timestamp
        self._stream_class_id = stream_class_id

    @property
    def timestamp(self) -> int:
        return self._timestamp

    @property
    def stream_class_id(self) -> int:
        return self._stream_class_id
98
99
# Anything a data stream index may contain: a packet index entry or a
# beacon.
_LttngIndexEntryT = Union[
    _LttngDataStreamIndexEntry,
    _LttngDataStreamBeaconIndexEntry,
]
101
102
# Base class of the LTTng live viewer commands: only holds the version
# with which the command was created.
class _LttngLiveViewerCommand:
    def __init__(self, version: int):
        self._version = version

    @property
    def version(self) -> int:
        return self._version
110
111
# Connection command: holds a viewer session ID and the major and minor
# numbers provided by the viewer.
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
        super().__init__(version)
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    @property
    def viewer_session_id(self) -> int:
        return self._viewer_session_id

    @property
    def major(self) -> int:
        return self._major

    @property
    def minor(self) -> int:
        return self._minor
130
131
# Common base class of the reply objects: has no state or behaviour of
# its own.
class _LttngLiveViewerReply:
    pass
134
135
# Connection reply: holds a viewer session ID with major and minor
# numbers.
class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
    def __init__(self, viewer_session_id: int, major: int, minor: int):
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    @property
    def viewer_session_id(self) -> int:
        return self._viewer_session_id

    @property
    def major(self) -> int:
        return self._major

    @property
    def minor(self) -> int:
        return self._minor
153
154
# Command to get the tracing session infos: has no field besides the
# inherited version.
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    pass
157
158
# Read-only description of a tracing session: its ID, live timer
# frequency, client and stream counts, hostname, and name.
class _LttngLiveViewerTracingSessionInfo:
    def __init__(
        self,
        tracing_session_id: int,
        live_timer_freq: int,
        client_count: int,
        stream_count: int,
        hostname: str,
        name: str,
    ):
        # Identity
        self._tracing_session_id = tracing_session_id
        self._hostname = hostname
        self._name = name

        # Numeric properties
        self._live_timer_freq = live_timer_freq
        self._client_count = client_count
        self._stream_count = stream_count

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id

    @property
    def live_timer_freq(self) -> int:
        return self._live_timer_freq

    @property
    def client_count(self) -> int:
        return self._client_count

    @property
    def stream_count(self) -> int:
        return self._stream_count

    @property
    def hostname(self) -> str:
        return self._hostname

    @property
    def name(self) -> str:
        return self._name
199
200
# Reply holding a sequence of tracing session infos.
class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
    def __init__(
        self,
        tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo],
    ):
        self._tracing_session_infos = tracing_session_infos

    @property
    def tracing_session_infos(self) -> Sequence[_LttngLiveViewerTracingSessionInfo]:
        return self._tracing_session_infos
210
211
# Command to attach to a tracing session: holds the ID of the tracing
# session, an offset, and a seek type (one of the `SeekType` values).
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    # Known seek type values
    class SeekType:
        BEGINNING = 1
        LAST = 2

    def __init__(
        self,
        version: int,
        tracing_session_id: int,
        offset: int,
        seek_type: int,
    ):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id
        self._offset = offset
        self._seek_type = seek_type

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id

    @property
    def offset(self) -> int:
        return self._offset

    @property
    def seek_type(self) -> int:
        return self._seek_type
236
237
# Read-only description of a stream (data or metadata): its ID, the ID
# of its trace, whether or not it's a metadata stream, its path, and its
# channel name.
class _LttngLiveViewerStreamInfo:
    def __init__(
        self,
        id: int,
        trace_id: int,
        is_metadata: bool,
        path: str,
        channel_name: str,
    ):
        self._id, self._trace_id = id, trace_id
        self._is_metadata = is_metadata
        self._path = path
        self._channel_name = channel_name

    @property
    def id(self) -> int:
        return self._id

    @property
    def trace_id(self) -> int:
        return self._trace_id

    @property
    def is_metadata(self) -> bool:
        return self._is_metadata

    @property
    def path(self) -> str:
        return self._path

    @property
    def channel_name(self) -> str:
        return self._channel_name
267
268
# Reply holding a status (one of the `Status` values) and a sequence of
# stream infos.
class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
    # Known status values
    class Status:
        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(
        self,
        status: int,
        stream_infos: Sequence[_LttngLiveViewerStreamInfo],
    ):
        self._status = status
        self._stream_infos = stream_infos

    @property
    def status(self) -> int:
        return self._status

    @property
    def stream_infos(self) -> Sequence[_LttngLiveViewerStreamInfo]:
        return self._stream_infos
289
290
# Command to get the next index entry of a data stream: holds the ID of
# the stream.
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self) -> int:
        return self._stream_id
299
300
# Reply holding a status (one of the `Status` values), an index entry
# (packet entry or beacon), and two flags indicating new metadata and a
# new data stream.
class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
    # Known status values
    class Status:
        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(
        self,
        status: int,
        index_entry: _LttngIndexEntryT,
        has_new_metadata: bool,
        has_new_data_stream: bool,
    ):
        self._status = status
        self._index_entry = index_entry

        # "New stuff" flags
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self) -> int:
        return self._status

    @property
    def index_entry(self) -> _LttngIndexEntryT:
        return self._index_entry

    @property
    def has_new_metadata(self) -> bool:
        return self._has_new_metadata

    @property
    def has_new_data_stream(self) -> bool:
        return self._has_new_data_stream
337
338
# Command to get packet data from a data stream: holds the ID of the
# stream, an offset, and a requested length.
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
        super().__init__(version)
        self._stream_id = stream_id
        self._offset, self._req_length = offset, req_length

    @property
    def stream_id(self) -> int:
        return self._stream_id

    @property
    def offset(self) -> int:
        return self._offset

    @property
    def req_length(self) -> int:
        return self._req_length
357
358
# Reply holding a status (one of the `Status` values), raw data bytes,
# and two flags indicating new metadata and a new data stream.
class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
    # Known status values
    class Status:
        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(
        self,
        status: int,
        data: bytes,
        has_new_metadata: bool,
        has_new_data_stream: bool,
    ):
        self._status = status
        self._data = data

        # "New stuff" flags
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self) -> int:
        return self._status

    @property
    def data(self) -> bytes:
        return self._data

    @property
    def has_new_metadata(self) -> bool:
        return self._has_new_metadata

    @property
    def has_new_data_stream(self) -> bool:
        return self._has_new_data_stream
393
394
# Command to get data from a metadata stream: holds the ID of the
# stream.
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self) -> int:
        return self._stream_id
403
404
# Reply holding a status (one of the `Status` values) and raw data
# bytes.
class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
    # Known status values
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status: int, data: bytes):
        self._status, self._data = status, data

    @property
    def status(self) -> int:
        return self._status

    @property
    def data(self) -> bytes:
        return self._data
422
423
# Command to get the new stream infos of a tracing session: holds the ID
# of the tracing session.
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, tracing_session_id: int):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id
432
433
# Reply holding a status (one of the `Status` values) and a sequence of
# stream infos.
class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
    # Known status values
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(
        self,
        status: int,
        stream_infos: Sequence[_LttngLiveViewerStreamInfo],
    ):
        self._status = status
        self._stream_infos = stream_infos

    @property
    def status(self) -> int:
        return self._status

    @property
    def stream_infos(self) -> Sequence[_LttngLiveViewerStreamInfo]:
        return self._stream_infos
452
453
# Command to create a viewer session: has no field besides the inherited
# version.
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    pass
456
457
# Reply only holding a status (one of the `Status` values).
class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
    # Known status values
    class Status:
        OK = 1
        ERROR = 2

    def __init__(self, status: int):
        self._status = status

    @property
    def status(self) -> int:
        return self._status
469
470
# Command to detach from a tracing session: holds the ID of the tracing
# session.
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, tracing_session_id: int):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id
479
480
# Reply only holding a status (one of the `Status` values).
class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
    # Known status values
    class Status:
        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status: int):
        self._status = status

    @property
    def status(self) -> int:
        return self._status
493
494
# An LTTng live protocol codec can convert bytes to command objects and
# reply objects to bytes.
#
# Every integer is packed and unpacked in network byte order.
class _LttngLiveViewerProtocolCodec:
    # Command header layout: payload size (64-bit), command type (32-bit),
    # and version (32-bit).
    _COMMAND_HEADER_STRUCT_FMT = "QII"

    # Computed with the same `!` prefix as _unpack() and _pack() so that
    # this size always corresponds to the standard, unpadded layout which
    # is effectively decoded, whatever the native alignment rules are.
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize("!" + _COMMAND_HEADER_STRUCT_FMT)

    def __init__(self):
        pass

    # Unpacks the fields described by `fmt` from `data`, starting at
    # `offset`, forcing network byte order.
    def _unpack(self, fmt: str, data: bytes, offset: int = 0):
        return struct.unpack_from("!" + fmt, data, offset)

    # Unpacks the fields described by `fmt` from the command payload,
    # which immediately follows the command header within `data`.
    def _unpack_payload(self, fmt: str, data: bytes):
        return self._unpack(
            fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
        )

    # Decodes the command which starts at the beginning of `data`.
    #
    # Returns the corresponding command object, or `None` if `data`
    # doesn't contain the whole command (header and payload) yet.
    #
    # Raises `RuntimeError` if the command type is unknown.
    def decode(self, data: bytes):
        if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
            # Not enough data to read the command header
            return

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
                payload_size, cmd_type, version
            )
        )

        if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
            # Not enough data to read the whole command
            return

        if cmd_type == 1:
            # The last payload field is decoded, but not used
            viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
            return _LttngLiveViewerConnectCommand(
                version, viewer_session_id, major, minor
            )
        elif cmd_type == 2:
            return _LttngLiveViewerGetTracingSessionInfosCommand(version)
        elif cmd_type == 3:
            tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
            return _LttngLiveViewerAttachToTracingSessionCommand(
                version, tracing_session_id, offset, seek_type
            )
        elif cmd_type == 4:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                version, stream_id
            )
        elif cmd_type == 5:
            stream_id, offset, req_length = self._unpack_payload("QQI", data)
            return _LttngLiveViewerGetDataStreamPacketDataCommand(
                version, stream_id, offset, req_length
            )
        elif cmd_type == 6:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
        elif cmd_type == 7:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
        elif cmd_type == 8:
            return _LttngLiveViewerCreateViewerSessionCommand(version)
        elif cmd_type == 9:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerDetachFromTracingSessionCommand(
                version, tracing_session_id
            )
        else:
            raise RuntimeError("Unknown command type {}".format(cmd_type))

    # Packs `args` according to `fmt`, forcing network byte order.
    def _pack(self, fmt: str, *args: Any):
        return struct.pack("!" + fmt, *args)

    # Encodes `string` as a field of exactly `length` bytes, padding the
    # encoded string with null bytes.
    #
    # Raises `ValueError` if the encoded string doesn't fit: emitting a
    # longer field would silently shift everything which follows it
    # within the reply.
    def _encode_zero_padded_str(self, string: str, length: int):
        data = string.encode()

        if len(data) > length:
            raise ValueError(
                "Encoded string is {} bytes long, but the field is limited to {} bytes".format(
                    len(data), length
                )
            )

        return data.ljust(length, b"\x00")

    # Encodes the stream info `info`: its IDs and metadata flag followed
    # by its path (4096-byte field) and channel name (255-byte field).
    def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
        data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
        data += self._encode_zero_padded_str(info.path, 4096)
        data += self._encode_zero_padded_str(info.channel_name, 255)
        return data

    # Returns the flags integer corresponding to the two booleans: bit 0
    # is set for new metadata and bit 1 for new data streams.
    def _get_has_new_stuff_flags(
        self, has_new_metadata: bool, has_new_data_streams: bool
    ):
        flags = 0

        if has_new_metadata:
            flags |= 1

        if has_new_data_streams:
            flags |= 2

        return flags

    # Encodes the reply object `reply` to bytes.
    #
    # The exact class of `reply` selects the layout (subclasses of the
    # known reply classes are not accepted).
    #
    # Raises `ValueError` if the class of `reply` is unknown or if a
    # string of `reply` doesn't fit its fixed-size field.
    def encode(
        self,
        reply: _LttngLiveViewerReply,
    ) -> bytes:
        if type(reply) is _LttngLiveViewerConnectReply:
            # The last field is always 2 here.
            #
            # NOTE(review): presumably the connection type; confirm
            # against the protocol definition.
            data = self._pack(
                "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
            )
        elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
            # Count, then one fixed-size record per tracing session
            data = self._pack("I", len(reply.tracing_session_infos))

            for info in reply.tracing_session_infos:
                data += self._pack(
                    "QIII",
                    info.tracing_session_id,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
                data += self._encode_zero_padded_str(info.hostname, 64)
                data += self._encode_zero_padded_str(info.name, 255)
        elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
            data = self._pack("II", reply.status, len(reply.stream_infos))

            for info in reply.stream_infos:
                data += self._encode_stream_info(info)
        elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            index_format = "QQQQQQQII"
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )

            if isinstance(entry, _LttngDataStreamIndexEntry):
                data = self._pack(
                    index_format,
                    entry.offset_bytes,
                    entry.total_size_bits,
                    entry.content_size_bits,
                    entry.timestamp_begin,
                    entry.timestamp_end,
                    entry.events_discarded,
                    entry.stream_class_id,
                    reply.status,
                    flags,
                )
            else:
                # Beacon: same layout, but only the end timestamp and
                # the stream class ID are meaningful.
                data = self._pack(
                    index_format,
                    0,
                    0,
                    0,
                    0,
                    entry.timestamp,
                    0,
                    entry.stream_class_id,
                    reply.status,
                    flags,
                )
        elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            data = self._pack("III", reply.status, len(reply.data), flags)
            data += reply.data
        elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
            data = self._pack("QI", len(reply.data), reply.status)
            data += reply.data
        elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
            data = self._pack("II", reply.status, len(reply.stream_infos))

            for info in reply.stream_infos:
                data += self._encode_stream_info(info)
        elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
            data = self._pack("I", reply.status)
        elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
            data = self._pack("I", reply.status)
        else:
            raise ValueError(
                "Unknown reply object with class `{}`".format(reply.__class__.__name__)
            )

        return data
679
680
# Returns the beginning timestamp of `entry`: the single timestamp of a
# beacon, or the `timestamp_begin` property of a packet index entry.
def _get_entry_timestamp_begin(
    entry: _LttngIndexEntryT,
):
    if not isinstance(entry, _LttngDataStreamBeaconIndexEntry):
        return entry.timestamp_begin

    return entry.timestamp
688
689
# The index of an LTTng data stream, a sequence of index entries.
#
# The entries are decoded from an index file and, optionally, completed
# with beacon entries.
class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
    # Decodes the index file `path`, and then merges one beacon entry per
    # integer value of `beacons` (if any) into the decoded entries.
    def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
        self._path = path
        self._build()

        if beacons:
            # All the beacons get the stream class ID of the first
            # decoded entry.
            stream_class_id = self._entries[0].stream_class_id
            self._add_beacons(
                [
                    _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
                    for ts in beacons.iter(tjson.IntVal)
                ]
            )

        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    # Decodes the whole index file into `self._entries`.
    def _build(self):
        self._entries = []  # type: list[_LttngIndexEntryT]

        with open(self._path, "rb") as index_file:
            # File header: magic number, two fields which are not needed
            # here, and the length of a single index entry.
            header_fmt = ">IIII"
            header_size = struct.calcsize(header_fmt)
            header = index_file.read(header_size)
            assert len(header) == header_size
            magic, _, _, index_entry_length = struct.unpack(header_fmt, header)
            assert magic == 0xC1F1DCC1

            # The seven fields decoded for each index entry, in the
            # order of the _LttngDataStreamIndexEntry parameters.
            entry_fmt = ">QQQQQQQ"
            entry_size = struct.calcsize(entry_fmt)

            # What remains of an entry after its decoded fields
            skip_size = index_entry_length - entry_size

            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, index_file.tell()
                    )
                )
                raw_entry = index_file.read(entry_size)

                if not raw_entry:
                    # Done
                    break

                assert len(raw_entry) == entry_size
                self._entries.append(
                    _LttngDataStreamIndexEntry(*struct.unpack(entry_fmt, raw_entry))
                )

                # Skip anything else before the next entry
                index_file.seek(skip_size, os.SEEK_CUR)

    # Adds `beacons` to the entries, and then sorts all of them by
    # timestamp (end timestamp for a packet index entry).
    def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
        # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
        def sort_key(
            entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
        ) -> int:
            if not isinstance(entry, _LttngDataStreamBeaconIndexEntry):
                return entry.timestamp_end

            return entry.timestamp

        self._entries.extend(beacons)
        self._entries.sort(key=sort_key)

    @overload
    def __getitem__(self, index: int) -> _LttngIndexEntryT:
        ...

    @overload
    def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]:  # noqa: F811
        ...

    def __getitem__(  # noqa: F811
        self, index: Union[int, slice]
    ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT]]:
        return self._entries[index]

    def __len__(self):
        return len(self._entries)

    @property
    def path(self):
        return self._path
799
800
# An LTTng data stream.
#
# Holds the channel name deduced from the file name, the index found in
# the `index` directory next to the data stream file, and the opened
# data stream file itself.
class _LttngDataStream:
    # Raises `RuntimeError` if the name of the file `path` doesn't have
    # the form `<channel name>_<digits>`.
    def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
        self._path = path
        filename = os.path.basename(path)
        name_match = re.match(r"(.*)_\d+", filename)

        if name_match is None:
            raise RuntimeError(
                "Unexpected data stream file name pattern: {}".format(filename)
            )

        self._channel_name = name_match.group(1)

        # The index file is `index/<file name>.idx` within the directory
        # containing the data stream file.
        index_path = os.path.join(os.path.dirname(path), "index", filename + ".idx")
        self._index = _LttngDataStreamIndex(index_path, beacons_json)
        assert os.path.isfile(path)

        # Kept open to serve get_data() requests
        self._file = open(path, "rb")
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    @property
    def path(self):
        return self._path

    @property
    def channel_name(self):
        return self._channel_name

    @property
    def index(self):
        return self._index

    # Returns up to `len_bytes` bytes of the data stream file, starting
    # at the offset `offset_bytes`.
    def get_data(self, offset_bytes: int, len_bytes: int):
        data_file = self._file
        data_file.seek(offset_bytes)
        return data_file.read(len_bytes)
839
840
# A section of a metadata stream: some data (possibly empty) with a
# timestamp.
class _LttngMetadataStreamSection:
    def __init__(self, timestamp: int, data: Optional[bytes]):
        self._timestamp = timestamp

        # No data means an empty section
        self._data = bytes() if data is None else data
        logging.info(
            "Built metadata stream section: ts={}, data-len={}".format(
                self._timestamp, len(self._data)
            )
        )

    @property
    def timestamp(self) -> int:
        return self._timestamp

    @property
    def data(self) -> bytes:
        return self._data
861
862
# An LTTng metadata stream: the path of a metadata file and the sequence
# of sections to serve for it.
class _LttngMetadataStream:
    def __init__(
        self,
        metadata_file_path: str,
        config_sections: Sequence[_LttngMetadataStreamSection],
    ):
        self._path, self._sections = metadata_file_path, config_sections
        logging.info(
            "Built metadata stream: path={}, section-len={}".format(
                self._path, len(self._sections)
            )
        )

    @property
    def path(self) -> str:
        return self._path

    @property
    def sections(self) -> Sequence[_LttngMetadataStreamSection]:
        return self._sections
885
886
# A metadata section as found in the configuration: the line number at
# which it starts, its timestamp, and whether or not it's empty.
class LttngMetadataConfigSection:
    def __init__(self, line: int, timestamp: int, is_empty: bool):
        self._line, self._timestamp = line, timestamp
        self._is_empty = is_empty

    @property
    def line(self) -> int:
        return self._line

    @property
    def timestamp(self) -> int:
        return self._timestamp

    @property
    def is_empty(self) -> bool:
        return self._is_empty
904
905
# Converts the JSON array `metadata_sections_json` to a list of
# `LttngMetadataConfigSection` objects.
#
# Each element of the array is either:
#
# * The string `empty`: an empty section is inserted just before the
#   next concrete section, with the line and timestamp of the latter.
#
# * An object having the integer properties `line` and `timestamp`: a
#   concrete section.
#
# Raises `ValueError` if a string isn't `empty`, or if the timestamps or
# the lines of the concrete sections aren't strictly increasing (starting
# above 0). Raises `TypeError` if an element is neither a string nor an
# object.
def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
    metadata_sections = []  # type: list[LttngMetadataConfigSection]
    append_empty_section = False
    last_timestamp = 0
    last_line = 0

    for section in metadata_sections_json:
        if isinstance(section, tjson.StrVal):
            if section.val != "empty":
                raise ValueError("Invalid string value at {}.".format(section.path))

            # Found an empty section marker. Actually append the section
            # at the timestamp of the next concrete section.
            append_empty_section = True
        elif isinstance(section, tjson.ObjVal):
            line = section.at("line", tjson.IntVal).val
            ts = section.at("timestamp", tjson.IntVal).val

            # Sections' timestamps and lines must both be increasing.
            #
            # This validates user-provided configuration, so raise
            # explicitly instead of relying on `assert`, which is
            # disabled when Python runs with `-O`.
            if ts <= last_timestamp:
                raise ValueError(
                    "`{}`: timestamp {} must be greater than {}".format(
                        section.path, ts, last_timestamp
                    )
                )

            last_timestamp = ts

            if line <= last_line:
                raise ValueError(
                    "`{}`: line {} must be greater than {}".format(
                        section.path, line, last_line
                    )
                )

            last_line = line

            if append_empty_section:
                metadata_sections.append(LttngMetadataConfigSection(line, ts, True))
                append_empty_section = False

            metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
        else:
            raise TypeError(
                "`{}`: expecting a string or object value".format(section.path)
            )

    return metadata_sections
942
943
# Splits the content of the metadata file `metadata_file_path` into
# `_LttngMetadataStreamSection` objects following the configuration
# `metadata_sections_json` (see _parse_metadata_sections_config()).
#
# The lines of the file are distributed amongst the configured sections:
# a section starts at its configured line number (one-based) and ends
# just before the line number of the next section. Empty sections have no
# data.
#
# Raises `ValueError` if the configuration contains no concrete section.
def _split_metadata_sections(
    metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
):
    metadata_sections = _parse_metadata_sections_config(metadata_sections_json)

    if not metadata_sections:
        # Without this check, the final flush below would fail with an
        # obscure `IndexError`.
        raise ValueError(
            "No concrete metadata section configured for `{}`".format(
                metadata_file_path
            )
        )

    sections = []  # type: list[_LttngMetadataStreamSection]

    # Decode explicitly as UTF-8: the lines are re-encoded as UTF-8 below,
    # so relying on the locale's preferred encoding here could alter the
    # content or fail depending on the environment.
    with open(metadata_file_path, "r", encoding="utf-8") as metadata_file:
        metadata_lines = metadata_file.readlines()

    metadata_section_idx = 0
    curr_metadata_section = bytearray()

    # Start at one to get the one-indexing used by humans when viewing a
    # text file.
    for curr_line_number, line_content in enumerate(metadata_lines, start=1):
        encoded_line = line_content.encode("utf-8")

        # If there are no more sections, simply append the line.
        if metadata_section_idx + 1 >= len(metadata_sections):
            curr_metadata_section += encoded_line
            continue

        next_section_line_number = metadata_sections[metadata_section_idx + 1].line

        if curr_line_number < next_section_line_number:
            # Still within the current metadata section
            curr_metadata_section += encoded_line
            continue

        # The next section begins at the current line: create a section
        # with the metadata we gathered so far.
        sections.append(
            _LttngMetadataStreamSection(
                metadata_sections[metadata_section_idx].timestamp,
                bytes(curr_metadata_section),
            )
        )

        # Move to the next section.
        metadata_section_idx += 1

        # Clear old content and append current line for the next section.
        curr_metadata_section.clear()
        curr_metadata_section += encoded_line

        # Append any empty sections. An empty section is always followed
        # by a concrete one in the parsed configuration, so this loop
        # stays within bounds.
        while metadata_sections[metadata_section_idx].is_empty:
            sections.append(
                _LttngMetadataStreamSection(
                    metadata_sections[metadata_section_idx].timestamp, None
                )
            )
            metadata_section_idx += 1

    # We iterated over all the lines of the metadata file. Close the
    # current section.
    sections.append(
        _LttngMetadataStreamSection(
            metadata_sections[metadata_section_idx].timestamp,
            bytes(curr_metadata_section),
        )
    )

    return sections
1008
1009
# Mapping of strings to iterables of integers.
#
# NOTE(review): presumably data stream names to beacon timestamps;
# confirm against the users of this alias.
_StreamBeaconsT = Dict[str, Iterable[int]]
1011
1012
# An LTTng trace, a sequence of LTTng data streams.
#
# A trace is built from a trace directory: the `metadata` file becomes
# the metadata stream, and every other visible regular file becomes a
# data stream.
class LttngTrace(Sequence[_LttngDataStream]):
    def __init__(
        self,
        trace_dir: str,
        metadata_sections_json: Optional[tjson.ArrayVal],
        beacons_json: Optional[tjson.ObjVal],
    ):
        self._path = trace_dir
        self._create_metadata_stream(trace_dir, metadata_sections_json)
        self._create_data_streams(trace_dir, beacons_json)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    # Creates one data stream per regular file of `trace_dir`, except
    # hidden files and the `metadata` file, sorted by path.
    #
    # `beacons_json`, if any, maps data stream file names to their
    # beacons.
    def _create_data_streams(
        self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
    ):
        data_stream_paths = []  # type: list[str]

        for filename in os.listdir(trace_dir):
            if filename == "metadata" or filename.startswith("."):
                continue

            path = os.path.join(trace_dir, filename)

            if os.path.isfile(path):
                data_stream_paths.append(path)

        data_stream_paths.sort()
        self._data_streams = []  # type: list[_LttngDataStream]

        for data_stream_path in data_stream_paths:
            stream_name = os.path.basename(data_stream_path)

            if beacons_json is not None and stream_name in beacons_json:
                this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
            else:
                this_beacons_json = None

            self._data_streams.append(
                _LttngDataStream(data_stream_path, this_beacons_json)
            )

    # Creates the metadata stream from the `metadata` file of
    # `trace_dir`: a single section having the timestamp 0 and the whole
    # file content without `metadata_sections_json`, or the sections
    # resulting from the configured split otherwise.
    def _create_metadata_stream(
        self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
    ):
        metadata_path = os.path.join(trace_dir, "metadata")

        if metadata_sections_json is not None:
            metadata_sections = _split_metadata_sections(
                metadata_path, metadata_sections_json
            )
        else:
            with open(metadata_path, "rb") as metadata_file:
                whole_content = metadata_file.read()

            metadata_sections = [_LttngMetadataStreamSection(0, whole_content)]

        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)

    @property
    def path(self):
        return self._path

    @property
    def metadata_stream(self):
        return self._metadata_stream

    @overload
    def __getitem__(self, index: int) -> _LttngDataStream:
        ...

    @overload
    def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]:  # noqa: F811
        ...

    def __getitem__(  # noqa: F811
        self, index: Union[int, slice]
    ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
        return self._data_streams[index]

    def __len__(self):
        return len(self._data_streams)
1099
1100
# The state of a single data stream.
#
# Mostly a cursor within the index of the data stream, which starts at
# the first index entry.
class _LttngLiveViewerSessionDataStreamState:
    def __init__(
        self,
        ts_state: "_LttngLiveViewerSessionTracingSessionState",
        info: _LttngLiveViewerStreamInfo,
        data_stream: _LttngDataStream,
        metadata_stream_id: int,
    ):
        self._ts_state = ts_state
        self._info = info
        self._data_stream = data_stream
        self._metadata_stream_id = metadata_stream_id
        self._cur_index_entry_index = 0
        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id,
                ts_info.tracing_session_id,
                ts_info.name,
                data_stream.path,
            )
        )

    @property
    def tracing_session_state(self):
        return self._ts_state

    @property
    def info(self):
        return self._info

    @property
    def data_stream(self):
        return self._data_stream

    # Current index entry, or `None` when the cursor is just past the
    # last index entry.
    @property
    def cur_index_entry(self):
        index = self._data_stream.index

        if self._cur_index_entry_index == len(index):
            return

        return index[self._cur_index_entry_index]

    @property
    def metadata_stream_id(self):
        return self._metadata_stream_id

    # Moves the cursor to the next index entry.
    def goto_next_index_entry(self):
        self._cur_index_entry_index += 1
1150
1151
# The state of a single metadata stream.
#
# Mostly a cursor within the sections of the metadata stream, with the
# timestamp of the next section and an "is sent" flag.
class _LttngLiveViewerSessionMetadataStreamState:
    def __init__(
        self,
        ts_state: "_LttngLiveViewerSessionTracingSessionState",
        info: _LttngLiveViewerStreamInfo,
        metadata_stream: _LttngMetadataStream,
    ):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream
        self._cur_metadata_stream_section_index = 0

        # The cursor starts at the first section, so the next section,
        # if any, is the second one.
        all_sections = metadata_stream.sections
        self._next_metadata_stream_section_timestamp = (
            all_sections[1].timestamp if len(all_sections) > 1 else None
        )

        self._is_sent = False
        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id,
                ts_info.tracing_session_id,
                ts_info.name,
                metadata_stream.path,
            )
        )

    @property
    def info(self):
        return self._info

    @property
    def metadata_stream(self):
        return self._metadata_stream

    @property
    def is_sent(self):
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value: bool):
        self._is_sent = value

    # Current section, or `None` when the cursor is just past the last
    # section.
    #
    # Each access is logged.
    @property
    def cur_section(self):
        section_index = self._cur_metadata_stream_section_index
        logging.info(
            "Get current metadata section: section-idx={}".format(section_index)
        )
        all_sections = self._metadata_stream.sections

        if section_index == len(all_sections):
            return

        return all_sections[section_index]

    # Moves the cursor to the next section, and updates the next section
    # timestamp to the timestamp of the new current section (`None` if
    # there's no such section).
    def goto_next_section(self):
        self._cur_metadata_stream_section_index += 1

        if not self.cur_section:
            self._next_metadata_stream_section_timestamp = None
        else:
            self._next_metadata_stream_section_timestamp = self.cur_section.timestamp

    @property
    def next_section_timestamp(self):
        return self._next_metadata_stream_section_timestamp
1219
1220
# A tracing session descriptor.
#
# In the constructor, `traces` is an iterable of LTTng traces
# (`LttngTrace` objects). It's copied into a list, so that a one-shot
# iterable (a generator, for example) is accepted too.
#
# Raises `ValueError` if `name` isn't part of the path of every trace.
class LttngTracingSessionDescriptor:
    def __init__(
        self,
        name: str,
        tracing_session_id: int,
        hostname: str,
        live_timer_freq: int,
        client_count: int,
        traces: Iterable[LttngTrace],
    ):
        # Materialize once: the traces are walked below for validation
        # and for the stream count, and again later by the users of the
        # `traces` property.
        trace_list = list(traces)

        for trace in trace_list:
            if name not in trace.path:
                fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
                raise ValueError(fmt.format(name, trace.path))

        self._traces = trace_list

        # Each trace contributes `len(trace)` streams plus one for its
        # metadata stream.
        stream_count = sum(len(t) + 1 for t in trace_list)
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            stream_count,
            hostname,
            name,
        )

    # List of the traces of this tracing session.
    @property
    def traces(self):
        return self._traces

    # Tracing session info built from the constructor's parameters.
    @property
    def info(self):
        return self._info
1258
1259
# The state of a tracing session.
#
# On creation, builds one stream info and one stream state for the
# metadata stream and for each data stream of each trace of `tc_descr`,
# assigning consecutive stream IDs starting at `base_stream_id`.
class _LttngLiveViewerSessionTracingSessionState:
    def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
        self._tc_descr = tc_descr
        self._is_attached = False
        self._stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
        self._ds_states = {}  # type: dict[int, _LttngLiveViewerSessionDataStreamState]
        self._ms_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # The metadata stream takes the first stream ID of the
            # trace; the trace ID is derived from that same ID.
            metadata_stream_id = next_stream_id
            trace_id = metadata_stream_id * 1000
            next_stream_id += 1

            # Metadata stream -> stream info and metadata stream state
            ms_info = _LttngLiveViewerStreamInfo(
                metadata_stream_id,
                trace_id,
                True,
                trace.metadata_stream.path,
                "metadata",
            )
            self._stream_infos.append(ms_info)
            self._ms_states[metadata_stream_id] = (
                _LttngLiveViewerSessionMetadataStreamState(
                    self, ms_info, trace.metadata_stream
                )
            )

            # Data streams -> stream infos and data stream states
            for data_stream in trace:
                ds_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(ds_info)
                self._ds_states[next_stream_id] = (
                    _LttngLiveViewerSessionDataStreamState(
                        self, ds_info, data_stream, metadata_stream_id
                    )
                )
                next_stream_id += 1

        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                tc_descr.info.tracing_session_id, tc_descr.info.name
            )
        )

    @property
    def tracing_session_descriptor(self):
        return self._tc_descr

    # Data stream states, by stream ID.
    @property
    def data_stream_states(self):
        return self._ds_states

    # Metadata stream states, by stream ID.
    @property
    def metadata_stream_states(self):
        return self._ms_states

    @property
    def stream_infos(self):
        return self._stream_infos

    # Whether or not at least one metadata stream state isn't marked as
    # sent.
    @property
    def has_new_metadata(self):
        return not all(ms.is_sent for ms in self._ms_states.values())

    @property
    def is_attached(self):
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value: bool):
        self._is_attached = value
1331
1332
# Returns whether or not `latest_timestamp` reached the "next section"
# timestamp of `metadata_stream_state`.
#
# Always returns `False` when the state has no "next section" timestamp.
def needs_new_metadata_section(
    metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
    latest_timestamp: int,
):
    next_timestamp = metadata_stream_state.next_section_timestamp
    return next_timestamp is not None and latest_timestamp >= next_timestamp
1344
1345
# An LTTng live viewer session manages a view on tracing sessions
# and replies to commands accordingly.
#
# On creation, builds one tracing session state per descriptor of
# `tracing_session_descriptors` and registers all their stream states
# by stream ID.
#
# `max_query_data_response_size`, if set and not zero, caps the size of
# the data returned by a single "get data stream packet data" reply.
class _LttngLiveViewerSession:
    def __init__(
        self,
        viewer_session_id: int,
        tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
        max_query_data_response_size: Optional[int],
    ):
        self._viewer_session_id = viewer_session_id
        self._ts_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
        self._stream_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
        self._max_query_data_response_size = max_query_data_response_size
        total_stream_infos = 0

        for ts_descr in tracing_session_descriptors:
            # The count of stream infos created so far is the base
            # stream ID of this tracing session state, which keeps
            # stream IDs unique across tracing sessions.
            ts_state = _LttngLiveViewerSessionTracingSessionState(
                ts_descr, total_stream_infos
            )
            ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
            self._ts_states[ts_id] = ts_state
            total_stream_infos += len(ts_state.stream_infos)

            # Update session's stream states to have the new states
            self._stream_states.update(ts_state.data_stream_states)
            self._stream_states.update(ts_state.metadata_stream_states)

        # Command class -> handler method
        self._command_handlers = {
            _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
            _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
            _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
            _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
            _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
            _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
            _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
            _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
        }  # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]

    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    # Returns the tracing session state having the ID
    # `tracing_session_id`, raising `RuntimeError` if there's none.
    def _get_tracing_session_state(self, tracing_session_id: int):
        if tracing_session_id not in self._ts_states:
            raise RuntimeError(
                "Unknown tracing session ID {}".format(tracing_session_id)
            )

        return self._ts_states[tracing_session_id]

    # Returns the data stream state having the ID `stream_id`.
    #
    # Raises `RuntimeError` if the stream ID is unknown or if it
    # designates a stream which isn't a data stream.
    def _get_data_stream_state(self, stream_id: int):
        if stream_id not in self._stream_states:
            # This exception used to be created without being raised,
            # making the lookup below fail with a bare `KeyError`.
            raise RuntimeError("Unknown stream ID {}".format(stream_id))

        stream = self._stream_states[stream_id]
        if type(stream) is not _LttngLiveViewerSessionDataStreamState:
            raise RuntimeError("Stream is not a data stream")

        return stream

    # Returns the metadata stream state having the ID `stream_id`.
    #
    # Raises `RuntimeError` if the stream ID is unknown or if it
    # designates a stream which isn't a metadata stream.
    def _get_metadata_stream_state(self, stream_id: int):
        if stream_id not in self._stream_states:
            # Same as in _get_data_stream_state(): actually raise.
            raise RuntimeError("Unknown stream ID {}".format(stream_id))

        stream = self._stream_states[stream_id]
        if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
            raise RuntimeError("Stream is not a metadata stream")

        return stream

    # Dispatches `cmd` to the handler registered for its exact class
    # and returns the handler's reply.
    #
    # Raises `RuntimeError` if no handler is registered for the class
    # of `cmd`.
    def handle_command(self, cmd: _LttngLiveViewerCommand):
        logging.info(
            "Handling command in viewer session: cmd-cls-name={}".format(
                cmd.__class__.__name__
            )
        )
        cmd_type = type(cmd)

        if cmd_type not in self._command_handlers:
            raise RuntimeError(
                "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
            )

        return self._command_handlers[cmd_type](cmd)

    # Marks the tracing session as attached and replies with all its
    # stream infos; raises `RuntimeError` if it's already attached.
    def _handle_attach_to_tracing_session_command(
        self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
    ):
        fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
        logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if ts_state.is_attached:
            raise RuntimeError(
                "Cannot attach to tracing session `{}`: viewer is already attached".format(
                    info.name
                )
            )

        ts_state.is_attached = True
        status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
        return _LttngLiveViewerAttachToTracingSessionReply(
            status, ts_state.stream_infos
        )

    # Marks the tracing session as detached; raises `RuntimeError` if
    # it's not attached.
    def _handle_detach_from_tracing_session_command(
        self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
    ):
        fmt = 'Handling "detach from tracing session" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if not ts_state.is_attached:
            raise RuntimeError(
                "Cannot detach to tracing session `{}`: viewer is not attached".format(
                    info.name
                )
            )

        ts_state.is_attached = False
        status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
        return _LttngLiveViewerDetachFromTracingSessionReply(status)

    # Replies with the current index entry of the data stream, then
    # advances to the following one.
    #
    # The reply's status is `HUP` when the index is exhausted, `OK` for
    # a `_LttngDataStreamIndexEntry` entry, and `INACTIVE` for any
    # other kind of entry.
    def _handle_get_next_data_stream_index_entry_command(
        self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
    ):
        fmt = 'Handling "get next data stream index entry" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        stream_state = self._get_data_stream_state(cmd.stream_id)
        metadata_stream_state = self._get_metadata_stream_state(
            stream_state.metadata_stream_id
        )

        if stream_state.cur_index_entry is None:
            # The viewer is done reading this stream
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP

            # Dummy data stream index entry to use with the `HUP` status
            # (the reply needs one, but the viewer ignores it)
            index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)

            return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
                status, index_entry, False, False
            )

        timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)

        # Switch to the following metadata section (and mark the
        # metadata as not sent) once this entry's beginning timestamp
        # reaches it.
        if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        # The viewer only checks the `has_new_metadata` flag if the
        # reply's status is `OK`, so we need to provide an index here
        has_new_metadata = stream_state.tracing_session_state.has_new_metadata
        if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
        else:
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE

        reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
            status, stream_state.cur_index_entry, has_new_metadata, False
        )
        stream_state.goto_next_index_entry()
        return reply

    # Replies with data stream bytes starting at the requested offset.
    #
    # Replies with the `ERROR` status and the "has new metadata" flag
    # set, without any data, while the tracing session has metadata
    # which wasn't sent.
    def _handle_get_data_stream_packet_data_command(
        self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
    ):
        fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
        logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
        stream_state = self._get_data_stream_state(cmd.stream_id)
        data_response_length = cmd.req_length

        if stream_state.tracing_session_state.has_new_metadata:
            status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
            return _LttngLiveViewerGetDataStreamPacketDataReply(
                status, bytes(), True, False
            )

        if self._max_query_data_response_size:
            # Enforce a server-side limit on the requested length.
            #
            # Take the minimum of both values so that the transaction
            # always terminates.
            data_response_length = min(
                cmd.req_length, self._max_query_data_response_size
            )
            fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
            logging.info(fmt.format(cmd.req_length, data_response_length))

        data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
        status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
        return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)

    # Replies with the data of the current metadata section, or with
    # the `NO_NEW` status and no data if it was already sent.
    def _handle_get_metadata_stream_data_command(
        self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
    ):
        fmt = 'Handling "get metadata stream data" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)

        if metadata_stream_state.is_sent:
            status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
            return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())

        metadata_stream_state.is_sent = True
        status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
        metadata_section = metadata_stream_state.cur_section
        assert metadata_section is not None

        # If we are sending an empty section, ready the next one right away.
        if len(metadata_section.data) == 0:
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
        logging.info(fmt.format(len(metadata_section.data)))
        return _LttngLiveViewerGetMetadataStreamDataContentReply(
            status, metadata_section.data
        )

    def _handle_get_new_stream_infos_command(
        self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
    ):
        fmt = 'Handling "get new stream infos" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))

        # As of this version, all the tracing session's stream infos are
        # always given to the viewer when sending the "attach to tracing
        # session" reply, so there's nothing new here. Return the `HUP`
        # status as, if we're handling this command, the viewer consumed
        # all the existing data streams.
        status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
        return _LttngLiveViewerGetNewStreamInfosReply(status, [])

    # Replies with the infos of all the tracing sessions, sorted by
    # name.
    def _handle_get_tracing_session_infos_command(
        self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
    ):
        logging.info('Handling "get tracing session infos" command.')
        infos = [
            tss.tracing_session_descriptor.info for tss in self._ts_states.values()
        ]
        infos.sort(key=lambda info: info.name)
        return _LttngLiveViewerGetTracingSessionInfosReply(infos)

    def _handle_create_viewer_session_command(
        self, cmd: _LttngLiveViewerCreateViewerSessionCommand
    ):
        logging.info('Handling "create viewer session" command.')
        status = _LttngLiveViewerCreateViewerSessionReply.Status.OK

        # This does nothing here. In the LTTng relay daemon, it
        # allocates the viewer session's state.
        return _LttngLiveViewerCreateViewerSessionReply(status)
1605
1606
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
# on an OS-assigned TCP port otherwise. If `port_filename` is not `None`, it
# writes the decimal TCP port number to a temporary port file and then renames
# the temporary port file to `port_filename`.
#
# `tracing_session_descriptors` is an iterable of tracing session descriptors
# (`LttngTracingSessionDescriptor`) to serve; it's copied into a list.
#
# `max_query_data_response_size` is passed as is to the viewer session.
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns. The listening socket is closed whenever the constructor
# exits, whether it returns or raises.
class LttngLiveServer:
    def __init__(
        self,
        port: Optional[int],
        port_filename: Optional[str],
        tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
        max_query_data_response_size: Optional[int],
    ):
        # Materialize once: the descriptors are walked several times
        # below and again when the viewer connects.
        ts_descriptors = list(tracing_session_descriptors)

        logging.info("Server configuration:")

        logging.info(" Port file name: `{}`".format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                " Maximum response data query size: `{}`".format(
                    max_query_data_response_size
                )
            )

        for ts_descr in ts_descriptors:
            info = ts_descr.info
            fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info(' Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = ts_descriptors
        self._max_query_data_response_size = max_query_data_response_size
        self._codec = _LttngLiveViewerProtocolCodec()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Everything from here on runs under `finally` so that the
        # socket is closed even if binding or writing the port file
        # fails.
        try:
            # Port 0: OS assigns an unused port
            serv_addr = ("localhost", port if port is not None else 0)
            self._sock.bind(serv_addr)

            if port_filename is not None:
                self._write_port_to_file(port_filename)

            print("Listening on port {}".format(self._server_port))

            for ts_descr in ts_descriptors:
                info = ts_descr.info
                print(
                    "net://localhost:{}/host/{}/{}".format(
                        self._server_port, info.hostname, info.name
                    )
                )

            self._listen()
        finally:
            self._sock.close()
            logging.info("Closed connection and socket.")

    # TCP port to which the socket is bound.
    @property
    def _server_port(self):
        return self._sock.getsockname()[1]

    # Receives and returns the next viewer command, accumulating
    # received bytes until the codec manages to decode one.
    #
    # Returns `None` if the viewer closes the connection before sending
    # any byte of a new command.
    #
    # Raises `RuntimeError` if the viewer closes the connection in the
    # middle of a command, or if decoding raises `struct.error`.
    def _recv_command(self):
        data = bytes()

        while True:
            logging.info("Waiting for viewer command.")
            buf = self._conn.recv(128)

            if not buf:
                logging.info("Client closed connection.")

                if data:
                    raise RuntimeError(
                        "Client closed connection after having sent {} command bytes.".format(
                            len(data)
                        )
                    )

                return

            logging.info("Received data from viewer: length={}".format(len(buf)))

            data += buf

            try:
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise RuntimeError("Malformed command: {}".format(exc)) from exc

            # `None` here: keep receiving and retry with more bytes
            if cmd is not None:
                logging.info(
                    "Received command from viewer: cmd-cls-name={}".format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    # Encodes `reply` and sends it to the viewer.
    def _send_reply(self, reply: _LttngLiveViewerReply):
        data = self._codec.encode(reply)
        logging.info(
            "Sending reply to viewer: reply-cls-name={}, length={}".format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    # Handles the whole conversation with the connected viewer: the
    # initial "connect" exchange, and then one reply per command until
    # the viewer closes the connection.
    #
    # Raises `RuntimeError` if the first command isn't "connect".
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise RuntimeError(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    # Accepts a single viewer connection, handles it, and closes it.
    def _listen(self):
        logging.info("Listening: port={}".format(self._server_port))
        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Writes the server's port to `port_filename`.
    #
    # The port is first written to a temporary file created in the
    # directory of `port_filename`, which is then renamed to
    # `port_filename`.
    def _write_port_to_file(self, port_filename: str):
        # Write the port number to a temporary file.
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, dir=os.path.dirname(port_filename)
        ) as tmp_port_file:
            print(self._server_port, end="", file=tmp_port_file)

        # Rename temporary file to real file
        os.replace(tmp_port_file.name, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1798
1799
def _session_descriptors_from_path(
    sessions_filename: str, trace_path_prefix: Optional[str]
):
    # Builds and returns the list of tracing session descriptors
    # described by the JSON file `sessions_filename`.
    #
    # A relative trace path is joined to `trace_path_prefix` when the
    # latter is set and not empty.
    #
    # Expected file format (`beacons` and `metadata-sections` are
    # optional within a trace):
    #
    # [
    #     {
    #         "name": "my-session",
    #         "id": 17,
    #         "hostname": "myhost",
    #         "live-timer-freq": 1000000,
    #         "client-count": 23,
    #         "traces": [
    #             {
    #                 "path": "lol"
    #             },
    #             {
    #                 "path": "meow/mix",
    #                 "beacons": {
    #                     "my_stream": [ 5235787, 728375283 ]
    #                 },
    #                 "metadata-sections": [
    #                     {
    #                         "line": 1,
    #                         "timestamp": 0
    #                     }
    #                 ]
    #             }
    #         ]
    #     }
    # ]
    with open(sessions_filename, "r") as sessions_file:
        sessions_json = tjson.load(sessions_file, tjson.ArrayVal)

    descriptors = []  # type: list[LttngTracingSessionDescriptor]

    for session_json in sessions_json.iter(tjson.ObjVal):
        name = session_json.at("name", tjson.StrVal).val
        tracing_session_id = session_json.at("id", tjson.IntVal).val
        hostname = session_json.at("hostname", tjson.StrVal).val
        live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
        client_count = session_json.at("client-count", tjson.IntVal).val

        traces = []  # type: list[LttngTrace]

        for trace_json in session_json.at("traces", tjson.ArrayVal).iter(tjson.ObjVal):
            # Optional properties
            metadata_sections = None
            if "metadata-sections" in trace_json:
                metadata_sections = trace_json.at("metadata-sections", tjson.ArrayVal)

            beacons = None
            if "beacons" in trace_json:
                beacons = trace_json.at("beacons", tjson.ObjVal)

            trace_path = trace_json.at("path", tjson.StrVal).val

            if trace_path_prefix and not os.path.isabs(trace_path):
                trace_path = os.path.join(trace_path_prefix, trace_path)

            traces.append(LttngTrace(trace_path, metadata_sections, beacons))

        descriptor = LttngTracingSessionDescriptor(
            name,
            tracing_session_id,
            hostname,
            live_timer_freq,
            client_count,
            traces,
        )
        descriptors.append(descriptor)

    return descriptors
1882
1883
1884 def _loglevel_parser(string: str):
1885 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
1886 if string not in loglevels:
1887 msg = "{} is not a valid loglevel".format(string)
1888 raise argparse.ArgumentTypeError(msg)
1889 return loglevels[string]
1890
1891
# Script entry point: parses the command line, loads the tracing session
# descriptors from the session configuration file, and runs the server
# (the `LttngLiveServer` constructor does all the serving).
if __name__ == "__main__":
    logging.basicConfig(format="# %(asctime)-25s%(message)s")

    # The parser is created without the automatic `-h`/`--help` option:
    # that option is added by hand further down, once all the other
    # options are registered, so that the first parsing pass below
    # doesn't handle it.
    parser = argparse.ArgumentParser(
        description="LTTng-live protocol mocker", add_help=False
    )
    parser.add_argument(
        "--log-level",
        default="warning",
        choices=["info", "warning"],
        help="The loglevel to be used.",
    )

    # First pass: only `--log-level` is known at this point; every other
    # argument lands in `remaining_args`. This sets the log level
    # before anything else happens.
    loglevel_namespace, remaining_args = parser.parse_known_args()
    logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))

    parser.add_argument(
        "--port",
        help="The port to bind to. If missing, use an OS-assigned port..",
        type=int,
    )
    parser.add_argument(
        "--port-filename",
        help="The final port file. This file is present when the server is ready to receive connection.",
    )
    parser.add_argument(
        "--max-query-data-response-size",
        type=int,
        help="The maximum size of control data response in bytes",
    )
    parser.add_argument(
        "--trace-path-prefix",
        type=str,
        help="Prefix to prepend to the trace paths of session configurations",
    )
    parser.add_argument(
        "sessions_filename",
        type=str,
        help="Path to a session configuration file",
        metavar="sessions-filename",
    )
    parser.add_argument(
        "-h",
        "--help",
        action="help",
        default=argparse.SUPPRESS,
        help="Show this help message and exit.",
    )

    # Second pass: parse what the first pass left over, now with the
    # complete set of options.
    args = parser.parse_args(args=remaining_args)
    sessions_filename = args.sessions_filename  # type: str
    trace_path_prefix = args.trace_path_prefix  # type: str | None
    sessions = _session_descriptors_from_path(
        sessions_filename,
        trace_path_prefix,
    )

    port = args.port  # type: int | None
    port_filename = args.port_filename  # type: str | None
    max_query_data_response_size = args.max_query_data_response_size  # type: int | None

    # The constructor only returns once the viewer closed the
    # connection (or raises on error).
    LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
This page took 0.101745 seconds and 5 git commands to generate.