cleanup: tests: remove unused files from lf / crlf traces
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
... / ...
CommitLineData
1# SPDX-License-Identifier: MIT
2#
3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
4#
5
6# pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
7
8import os
9import re
10import socket
11import struct
12import logging
13import os.path
14import argparse
15import tempfile
16from typing import Dict, Union, Iterable, Optional, Sequence, overload
17
18import tjson
19
20# isort: off
21from typing import Any, Callable # noqa: F401
22
23# isort: on
24
25
26# An entry within the index of an LTTng data stream.
27class _LttngDataStreamIndexEntry:
28 def __init__(
29 self,
30 offset_bytes: int,
31 total_size_bits: int,
32 content_size_bits: int,
33 timestamp_begin: int,
34 timestamp_end: int,
35 events_discarded: int,
36 stream_class_id: int,
37 ):
38 self._offset_bytes = offset_bytes
39 self._total_size_bits = total_size_bits
40 self._content_size_bits = content_size_bits
41 self._timestamp_begin = timestamp_begin
42 self._timestamp_end = timestamp_end
43 self._events_discarded = events_discarded
44 self._stream_class_id = stream_class_id
45
46 @property
47 def offset_bytes(self):
48 return self._offset_bytes
49
50 @property
51 def total_size_bits(self):
52 return self._total_size_bits
53
54 @property
55 def total_size_bytes(self):
56 return self._total_size_bits // 8
57
58 @property
59 def content_size_bits(self):
60 return self._content_size_bits
61
62 @property
63 def content_size_bytes(self):
64 return self._content_size_bits // 8
65
66 @property
67 def timestamp_begin(self):
68 return self._timestamp_begin
69
70 @property
71 def timestamp_end(self):
72 return self._timestamp_end
73
74 @property
75 def events_discarded(self):
76 return self._events_discarded
77
78 @property
79 def stream_class_id(self):
80 return self._stream_class_id
81
82
83# An entry within the index of an LTTng data stream. While a stream beacon entry
84# is conceptually unrelated to an index, it is sent as a reply to a
85# LttngLiveViewerGetNextDataStreamIndexEntryCommand
86class _LttngDataStreamBeaconIndexEntry:
87 def __init__(self, stream_class_id: int, timestamp: int):
88 self._stream_class_id = stream_class_id
89 self._timestamp = timestamp
90
91 @property
92 def timestamp(self):
93 return self._timestamp
94
95 @property
96 def stream_class_id(self):
97 return self._stream_class_id
98
99
100_LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
101
102
103class _LttngLiveViewerCommand:
104 def __init__(self, version: int):
105 self._version = version
106
107 @property
108 def version(self):
109 return self._version
110
111
112class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
113 def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
114 super().__init__(version)
115 self._viewer_session_id = viewer_session_id
116 self._major = major
117 self._minor = minor
118
119 @property
120 def viewer_session_id(self):
121 return self._viewer_session_id
122
123 @property
124 def major(self):
125 return self._major
126
127 @property
128 def minor(self):
129 return self._minor
130
131
132class _LttngLiveViewerReply:
133 pass
134
135
136class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
137 def __init__(self, viewer_session_id: int, major: int, minor: int):
138 self._viewer_session_id = viewer_session_id
139 self._major = major
140 self._minor = minor
141
142 @property
143 def viewer_session_id(self):
144 return self._viewer_session_id
145
146 @property
147 def major(self):
148 return self._major
149
150 @property
151 def minor(self):
152 return self._minor
153
154
155class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
156 pass
157
158
159class _LttngLiveViewerTracingSessionInfo:
160 def __init__(
161 self,
162 tracing_session_id: int,
163 live_timer_freq: int,
164 client_count: int,
165 stream_count: int,
166 hostname: str,
167 name: str,
168 ):
169 self._tracing_session_id = tracing_session_id
170 self._live_timer_freq = live_timer_freq
171 self._client_count = client_count
172 self._stream_count = stream_count
173 self._hostname = hostname
174 self._name = name
175
176 @property
177 def tracing_session_id(self):
178 return self._tracing_session_id
179
180 @property
181 def live_timer_freq(self):
182 return self._live_timer_freq
183
184 @property
185 def client_count(self):
186 return self._client_count
187
188 @property
189 def stream_count(self):
190 return self._stream_count
191
192 @property
193 def hostname(self):
194 return self._hostname
195
196 @property
197 def name(self):
198 return self._name
199
200
201class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
202 def __init__(
203 self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
204 ):
205 self._tracing_session_infos = tracing_session_infos
206
207 @property
208 def tracing_session_infos(self):
209 return self._tracing_session_infos
210
211
212class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
213 class SeekType:
214 BEGINNING = 1
215 LAST = 2
216
217 def __init__(
218 self, version: int, tracing_session_id: int, offset: int, seek_type: int
219 ):
220 super().__init__(version)
221 self._tracing_session_id = tracing_session_id
222 self._offset = offset
223 self._seek_type = seek_type
224
225 @property
226 def tracing_session_id(self):
227 return self._tracing_session_id
228
229 @property
230 def offset(self):
231 return self._offset
232
233 @property
234 def seek_type(self):
235 return self._seek_type
236
237
238class _LttngLiveViewerStreamInfo:
239 def __init__(
240 self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
241 ):
242 self._id = id
243 self._trace_id = trace_id
244 self._is_metadata = is_metadata
245 self._path = path
246 self._channel_name = channel_name
247
248 @property
249 def id(self):
250 return self._id
251
252 @property
253 def trace_id(self):
254 return self._trace_id
255
256 @property
257 def is_metadata(self):
258 return self._is_metadata
259
260 @property
261 def path(self):
262 return self._path
263
264 @property
265 def channel_name(self):
266 return self._channel_name
267
268
269class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
270 class Status:
271 OK = 1
272 ALREADY = 2
273 UNKNOWN = 3
274 NOT_LIVE = 4
275 SEEK_ERROR = 5
276 NO_SESSION = 6
277
278 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
279 self._status = status
280 self._stream_infos = stream_infos
281
282 @property
283 def status(self):
284 return self._status
285
286 @property
287 def stream_infos(self):
288 return self._stream_infos
289
290
291class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
292 def __init__(self, version: int, stream_id: int):
293 super().__init__(version)
294 self._stream_id = stream_id
295
296 @property
297 def stream_id(self):
298 return self._stream_id
299
300
301class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
302 class Status:
303 OK = 1
304 RETRY = 2
305 HUP = 3
306 ERROR = 4
307 INACTIVE = 5
308 EOF = 6
309
310 def __init__(
311 self,
312 status: int,
313 index_entry: _LttngIndexEntryT,
314 has_new_metadata: bool,
315 has_new_data_stream: bool,
316 ):
317 self._status = status
318 self._index_entry = index_entry
319 self._has_new_metadata = has_new_metadata
320 self._has_new_data_stream = has_new_data_stream
321
322 @property
323 def status(self):
324 return self._status
325
326 @property
327 def index_entry(self):
328 return self._index_entry
329
330 @property
331 def has_new_metadata(self):
332 return self._has_new_metadata
333
334 @property
335 def has_new_data_stream(self):
336 return self._has_new_data_stream
337
338
339class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
340 def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
341 super().__init__(version)
342 self._stream_id = stream_id
343 self._offset = offset
344 self._req_length = req_length
345
346 @property
347 def stream_id(self):
348 return self._stream_id
349
350 @property
351 def offset(self):
352 return self._offset
353
354 @property
355 def req_length(self):
356 return self._req_length
357
358
359class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
360 class Status:
361 OK = 1
362 RETRY = 2
363 ERROR = 3
364 EOF = 4
365
366 def __init__(
367 self,
368 status: int,
369 data: bytes,
370 has_new_metadata: bool,
371 has_new_data_stream: bool,
372 ):
373 self._status = status
374 self._data = data
375 self._has_new_metadata = has_new_metadata
376 self._has_new_data_stream = has_new_data_stream
377
378 @property
379 def status(self):
380 return self._status
381
382 @property
383 def data(self):
384 return self._data
385
386 @property
387 def has_new_metadata(self):
388 return self._has_new_metadata
389
390 @property
391 def has_new_data_stream(self):
392 return self._has_new_data_stream
393
394
395class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
396 def __init__(self, version: int, stream_id: int):
397 super().__init__(version)
398 self._stream_id = stream_id
399
400 @property
401 def stream_id(self):
402 return self._stream_id
403
404
405class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
406 class Status:
407 OK = 1
408 NO_NEW = 2
409 ERROR = 3
410
411 def __init__(self, status: int, data: bytes):
412 self._status = status
413 self._data = data
414
415 @property
416 def status(self):
417 return self._status
418
419 @property
420 def data(self):
421 return self._data
422
423
424class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
425 def __init__(self, version: int, tracing_session_id: int):
426 super().__init__(version)
427 self._tracing_session_id = tracing_session_id
428
429 @property
430 def tracing_session_id(self):
431 return self._tracing_session_id
432
433
434class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
435 class Status:
436 OK = 1
437 NO_NEW = 2
438 ERROR = 3
439 HUP = 4
440
441 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
442 self._status = status
443 self._stream_infos = stream_infos
444
445 @property
446 def status(self):
447 return self._status
448
449 @property
450 def stream_infos(self):
451 return self._stream_infos
452
453
454class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
455 pass
456
457
458class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
459 class Status:
460 OK = 1
461 ERROR = 2
462
463 def __init__(self, status: int):
464 self._status = status
465
466 @property
467 def status(self):
468 return self._status
469
470
471class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
472 def __init__(self, version: int, tracing_session_id: int):
473 super().__init__(version)
474 self._tracing_session_id = tracing_session_id
475
476 @property
477 def tracing_session_id(self):
478 return self._tracing_session_id
479
480
481class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
482 class Status:
483 OK = 1
484 UNKNOWN = 2
485 ERROR = 3
486
487 def __init__(self, status: int):
488 self._status = status
489
490 @property
491 def status(self):
492 return self._status
493
494
495# An LTTng live protocol codec can convert bytes to command objects and
496# reply objects to bytes.
497class _LttngLiveViewerProtocolCodec:
498 _COMMAND_HEADER_STRUCT_FMT = "QII"
499 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
500
501 def __init__(self):
502 pass
503
504 def _unpack(self, fmt: str, data: bytes, offset: int = 0):
505 fmt = "!" + fmt
506 return struct.unpack_from(fmt, data, offset)
507
508 def _unpack_payload(self, fmt: str, data: bytes):
509 return self._unpack(
510 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
511 )
512
513 def decode(self, data: bytes):
514 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
515 # Not enough data to read the command header
516 return
517
518 payload_size, cmd_type, version = self._unpack(
519 self._COMMAND_HEADER_STRUCT_FMT, data
520 )
521 logging.info(
522 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
523 payload_size, cmd_type, version
524 )
525 )
526
527 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
528 # Not enough data to read the whole command
529 return
530
531 if cmd_type == 1:
532 viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
533 return _LttngLiveViewerConnectCommand(
534 version, viewer_session_id, major, minor
535 )
536 elif cmd_type == 2:
537 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
538 elif cmd_type == 3:
539 tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
540 return _LttngLiveViewerAttachToTracingSessionCommand(
541 version, tracing_session_id, offset, seek_type
542 )
543 elif cmd_type == 4:
544 (stream_id,) = self._unpack_payload("Q", data)
545 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
546 version, stream_id
547 )
548 elif cmd_type == 5:
549 stream_id, offset, req_length = self._unpack_payload("QQI", data)
550 return _LttngLiveViewerGetDataStreamPacketDataCommand(
551 version, stream_id, offset, req_length
552 )
553 elif cmd_type == 6:
554 (stream_id,) = self._unpack_payload("Q", data)
555 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
556 elif cmd_type == 7:
557 (tracing_session_id,) = self._unpack_payload("Q", data)
558 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
559 elif cmd_type == 8:
560 return _LttngLiveViewerCreateViewerSessionCommand(version)
561 elif cmd_type == 9:
562 (tracing_session_id,) = self._unpack_payload("Q", data)
563 return _LttngLiveViewerDetachFromTracingSessionCommand(
564 version, tracing_session_id
565 )
566 else:
567 raise RuntimeError("Unknown command type {}".format(cmd_type))
568
569 def _pack(self, fmt: str, *args: Any):
570 # Force network byte order
571 return struct.pack("!" + fmt, *args)
572
573 def _encode_zero_padded_str(self, string: str, length: int):
574 data = string.encode()
575 return data.ljust(length, b"\x00")
576
577 def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
578 data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
579 data += self._encode_zero_padded_str(info.path, 4096)
580 data += self._encode_zero_padded_str(info.channel_name, 255)
581 return data
582
583 def _get_has_new_stuff_flags(
584 self, has_new_metadata: bool, has_new_data_streams: bool
585 ):
586 flags = 0
587
588 if has_new_metadata:
589 flags |= 1
590
591 if has_new_data_streams:
592 flags |= 2
593
594 return flags
595
596 def encode(
597 self,
598 reply: _LttngLiveViewerReply,
599 ) -> bytes:
600 if type(reply) is _LttngLiveViewerConnectReply:
601 data = self._pack(
602 "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
603 )
604 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
605 data = self._pack("I", len(reply.tracing_session_infos))
606
607 for info in reply.tracing_session_infos:
608 data += self._pack(
609 "QIII",
610 info.tracing_session_id,
611 info.live_timer_freq,
612 info.client_count,
613 info.stream_count,
614 )
615 data += self._encode_zero_padded_str(info.hostname, 64)
616 data += self._encode_zero_padded_str(info.name, 255)
617 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
618 data = self._pack("II", reply.status, len(reply.stream_infos))
619
620 for info in reply.stream_infos:
621 data += self._encode_stream_info(info)
622 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
623 index_format = "QQQQQQQII"
624 entry = reply.index_entry
625 flags = self._get_has_new_stuff_flags(
626 reply.has_new_metadata, reply.has_new_data_stream
627 )
628
629 if isinstance(entry, _LttngDataStreamIndexEntry):
630 data = self._pack(
631 index_format,
632 entry.offset_bytes,
633 entry.total_size_bits,
634 entry.content_size_bits,
635 entry.timestamp_begin,
636 entry.timestamp_end,
637 entry.events_discarded,
638 entry.stream_class_id,
639 reply.status,
640 flags,
641 )
642 else:
643 data = self._pack(
644 index_format,
645 0,
646 0,
647 0,
648 0,
649 entry.timestamp,
650 0,
651 entry.stream_class_id,
652 reply.status,
653 flags,
654 )
655 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
656 flags = self._get_has_new_stuff_flags(
657 reply.has_new_metadata, reply.has_new_data_stream
658 )
659 data = self._pack("III", reply.status, len(reply.data), flags)
660 data += reply.data
661 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
662 data = self._pack("QI", len(reply.data), reply.status)
663 data += reply.data
664 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
665 data = self._pack("II", reply.status, len(reply.stream_infos))
666
667 for info in reply.stream_infos:
668 data += self._encode_stream_info(info)
669 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
670 data = self._pack("I", reply.status)
671 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
672 data = self._pack("I", reply.status)
673 else:
674 raise ValueError(
675 "Unknown reply object with class `{}`".format(reply.__class__.__name__)
676 )
677
678 return data
679
680
681def _get_entry_timestamp_begin(
682 entry: _LttngIndexEntryT,
683):
684 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
685 return entry.timestamp
686 else:
687 return entry.timestamp_begin
688
689
690# The index of an LTTng data stream, a sequence of index entries.
691class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
692 def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
693 self._path = path
694 self._build()
695
696 if beacons:
697 stream_class_id = self._entries[0].stream_class_id
698
699 beacons_list = [] # type: list[_LttngDataStreamBeaconIndexEntry]
700 for ts in beacons.iter(tjson.IntVal):
701 beacons_list.append(
702 _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
703 )
704
705 self._add_beacons(beacons_list)
706
707 logging.info(
708 'Built data stream index entries: path="{}", count={}'.format(
709 path, len(self._entries)
710 )
711 )
712
713 def _build(self):
714 self._entries = [] # type: list[_LttngIndexEntryT]
715
716 with open(self._path, "rb") as f:
717 # Read header first
718 fmt = ">IIII"
719 size = struct.calcsize(fmt)
720 data = f.read(size)
721 assert len(data) == size
722 magic, _, _, index_entry_length = struct.unpack(fmt, data)
723 assert magic == 0xC1F1DCC1
724
725 # Read index entries
726 fmt = ">QQQQQQQ"
727 size = struct.calcsize(fmt)
728
729 while True:
730 logging.debug(
731 'Decoding data stream index entry: path="{}", offset={}'.format(
732 self._path, f.tell()
733 )
734 )
735 data = f.read(size)
736
737 if not data:
738 # Done
739 break
740
741 assert len(data) == size
742 (
743 offset_bytes,
744 total_size_bits,
745 content_size_bits,
746 timestamp_begin,
747 timestamp_end,
748 events_discarded,
749 stream_class_id,
750 ) = struct.unpack(fmt, data)
751
752 self._entries.append(
753 _LttngDataStreamIndexEntry(
754 offset_bytes,
755 total_size_bits,
756 content_size_bits,
757 timestamp_begin,
758 timestamp_end,
759 events_discarded,
760 stream_class_id,
761 )
762 )
763
764 # Skip anything else before the next entry
765 f.seek(index_entry_length - size, os.SEEK_CUR)
766
767 def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
768 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
769 def sort_key(
770 entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
771 ) -> int:
772 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
773 return entry.timestamp
774 else:
775 return entry.timestamp_end
776
777 self._entries += beacons
778 self._entries.sort(key=sort_key)
779
780 @overload
781 def __getitem__(self, index: int) -> _LttngIndexEntryT:
782 ...
783
784 @overload
785 def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]: # noqa: F811
786 ...
787
788 def __getitem__( # noqa: F811
789 self, index: Union[int, slice]
790 ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT],]:
791 return self._entries[index]
792
793 def __len__(self):
794 return len(self._entries)
795
796 @property
797 def path(self):
798 return self._path
799
800
801# An LTTng data stream.
802class _LttngDataStream:
803 def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
804 self._path = path
805 filename = os.path.basename(path)
806 match = re.match(r"(.*)_\d+", filename)
807 if not match:
808 raise RuntimeError(
809 "Unexpected data stream file name pattern: {}".format(filename)
810 )
811
812 self._channel_name = match.group(1)
813 trace_dir = os.path.dirname(path)
814 index_path = os.path.join(trace_dir, "index", filename + ".idx")
815 self._index = _LttngDataStreamIndex(index_path, beacons_json)
816 assert os.path.isfile(path)
817 self._file = open(path, "rb")
818 logging.info(
819 'Built data stream: path="{}", channel-name="{}"'.format(
820 path, self._channel_name
821 )
822 )
823
824 @property
825 def path(self):
826 return self._path
827
828 @property
829 def channel_name(self):
830 return self._channel_name
831
832 @property
833 def index(self):
834 return self._index
835
836 def get_data(self, offset_bytes: int, len_bytes: int):
837 self._file.seek(offset_bytes)
838 return self._file.read(len_bytes)
839
840
841class _LttngMetadataStreamSection:
842 def __init__(self, timestamp: int, data: Optional[bytes]):
843 self._timestamp = timestamp
844 if data is None:
845 self._data = bytes()
846 else:
847 self._data = data
848 logging.info(
849 "Built metadata stream section: ts={}, data-len={}".format(
850 self._timestamp, len(self._data)
851 )
852 )
853
854 @property
855 def timestamp(self):
856 return self._timestamp
857
858 @property
859 def data(self):
860 return self._data
861
862
863# An LTTng metadata stream.
864class _LttngMetadataStream:
865 def __init__(
866 self,
867 metadata_file_path: str,
868 config_sections: Sequence[_LttngMetadataStreamSection],
869 ):
870 self._path = metadata_file_path
871 self._sections = config_sections
872 logging.info(
873 "Built metadata stream: path={}, section-len={}".format(
874 self._path, len(self._sections)
875 )
876 )
877
878 @property
879 def path(self):
880 return self._path
881
882 @property
883 def sections(self):
884 return self._sections
885
886
887class LttngMetadataConfigSection:
888 def __init__(self, line: int, timestamp: int, is_empty: bool):
889 self._line = line
890 self._timestamp = timestamp
891 self._is_empty = is_empty
892
893 @property
894 def line(self):
895 return self._line
896
897 @property
898 def timestamp(self):
899 return self._timestamp
900
901 @property
902 def is_empty(self):
903 return self._is_empty
904
905
906def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
907 metadata_sections = [] # type: list[LttngMetadataConfigSection]
908 append_empty_section = False
909 last_timestamp = 0
910 last_line = 0
911
912 for section in metadata_sections_json:
913 if isinstance(section, tjson.StrVal):
914 if section.val == "empty":
915 # Found a empty section marker. Actually append the section at the
916 # timestamp of the next concrete section.
917 append_empty_section = True
918 else:
919 raise ValueError("Invalid string value at {}.".format(section.path))
920 elif isinstance(section, tjson.ObjVal):
921 line = section.at("line", tjson.IntVal).val
922 ts = section.at("timestamp", tjson.IntVal).val
923
924 # Sections' timestamps and lines must both be increasing.
925 assert ts > last_timestamp
926 last_timestamp = ts
927
928 assert line > last_line
929 last_line = line
930
931 if append_empty_section:
932 metadata_sections.append(LttngMetadataConfigSection(line, ts, True))
933 append_empty_section = False
934
935 metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
936 else:
937 raise TypeError(
938 "`{}`: expecting a string or object value".format(section.path)
939 )
940
941 return metadata_sections
942
943
944def _split_metadata_sections(
945 metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
946):
947 metadata_sections = _parse_metadata_sections_config(metadata_sections_json)
948
949 sections = [] # type: list[_LttngMetadataStreamSection]
950 with open(metadata_file_path, "r") as metadata_file:
951 metadata_lines = [line for line in metadata_file]
952
953 metadata_section_idx = 0
954 curr_metadata_section = bytearray()
955
956 for idx, line_content in enumerate(metadata_lines):
957 # Add one to the index to convert from the zero-indexing of the
958 # enumerate() function to the one-indexing used by humans when
959 # viewing a text file.
960 curr_line_number = idx + 1
961
962 # If there are no more sections, simply append the line.
963 if metadata_section_idx + 1 >= len(metadata_sections):
964 curr_metadata_section += bytearray(line_content, "utf8")
965 continue
966
967 next_section_line_number = metadata_sections[metadata_section_idx + 1].line
968
969 # If the next section begins at the current line, create a
970 # section with the metadata we gathered so far.
971 if curr_line_number >= next_section_line_number:
972 # Flushing the metadata of the current section.
973 sections.append(
974 _LttngMetadataStreamSection(
975 metadata_sections[metadata_section_idx].timestamp,
976 bytes(curr_metadata_section),
977 )
978 )
979
980 # Move to the next section.
981 metadata_section_idx += 1
982
983 # Clear old content and append current line for the next section.
984 curr_metadata_section.clear()
985 curr_metadata_section += bytearray(line_content, "utf8")
986
987 # Append any empty sections.
988 while metadata_sections[metadata_section_idx].is_empty:
989 sections.append(
990 _LttngMetadataStreamSection(
991 metadata_sections[metadata_section_idx].timestamp, None
992 )
993 )
994 metadata_section_idx += 1
995 else:
996 # Append line_content to the current metadata section.
997 curr_metadata_section += bytearray(line_content, "utf8")
998
999 # We iterated over all the lines of the metadata file. Close the current section.
1000 sections.append(
1001 _LttngMetadataStreamSection(
1002 metadata_sections[metadata_section_idx].timestamp,
1003 bytes(curr_metadata_section),
1004 )
1005 )
1006
1007 return sections
1008
1009
1010_StreamBeaconsT = Dict[str, Iterable[int]]
1011
1012
1013# An LTTng trace, a sequence of LTTng data streams.
1014class LttngTrace(Sequence[_LttngDataStream]):
1015 def __init__(
1016 self,
1017 trace_dir: str,
1018 metadata_sections_json: Optional[tjson.ArrayVal],
1019 beacons_json: Optional[tjson.ObjVal],
1020 ):
1021 self._path = trace_dir
1022 self._create_metadata_stream(trace_dir, metadata_sections_json)
1023 self._create_data_streams(trace_dir, beacons_json)
1024 logging.info('Built trace: path="{}"'.format(trace_dir))
1025
1026 def _create_data_streams(
1027 self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
1028 ):
1029 data_stream_paths = [] # type: list[str]
1030
1031 for filename in os.listdir(trace_dir):
1032 path = os.path.join(trace_dir, filename)
1033
1034 if not os.path.isfile(path):
1035 continue
1036
1037 if filename.startswith("."):
1038 continue
1039
1040 if filename == "metadata":
1041 continue
1042
1043 data_stream_paths.append(path)
1044
1045 data_stream_paths.sort()
1046 self._data_streams = [] # type: list[_LttngDataStream]
1047
1048 for data_stream_path in data_stream_paths:
1049 stream_name = os.path.basename(data_stream_path)
1050 this_beacons_json = None
1051 if beacons_json is not None and stream_name in beacons_json:
1052 this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
1053
1054 self._data_streams.append(
1055 _LttngDataStream(data_stream_path, this_beacons_json)
1056 )
1057
1058 def _create_metadata_stream(
1059 self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
1060 ):
1061 metadata_path = os.path.join(trace_dir, "metadata")
1062 metadata_sections = [] # type: list[_LttngMetadataStreamSection]
1063
1064 if metadata_sections_json is None:
1065 with open(metadata_path, "rb") as metadata_file:
1066 metadata_sections.append(
1067 _LttngMetadataStreamSection(0, metadata_file.read())
1068 )
1069 else:
1070 metadata_sections = _split_metadata_sections(
1071 metadata_path, metadata_sections_json
1072 )
1073
1074 self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
1075
1076 @property
1077 def path(self):
1078 return self._path
1079
1080 @property
1081 def metadata_stream(self):
1082 return self._metadata_stream
1083
1084 @overload
1085 def __getitem__(self, index: int) -> _LttngDataStream:
1086 ...
1087
1088 @overload
1089 def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]: # noqa: F811
1090 ...
1091
1092 def __getitem__( # noqa: F811
1093 self, index: Union[int, slice]
1094 ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
1095 return self._data_streams[index]
1096
1097 def __len__(self):
1098 return len(self._data_streams)
1099
1100
1101# The state of a single data stream.
1102class _LttngLiveViewerSessionDataStreamState:
1103 def __init__(
1104 self,
1105 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1106 info: _LttngLiveViewerStreamInfo,
1107 data_stream: _LttngDataStream,
1108 metadata_stream_id: int,
1109 ):
1110 self._ts_state = ts_state
1111 self._info = info
1112 self._data_stream = data_stream
1113 self._metadata_stream_id = metadata_stream_id
1114 self._cur_index_entry_index = 0
1115 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1116 logging.info(
1117 fmt.format(
1118 info.id,
1119 ts_state.tracing_session_descriptor.info.tracing_session_id,
1120 ts_state.tracing_session_descriptor.info.name,
1121 data_stream.path,
1122 )
1123 )
1124
1125 @property
1126 def tracing_session_state(self):
1127 return self._ts_state
1128
1129 @property
1130 def info(self):
1131 return self._info
1132
1133 @property
1134 def data_stream(self):
1135 return self._data_stream
1136
1137 @property
1138 def cur_index_entry(self):
1139 if self._cur_index_entry_index == len(self._data_stream.index):
1140 return
1141
1142 return self._data_stream.index[self._cur_index_entry_index]
1143
1144 @property
1145 def metadata_stream_id(self):
1146 return self._metadata_stream_id
1147
1148 def goto_next_index_entry(self):
1149 self._cur_index_entry_index += 1
1150
1151
1152# The state of a single metadata stream.
1153class _LttngLiveViewerSessionMetadataStreamState:
1154 def __init__(
1155 self,
1156 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1157 info: _LttngLiveViewerStreamInfo,
1158 metadata_stream: _LttngMetadataStream,
1159 ):
1160 self._ts_state = ts_state
1161 self._info = info
1162 self._metadata_stream = metadata_stream
1163 self._cur_metadata_stream_section_index = 0
1164 if len(metadata_stream.sections) > 1:
1165 self._next_metadata_stream_section_timestamp = metadata_stream.sections[
1166 1
1167 ].timestamp
1168 else:
1169 self._next_metadata_stream_section_timestamp = None
1170
1171 self._is_sent = False
1172 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1173 logging.info(
1174 fmt.format(
1175 info.id,
1176 ts_state.tracing_session_descriptor.info.tracing_session_id,
1177 ts_state.tracing_session_descriptor.info.name,
1178 metadata_stream.path,
1179 )
1180 )
1181
1182 @property
1183 def info(self):
1184 return self._info
1185
1186 @property
1187 def metadata_stream(self):
1188 return self._metadata_stream
1189
1190 @property
1191 def is_sent(self):
1192 return self._is_sent
1193
1194 @is_sent.setter
1195 def is_sent(self, value: bool):
1196 self._is_sent = value
1197
1198 @property
1199 def cur_section(self):
1200 fmt = "Get current metadata section: section-idx={}"
1201 logging.info(fmt.format(self._cur_metadata_stream_section_index))
1202 if self._cur_metadata_stream_section_index == len(
1203 self._metadata_stream.sections
1204 ):
1205 return
1206
1207 return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
1208
1209 def goto_next_section(self):
1210 self._cur_metadata_stream_section_index += 1
1211 if self.cur_section:
1212 self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
1213 else:
1214 self._next_metadata_stream_section_timestamp = None
1215
1216 @property
1217 def next_section_timestamp(self):
1218 return self._next_metadata_stream_section_timestamp
1219
1220
1221# A tracing session descriptor.
1222#
1223# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1224# objects).
1225class LttngTracingSessionDescriptor:
1226 def __init__(
1227 self,
1228 name: str,
1229 tracing_session_id: int,
1230 hostname: str,
1231 live_timer_freq: int,
1232 client_count: int,
1233 traces: Iterable[LttngTrace],
1234 ):
1235 for trace in traces:
1236 if name not in trace.path:
1237 fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1238 raise ValueError(fmt.format(name, trace.path))
1239
1240 self._traces = traces
1241 stream_count = sum([len(t) + 1 for t in traces])
1242 self._info = _LttngLiveViewerTracingSessionInfo(
1243 tracing_session_id,
1244 live_timer_freq,
1245 client_count,
1246 stream_count,
1247 hostname,
1248 name,
1249 )
1250
1251 @property
1252 def traces(self):
1253 return self._traces
1254
1255 @property
1256 def info(self):
1257 return self._info
1258
1259
1260# The state of a tracing session.
1261class _LttngLiveViewerSessionTracingSessionState:
1262 def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
1263 self._tc_descr = tc_descr
1264 self._stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
1265 self._ds_states = {} # type: dict[int, _LttngLiveViewerSessionDataStreamState]
1266 self._ms_states = (
1267 {}
1268 ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
1269 stream_id = base_stream_id
1270
1271 for trace in tc_descr.traces:
1272 trace_id = stream_id * 1000
1273
1274 # Metadata stream -> stream info and metadata stream state
1275 info = _LttngLiveViewerStreamInfo(
1276 stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
1277 )
1278 self._stream_infos.append(info)
1279 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
1280 self, info, trace.metadata_stream
1281 )
1282 metadata_stream_id = stream_id
1283 stream_id += 1
1284
1285 # Data streams -> stream infos and data stream states
1286 for data_stream in trace:
1287 info = _LttngLiveViewerStreamInfo(
1288 stream_id,
1289 trace_id,
1290 False,
1291 data_stream.path,
1292 data_stream.channel_name,
1293 )
1294 self._stream_infos.append(info)
1295 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
1296 self, info, data_stream, metadata_stream_id
1297 )
1298 stream_id += 1
1299
1300 self._is_attached = False
1301 fmt = 'Built tracing session state: id={}, name="{}"'
1302 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
1303
1304 @property
1305 def tracing_session_descriptor(self):
1306 return self._tc_descr
1307
1308 @property
1309 def data_stream_states(self):
1310 return self._ds_states
1311
1312 @property
1313 def metadata_stream_states(self):
1314 return self._ms_states
1315
1316 @property
1317 def stream_infos(self):
1318 return self._stream_infos
1319
1320 @property
1321 def has_new_metadata(self):
1322 return any([not ms.is_sent for ms in self._ms_states.values()])
1323
1324 @property
1325 def is_attached(self):
1326 return self._is_attached
1327
1328 @is_attached.setter
1329 def is_attached(self, value: bool):
1330 self._is_attached = value
1331
1332
1333def needs_new_metadata_section(
1334 metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
1335 latest_timestamp: int,
1336):
1337 if metadata_stream_state.next_section_timestamp is None:
1338 return False
1339
1340 if latest_timestamp >= metadata_stream_state.next_section_timestamp:
1341 return True
1342 else:
1343 return False
1344
1345
1346# An LTTng live viewer session manages a view on tracing sessions
1347# and replies to commands accordingly.
1348class _LttngLiveViewerSession:
1349 def __init__(
1350 self,
1351 viewer_session_id: int,
1352 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1353 max_query_data_response_size: Optional[int],
1354 ):
1355 self._viewer_session_id = viewer_session_id
1356 self._ts_states = (
1357 {}
1358 ) # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
1359 self._stream_states = (
1360 {}
1361 ) # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
1362 self._max_query_data_response_size = max_query_data_response_size
1363 total_stream_infos = 0
1364
1365 for ts_descr in tracing_session_descriptors:
1366 ts_state = _LttngLiveViewerSessionTracingSessionState(
1367 ts_descr, total_stream_infos
1368 )
1369 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1370 self._ts_states[ts_id] = ts_state
1371 total_stream_infos += len(ts_state.stream_infos)
1372
1373 # Update session's stream states to have the new states
1374 self._stream_states.update(ts_state.data_stream_states)
1375 self._stream_states.update(ts_state.metadata_stream_states)
1376
1377 self._command_handlers = {
1378 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1379 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1380 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1381 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1382 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1383 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1384 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1385 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
1386 } # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]
1387
1388 @property
1389 def viewer_session_id(self):
1390 return self._viewer_session_id
1391
1392 def _get_tracing_session_state(self, tracing_session_id: int):
1393 if tracing_session_id not in self._ts_states:
1394 raise RuntimeError(
1395 "Unknown tracing session ID {}".format(tracing_session_id)
1396 )
1397
1398 return self._ts_states[tracing_session_id]
1399
1400 def _get_data_stream_state(self, stream_id: int):
1401 if stream_id not in self._stream_states:
1402 RuntimeError("Unknown stream ID {}".format(stream_id))
1403
1404 stream = self._stream_states[stream_id]
1405 if type(stream) is not _LttngLiveViewerSessionDataStreamState:
1406 raise RuntimeError("Stream is not a data stream")
1407
1408 return stream
1409
1410 def _get_metadata_stream_state(self, stream_id: int):
1411 if stream_id not in self._stream_states:
1412 RuntimeError("Unknown stream ID {}".format(stream_id))
1413
1414 stream = self._stream_states[stream_id]
1415 if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
1416 raise RuntimeError("Stream is not a metadata stream")
1417
1418 return stream
1419
1420 def handle_command(self, cmd: _LttngLiveViewerCommand):
1421 logging.info(
1422 "Handling command in viewer session: cmd-cls-name={}".format(
1423 cmd.__class__.__name__
1424 )
1425 )
1426 cmd_type = type(cmd)
1427
1428 if cmd_type not in self._command_handlers:
1429 raise RuntimeError(
1430 "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
1431 )
1432
1433 return self._command_handlers[cmd_type](cmd)
1434
1435 def _handle_attach_to_tracing_session_command(
1436 self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
1437 ):
1438 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1439 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1440 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1441 info = ts_state.tracing_session_descriptor.info
1442
1443 if ts_state.is_attached:
1444 raise RuntimeError(
1445 "Cannot attach to tracing session `{}`: viewer is already attached".format(
1446 info.name
1447 )
1448 )
1449
1450 ts_state.is_attached = True
1451 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1452 return _LttngLiveViewerAttachToTracingSessionReply(
1453 status, ts_state.stream_infos
1454 )
1455
1456 def _handle_detach_from_tracing_session_command(
1457 self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
1458 ):
1459 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1460 logging.info(fmt.format(cmd.tracing_session_id))
1461 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1462 info = ts_state.tracing_session_descriptor.info
1463
1464 if not ts_state.is_attached:
1465 raise RuntimeError(
1466 "Cannot detach to tracing session `{}`: viewer is not attached".format(
1467 info.name
1468 )
1469 )
1470
1471 ts_state.is_attached = False
1472 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1473 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1474
1475 def _handle_get_next_data_stream_index_entry_command(
1476 self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
1477 ):
1478 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1479 logging.info(fmt.format(cmd.stream_id))
1480 stream_state = self._get_data_stream_state(cmd.stream_id)
1481 metadata_stream_state = self._get_metadata_stream_state(
1482 stream_state.metadata_stream_id
1483 )
1484
1485 if stream_state.cur_index_entry is None:
1486 # The viewer is done reading this stream
1487 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1488
1489 # Dummy data stream index entry to use with the `HUP` status
1490 # (the reply needs one, but the viewer ignores it)
1491 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1492
1493 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1494 status, index_entry, False, False
1495 )
1496
1497 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1498
1499 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
1500 metadata_stream_state.is_sent = False
1501 metadata_stream_state.goto_next_section()
1502
1503 # The viewer only checks the `has_new_metadata` flag if the
1504 # reply's status is `OK`, so we need to provide an index here
1505 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1506 if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
1507 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1508 else:
1509 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1510
1511 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1512 status, stream_state.cur_index_entry, has_new_metadata, False
1513 )
1514 stream_state.goto_next_index_entry()
1515 return reply
1516
1517 def _handle_get_data_stream_packet_data_command(
1518 self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
1519 ):
1520 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1521 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1522 stream_state = self._get_data_stream_state(cmd.stream_id)
1523 data_response_length = cmd.req_length
1524
1525 if stream_state.tracing_session_state.has_new_metadata:
1526 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1527 return _LttngLiveViewerGetDataStreamPacketDataReply(
1528 status, bytes(), True, False
1529 )
1530
1531 if self._max_query_data_response_size:
1532 # Enforce a server side limit on the query requested length.
1533 # To ensure that the transaction terminate take the minimum of both
1534 # value.
1535 data_response_length = min(
1536 cmd.req_length, self._max_query_data_response_size
1537 )
1538 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1539 logging.info(fmt.format(cmd.req_length, data_response_length))
1540
1541 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1542 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1543 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1544
1545 def _handle_get_metadata_stream_data_command(
1546 self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
1547 ):
1548 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1549 logging.info(fmt.format(cmd.stream_id))
1550 metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
1551
1552 if metadata_stream_state.is_sent:
1553 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1554 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1555
1556 metadata_stream_state.is_sent = True
1557 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
1558 metadata_section = metadata_stream_state.cur_section
1559 assert metadata_section is not None
1560
1561 # If we are sending an empty section, ready the next one right away.
1562 if len(metadata_section.data) == 0:
1563 metadata_stream_state.is_sent = False
1564 metadata_stream_state.goto_next_section()
1565
1566 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1567 logging.info(fmt.format(len(metadata_section.data)))
1568 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1569 status, metadata_section.data
1570 )
1571
1572 def _handle_get_new_stream_infos_command(
1573 self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
1574 ):
1575 fmt = 'Handling "get new stream infos" command: ts-id={}'
1576 logging.info(fmt.format(cmd.tracing_session_id))
1577
1578 # As of this version, all the tracing session's stream infos are
1579 # always given to the viewer when sending the "attach to tracing
1580 # session" reply, so there's nothing new here. Return the `HUP`
1581 # status as, if we're handling this command, the viewer consumed
1582 # all the existing data streams.
1583 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1584 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1585
1586 def _handle_get_tracing_session_infos_command(
1587 self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
1588 ):
1589 logging.info('Handling "get tracing session infos" command.')
1590 infos = [
1591 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1592 ]
1593 infos.sort(key=lambda info: info.name)
1594 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1595
1596 def _handle_create_viewer_session_command(
1597 self, cmd: _LttngLiveViewerCreateViewerSessionCommand
1598 ):
1599 logging.info('Handling "create viewer session" command.')
1600 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1601
1602 # This does nothing here. In the LTTng relay daemon, it
1603 # allocates the viewer session's state.
1604 return _LttngLiveViewerCreateViewerSessionReply(status)
1605
1606
1607# An LTTng live TCP server.
1608#
1609# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
1610# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
1611# to a temporary port file. It renames the temporary port file to
1612# `port_filename`.
1613#
1614# `tracing_session_descriptors` is a list of tracing session descriptors
1615# (`LttngTracingSessionDescriptor`) to serve.
1616#
1617# This server accepts a single viewer (client).
1618#
1619# When the viewer closes the connection, the server's constructor
1620# returns.
1621class LttngLiveServer:
1622 def __init__(
1623 self,
1624 port: Optional[int],
1625 port_filename: Optional[str],
1626 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1627 max_query_data_response_size: Optional[int],
1628 ):
1629 logging.info("Server configuration:")
1630
1631 logging.info(" Port file name: `{}`".format(port_filename))
1632
1633 if max_query_data_response_size is not None:
1634 logging.info(
1635 " Maximum response data query size: `{}`".format(
1636 max_query_data_response_size
1637 )
1638 )
1639
1640 for ts_descr in tracing_session_descriptors:
1641 info = ts_descr.info
1642 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1643 logging.info(
1644 fmt.format(
1645 info.name,
1646 info.tracing_session_id,
1647 info.hostname,
1648 info.live_timer_freq,
1649 info.client_count,
1650 info.stream_count,
1651 )
1652 )
1653
1654 for trace in ts_descr.traces:
1655 logging.info(' Trace: path="{}"'.format(trace.path))
1656
1657 self._ts_descriptors = tracing_session_descriptors
1658 self._max_query_data_response_size = max_query_data_response_size
1659 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1660 self._codec = _LttngLiveViewerProtocolCodec()
1661
1662 # Port 0: OS assigns an unused port
1663 serv_addr = ("localhost", port if port is not None else 0)
1664 self._sock.bind(serv_addr)
1665
1666 if port_filename is not None:
1667 self._write_port_to_file(port_filename)
1668
1669 print("Listening on port {}".format(self._server_port))
1670
1671 for ts_descr in tracing_session_descriptors:
1672 info = ts_descr.info
1673 print(
1674 "net://localhost:{}/host/{}/{}".format(
1675 self._server_port, info.hostname, info.name
1676 )
1677 )
1678
1679 try:
1680 self._listen()
1681 finally:
1682 self._sock.close()
1683 logging.info("Closed connection and socket.")
1684
1685 @property
1686 def _server_port(self):
1687 return self._sock.getsockname()[1]
1688
1689 def _recv_command(self):
1690 data = bytes()
1691
1692 while True:
1693 logging.info("Waiting for viewer command.")
1694 buf = self._conn.recv(128)
1695
1696 if not buf:
1697 logging.info("Client closed connection.")
1698
1699 if data:
1700 raise RuntimeError(
1701 "Client closed connection after having sent {} command bytes.".format(
1702 len(data)
1703 )
1704 )
1705
1706 return
1707
1708 logging.info("Received data from viewer: length={}".format(len(buf)))
1709
1710 data += buf
1711
1712 try:
1713 cmd = self._codec.decode(data)
1714 except struct.error as exc:
1715 raise RuntimeError("Malformed command: {}".format(exc)) from exc
1716
1717 if cmd is not None:
1718 logging.info(
1719 "Received command from viewer: cmd-cls-name={}".format(
1720 cmd.__class__.__name__
1721 )
1722 )
1723 return cmd
1724
1725 def _send_reply(self, reply: _LttngLiveViewerReply):
1726 data = self._codec.encode(reply)
1727 logging.info(
1728 "Sending reply to viewer: reply-cls-name={}, length={}".format(
1729 reply.__class__.__name__, len(data)
1730 )
1731 )
1732 self._conn.sendall(data)
1733
1734 def _handle_connection(self):
1735 # First command must be "connect"
1736 cmd = self._recv_command()
1737
1738 if type(cmd) is not _LttngLiveViewerConnectCommand:
1739 raise RuntimeError(
1740 'First command is not "connect": cmd-cls-name={}'.format(
1741 cmd.__class__.__name__
1742 )
1743 )
1744
1745 # Create viewer session (arbitrary ID 23)
1746 logging.info(
1747 "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
1748 )
1749 viewer_session = _LttngLiveViewerSession(
1750 23, self._ts_descriptors, self._max_query_data_response_size
1751 )
1752
1753 # Send "connect" reply
1754 self._send_reply(
1755 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1756 )
1757
1758 # Make the viewer session handle the remaining commands
1759 while True:
1760 cmd = self._recv_command()
1761
1762 if cmd is None:
1763 # Connection closed (at an expected location within the
1764 # conversation)
1765 return
1766
1767 self._send_reply(viewer_session.handle_command(cmd))
1768
1769 def _listen(self):
1770 logging.info("Listening: port={}".format(self._server_port))
1771 # Backlog must be present for Python version < 3.5.
1772 # 128 is an arbitrary number since we expect only 1 connection anyway.
1773 self._sock.listen(128)
1774 self._conn, viewer_addr = self._sock.accept()
1775 logging.info(
1776 "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
1777 )
1778
1779 try:
1780 self._handle_connection()
1781 finally:
1782 self._conn.close()
1783
1784 def _write_port_to_file(self, port_filename: str):
1785 # Write the port number to a temporary file.
1786 with tempfile.NamedTemporaryFile(
1787 mode="w", delete=False, dir=os.path.dirname(port_filename)
1788 ) as tmp_port_file:
1789 print(self._server_port, end="", file=tmp_port_file)
1790
1791 # Rename temporary file to real file
1792 os.replace(tmp_port_file.name, port_filename)
1793 logging.info(
1794 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1795 tmp_port_file.name, port_filename
1796 )
1797 )
1798
1799
1800def _session_descriptors_from_path(
1801 sessions_filename: str, trace_path_prefix: Optional[str]
1802):
1803 # File format is:
1804 #
1805 # [
1806 # {
1807 # "name": "my-session",
1808 # "id": 17,
1809 # "hostname": "myhost",
1810 # "live-timer-freq": 1000000,
1811 # "client-count": 23,
1812 # "traces": [
1813 # {
1814 # "path": "lol"
1815 # },
1816 # {
1817 # "path": "meow/mix",
1818 # "beacons": {
1819 # "my_stream": [ 5235787, 728375283 ]
1820 # },
1821 # "metadata-sections": [
1822 # {
1823 # "line": 1,
1824 # "timestamp": 0
1825 # }
1826 # ]
1827 # }
1828 # ]
1829 # }
1830 # ]
1831 with open(sessions_filename, "r") as sessions_file:
1832 sessions_json = tjson.load(sessions_file, tjson.ArrayVal)
1833
1834 sessions = [] # type: list[LttngTracingSessionDescriptor]
1835
1836 for session_json in sessions_json.iter(tjson.ObjVal):
1837 name = session_json.at("name", tjson.StrVal).val
1838 tracing_session_id = session_json.at("id", tjson.IntVal).val
1839 hostname = session_json.at("hostname", tjson.StrVal).val
1840 live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
1841 client_count = session_json.at("client-count", tjson.IntVal).val
1842 traces_json = session_json.at("traces", tjson.ArrayVal)
1843
1844 traces = [] # type: list[LttngTrace]
1845
1846 for trace_json in traces_json.iter(tjson.ObjVal):
1847 metadata_sections = (
1848 trace_json.at("metadata-sections", tjson.ArrayVal)
1849 if "metadata-sections" in trace_json
1850 else None
1851 )
1852 beacons = (
1853 trace_json.at("beacons", tjson.ObjVal)
1854 if "beacons" in trace_json
1855 else None
1856 )
1857 path = trace_json.at("path", tjson.StrVal).val
1858
1859 if not os.path.isabs(path) and trace_path_prefix:
1860 path = os.path.join(trace_path_prefix, path)
1861
1862 traces.append(
1863 LttngTrace(
1864 path,
1865 metadata_sections,
1866 beacons,
1867 )
1868 )
1869
1870 sessions.append(
1871 LttngTracingSessionDescriptor(
1872 name,
1873 tracing_session_id,
1874 hostname,
1875 live_timer_freq,
1876 client_count,
1877 traces,
1878 )
1879 )
1880
1881 return sessions
1882
1883
1884def _loglevel_parser(string: str):
1885 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
1886 if string not in loglevels:
1887 msg = "{} is not a valid loglevel".format(string)
1888 raise argparse.ArgumentTypeError(msg)
1889 return loglevels[string]
1890
1891
1892if __name__ == "__main__":
1893 logging.basicConfig(format="# %(asctime)-25s%(message)s")
1894 parser = argparse.ArgumentParser(
1895 description="LTTng-live protocol mocker", add_help=False
1896 )
1897 parser.add_argument(
1898 "--log-level",
1899 default="warning",
1900 choices=["info", "warning"],
1901 help="The loglevel to be used.",
1902 )
1903
1904 loglevel_namespace, remaining_args = parser.parse_known_args()
1905 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1906
1907 parser.add_argument(
1908 "--port",
1909 help="The port to bind to. If missing, use an OS-assigned port..",
1910 type=int,
1911 )
1912 parser.add_argument(
1913 "--port-filename",
1914 help="The final port file. This file is present when the server is ready to receive connection.",
1915 )
1916 parser.add_argument(
1917 "--max-query-data-response-size",
1918 type=int,
1919 help="The maximum size of control data response in bytes",
1920 )
1921 parser.add_argument(
1922 "--trace-path-prefix",
1923 type=str,
1924 help="Prefix to prepend to the trace paths of session configurations",
1925 )
1926 parser.add_argument(
1927 "sessions_filename",
1928 type=str,
1929 help="Path to a session configuration file",
1930 metavar="sessions-filename",
1931 )
1932 parser.add_argument(
1933 "-h",
1934 "--help",
1935 action="help",
1936 default=argparse.SUPPRESS,
1937 help="Show this help message and exit.",
1938 )
1939
1940 args = parser.parse_args(args=remaining_args)
1941 sessions_filename = args.sessions_filename # type: str
1942 trace_path_prefix = args.trace_path_prefix # type: str | None
1943 sessions = _session_descriptors_from_path(
1944 sessions_filename,
1945 trace_path_prefix,
1946 )
1947
1948 port = args.port # type: int | None
1949 port_filename = args.port_filename # type: str | None
1950 max_query_data_response_size = args.max_query_data_response_size # type: int | None
1951 LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
This page took 0.027575 seconds and 4 git commands to generate.