1 # SPDX-License-Identifier: MIT
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
6 # pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
17 from abc
import ABC
, abstractmethod
18 from typing
import Dict
, Union
, Iterable
, Optional
, Sequence
, overload
23 from typing
import Any
, Callable
# noqa: F401
28 # An entry within the index of an LTTng data stream.
29 class _LttngDataStreamIndexEntry
:
34 content_size_bits
: int,
37 events_discarded
: int,
40 self
._offset
_bytes
= offset_bytes
41 self
._total
_size
_bits
= total_size_bits
42 self
._content
_size
_bits
= content_size_bits
43 self
._timestamp
_begin
= timestamp_begin
44 self
._timestamp
_end
= timestamp_end
45 self
._events
_discarded
= events_discarded
46 self
._stream
_class
_id
= stream_class_id
49 def offset_bytes(self
):
50 return self
._offset
_bytes
53 def total_size_bits(self
):
54 return self
._total
_size
_bits
57 def total_size_bytes(self
):
58 return self
._total
_size
_bits
// 8
61 def content_size_bits(self
):
62 return self
._content
_size
_bits
65 def content_size_bytes(self
):
66 return self
._content
_size
_bits
// 8
69 def timestamp_begin(self
):
70 return self
._timestamp
_begin
73 def timestamp_end(self
):
74 return self
._timestamp
_end
77 def events_discarded(self
):
78 return self
._events
_discarded
81 def stream_class_id(self
):
82 return self
._stream
_class
_id
85 # An entry within the index of an LTTng data stream. While a stream beacon entry
86 # is conceptually unrelated to an index, it is sent as a reply to a
87 # LttngLiveViewerGetNextDataStreamIndexEntryCommand
88 class _LttngDataStreamBeaconIndexEntry
:
89 def __init__(self
, stream_class_id
: int, timestamp
: int):
90 self
._stream
_class
_id
= stream_class_id
91 self
._timestamp
= timestamp
95 return self
._timestamp
98 def stream_class_id(self
):
99 return self
._stream
_class
_id
102 _LttngIndexEntryT
= Union
[_LttngDataStreamIndexEntry
, _LttngDataStreamBeaconIndexEntry
]
105 class _LttngLiveViewerCommand
:
106 def __init__(self
, version
: int):
107 self
._version
= version
114 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand
):
115 def __init__(self
, version
: int, viewer_session_id
: int, major
: int, minor
: int):
116 super().__init
__(version
)
117 self
._viewer
_session
_id
= viewer_session_id
122 def viewer_session_id(self
):
123 return self
._viewer
_session
_id
134 class _LttngLiveViewerReply
:
138 class _LttngLiveViewerConnectReply(_LttngLiveViewerReply
):
139 def __init__(self
, viewer_session_id
: int, major
: int, minor
: int):
140 self
._viewer
_session
_id
= viewer_session_id
145 def viewer_session_id(self
):
146 return self
._viewer
_session
_id
157 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand
):
161 class _LttngLiveViewerTracingSessionInfo
:
164 tracing_session_id
: int,
165 live_timer_freq
: int,
171 self
._tracing
_session
_id
= tracing_session_id
172 self
._live
_timer
_freq
= live_timer_freq
173 self
._client
_count
= client_count
174 self
._stream
_count
= stream_count
175 self
._hostname
= hostname
179 def tracing_session_id(self
):
180 return self
._tracing
_session
_id
183 def live_timer_freq(self
):
184 return self
._live
_timer
_freq
187 def client_count(self
):
188 return self
._client
_count
191 def stream_count(self
):
192 return self
._stream
_count
196 return self
._hostname
203 class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply
):
205 self
, tracing_session_infos
: Sequence
[_LttngLiveViewerTracingSessionInfo
]
207 self
._tracing
_session
_infos
= tracing_session_infos
210 def tracing_session_infos(self
):
211 return self
._tracing
_session
_infos
214 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand
):
220 self
, version
: int, tracing_session_id
: int, offset
: int, seek_type
: int
222 super().__init
__(version
)
223 self
._tracing
_session
_id
= tracing_session_id
224 self
._offset
= offset
225 self
._seek
_type
= seek_type
228 def tracing_session_id(self
):
229 return self
._tracing
_session
_id
237 return self
._seek
_type
240 class _LttngLiveViewerStreamInfo
:
242 self
, id: int, trace_id
: int, is_metadata
: bool, path
: str, channel_name
: str
245 self
._trace
_id
= trace_id
246 self
._is
_metadata
= is_metadata
248 self
._channel
_name
= channel_name
256 return self
._trace
_id
259 def is_metadata(self
):
260 return self
._is
_metadata
267 def channel_name(self
):
268 return self
._channel
_name
271 class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply
):
280 def __init__(self
, status
: int, stream_infos
: Sequence
[_LttngLiveViewerStreamInfo
]):
281 self
._status
= status
282 self
._stream
_infos
= stream_infos
289 def stream_infos(self
):
290 return self
._stream
_infos
293 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand
):
294 def __init__(self
, version
: int, stream_id
: int):
295 super().__init
__(version
)
296 self
._stream
_id
= stream_id
300 return self
._stream
_id
303 class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply
):
315 index_entry
: _LttngIndexEntryT
,
316 has_new_metadata
: bool,
317 has_new_data_stream
: bool,
319 self
._status
= status
320 self
._index
_entry
= index_entry
321 self
._has
_new
_metadata
= has_new_metadata
322 self
._has
_new
_data
_stream
= has_new_data_stream
329 def index_entry(self
):
330 return self
._index
_entry
333 def has_new_metadata(self
):
334 return self
._has
_new
_metadata
337 def has_new_data_stream(self
):
338 return self
._has
_new
_data
_stream
341 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand
):
342 def __init__(self
, version
: int, stream_id
: int, offset
: int, req_length
: int):
343 super().__init
__(version
)
344 self
._stream
_id
= stream_id
345 self
._offset
= offset
346 self
._req
_length
= req_length
350 return self
._stream
_id
357 def req_length(self
):
358 return self
._req
_length
361 class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply
):
372 has_new_metadata
: bool,
373 has_new_data_stream
: bool,
375 self
._status
= status
377 self
._has
_new
_metadata
= has_new_metadata
378 self
._has
_new
_data
_stream
= has_new_data_stream
389 def has_new_metadata(self
):
390 return self
._has
_new
_metadata
393 def has_new_data_stream(self
):
394 return self
._has
_new
_data
_stream
397 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand
):
398 def __init__(self
, version
: int, stream_id
: int):
399 super().__init
__(version
)
400 self
._stream
_id
= stream_id
404 return self
._stream
_id
407 class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply
):
413 def __init__(self
, status
: int, data
: bytes
):
414 self
._status
= status
426 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand
):
427 def __init__(self
, version
: int, tracing_session_id
: int):
428 super().__init
__(version
)
429 self
._tracing
_session
_id
= tracing_session_id
432 def tracing_session_id(self
):
433 return self
._tracing
_session
_id
436 class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply
):
443 def __init__(self
, status
: int, stream_infos
: Sequence
[_LttngLiveViewerStreamInfo
]):
444 self
._status
= status
445 self
._stream
_infos
= stream_infos
452 def stream_infos(self
):
453 return self
._stream
_infos
456 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand
):
460 class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply
):
465 def __init__(self
, status
: int):
466 self
._status
= status
473 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand
):
474 def __init__(self
, version
: int, tracing_session_id
: int):
475 super().__init
__(version
)
476 self
._tracing
_session
_id
= tracing_session_id
479 def tracing_session_id(self
):
480 return self
._tracing
_session
_id
483 class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply
):
489 def __init__(self
, status
: int):
490 self
._status
= status
497 # An LTTng live protocol codec can convert bytes to command objects and
498 # reply objects to bytes.
499 class _LttngLiveViewerProtocolCodec
:
500 _COMMAND_HEADER_STRUCT_FMT
= "QII"
501 _COMMAND_HEADER_SIZE_BYTES
= struct
.calcsize(_COMMAND_HEADER_STRUCT_FMT
)
506 def _unpack(self
, fmt
: str, data
: bytes
, offset
: int = 0):
508 return struct
.unpack_from(fmt
, data
, offset
)
510 def _unpack_payload(self
, fmt
: str, data
: bytes
):
512 fmt
, data
, _LttngLiveViewerProtocolCodec
._COMMAND
_HEADER
_SIZE
_BYTES
515 def decode(self
, data
: bytes
):
516 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
:
517 # Not enough data to read the command header
520 payload_size
, cmd_type
, version
= self
._unpack
(
521 self
._COMMAND
_HEADER
_STRUCT
_FMT
, data
524 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
525 payload_size
, cmd_type
, version
529 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
+ payload_size
:
530 # Not enough data to read the whole command
534 viewer_session_id
, major
, minor
, _
= self
._unpack
_payload
("QIII", data
)
535 return _LttngLiveViewerConnectCommand(
536 version
, viewer_session_id
, major
, minor
539 return _LttngLiveViewerGetTracingSessionInfosCommand(version
)
541 tracing_session_id
, offset
, seek_type
= self
._unpack
_payload
("QQI", data
)
542 return _LttngLiveViewerAttachToTracingSessionCommand(
543 version
, tracing_session_id
, offset
, seek_type
546 (stream_id
,) = self
._unpack
_payload
("Q", data
)
547 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
551 stream_id
, offset
, req_length
= self
._unpack
_payload
("QQI", data
)
552 return _LttngLiveViewerGetDataStreamPacketDataCommand(
553 version
, stream_id
, offset
, req_length
556 (stream_id
,) = self
._unpack
_payload
("Q", data
)
557 return _LttngLiveViewerGetMetadataStreamDataCommand(version
, stream_id
)
559 (tracing_session_id
,) = self
._unpack
_payload
("Q", data
)
560 return _LttngLiveViewerGetNewStreamInfosCommand(version
, tracing_session_id
)
562 return _LttngLiveViewerCreateViewerSessionCommand(version
)
564 (tracing_session_id
,) = self
._unpack
_payload
("Q", data
)
565 return _LttngLiveViewerDetachFromTracingSessionCommand(
566 version
, tracing_session_id
569 raise RuntimeError("Unknown command type {}".format(cmd_type
))
571 def _pack(self
, fmt
: str, *args
: Any
):
572 # Force network byte order
573 return struct
.pack("!" + fmt
, *args
)
575 def _encode_zero_padded_str(self
, string
: str, length
: int):
576 data
= string
.encode()
577 return data
.ljust(length
, b
"\x00")
579 def _encode_stream_info(self
, info
: _LttngLiveViewerStreamInfo
):
580 data
= self
._pack
("QQI", info
.id, info
.trace_id
, int(info
.is_metadata
))
581 data
+= self
._encode
_zero
_padded
_str
(info
.path
, 4096)
582 data
+= self
._encode
_zero
_padded
_str
(info
.channel_name
, 255)
585 def _get_has_new_stuff_flags(
586 self
, has_new_metadata
: bool, has_new_data_streams
: bool
593 if has_new_data_streams
:
600 reply
: _LttngLiveViewerReply
,
602 if type(reply
) is _LttngLiveViewerConnectReply
:
604 "QIII", reply
.viewer_session_id
, reply
.major
, reply
.minor
, 2
606 elif type(reply
) is _LttngLiveViewerGetTracingSessionInfosReply
:
607 data
= self
._pack
("I", len(reply
.tracing_session_infos
))
609 for info
in reply
.tracing_session_infos
:
612 info
.tracing_session_id
,
613 info
.live_timer_freq
,
617 data
+= self
._encode
_zero
_padded
_str
(info
.hostname
, 64)
618 data
+= self
._encode
_zero
_padded
_str
(info
.name
, 255)
619 elif type(reply
) is _LttngLiveViewerAttachToTracingSessionReply
:
620 data
= self
._pack
("II", reply
.status
, len(reply
.stream_infos
))
622 for info
in reply
.stream_infos
:
623 data
+= self
._encode
_stream
_info
(info
)
624 elif type(reply
) is _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
625 index_format
= "QQQQQQQII"
626 entry
= reply
.index_entry
627 flags
= self
._get
_has
_new
_stuff
_flags
(
628 reply
.has_new_metadata
, reply
.has_new_data_stream
631 if isinstance(entry
, _LttngDataStreamIndexEntry
):
635 entry
.total_size_bits
,
636 entry
.content_size_bits
,
637 entry
.timestamp_begin
,
639 entry
.events_discarded
,
640 entry
.stream_class_id
,
653 entry
.stream_class_id
,
657 elif type(reply
) is _LttngLiveViewerGetDataStreamPacketDataReply
:
658 flags
= self
._get
_has
_new
_stuff
_flags
(
659 reply
.has_new_metadata
, reply
.has_new_data_stream
661 data
= self
._pack
("III", reply
.status
, len(reply
.data
), flags
)
663 elif type(reply
) is _LttngLiveViewerGetMetadataStreamDataContentReply
:
664 data
= self
._pack
("QI", len(reply
.data
), reply
.status
)
666 elif type(reply
) is _LttngLiveViewerGetNewStreamInfosReply
:
667 data
= self
._pack
("II", reply
.status
, len(reply
.stream_infos
))
669 for info
in reply
.stream_infos
:
670 data
+= self
._encode
_stream
_info
(info
)
671 elif type(reply
) is _LttngLiveViewerCreateViewerSessionReply
:
672 data
= self
._pack
("I", reply
.status
)
673 elif type(reply
) is _LttngLiveViewerDetachFromTracingSessionReply
:
674 data
= self
._pack
("I", reply
.status
)
677 "Unknown reply object with class `{}`".format(reply
.__class
__.__name
__)
683 def _get_entry_timestamp_begin(
684 entry
: _LttngIndexEntryT
,
686 if isinstance(entry
, _LttngDataStreamBeaconIndexEntry
):
687 return entry
.timestamp
689 return entry
.timestamp_begin
692 # The index of an LTTng data stream, a sequence of index entries.
693 class _LttngDataStreamIndex(Sequence
[_LttngIndexEntryT
]):
694 def __init__(self
, path
: str, beacons
: Optional
[tjson
.ArrayVal
]):
699 stream_class_id
= self
._entries
[0].stream_class_id
701 beacons_list
= [] # type: list[_LttngDataStreamBeaconIndexEntry]
702 for ts
in beacons
.iter(tjson
.IntVal
):
704 _LttngDataStreamBeaconIndexEntry(stream_class_id
, ts
.val
)
707 self
._add
_beacons
(beacons_list
)
710 'Built data stream index entries: path="{}", count={}'.format(
711 path
, len(self
._entries
)
716 self
._entries
= [] # type: list[_LttngIndexEntryT]
718 with
open(self
._path
, "rb") as f
:
721 size
= struct
.calcsize(fmt
)
723 assert len(data
) == size
724 magic
, _
, _
, index_entry_length
= struct
.unpack(fmt
, data
)
725 assert magic
== 0xC1F1DCC1
729 size
= struct
.calcsize(fmt
)
733 'Decoding data stream index entry: path="{}", offset={}'.format(
743 assert len(data
) == size
752 ) = struct
.unpack(fmt
, data
)
754 self
._entries
.append(
755 _LttngDataStreamIndexEntry(
766 # Skip anything else before the next entry
767 f
.seek(index_entry_length
- size
, os
.SEEK_CUR
)
769 def _add_beacons(self
, beacons
: Iterable
[_LttngDataStreamBeaconIndexEntry
]):
770 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
772 entry
: Union
[_LttngDataStreamIndexEntry
, _LttngDataStreamBeaconIndexEntry
],
774 if isinstance(entry
, _LttngDataStreamBeaconIndexEntry
):
775 return entry
.timestamp
777 return entry
.timestamp_end
779 self
._entries
+= beacons
780 self
._entries
.sort(key
=sort_key
)
783 def __getitem__(self
, index
: int) -> _LttngIndexEntryT
:
787 def __getitem__(self
, index
: slice) -> Sequence
[_LttngIndexEntryT
]: # noqa: F811
790 def __getitem__( # noqa: F811
791 self
, index
: Union
[int, slice]
792 ) -> Union
[_LttngIndexEntryT
, Sequence
[_LttngIndexEntryT
],]:
793 return self
._entries
[index
]
796 return len(self
._entries
)
803 # Any LTTng stream (metadata or data).
804 class _LttngStream(ABC
):
806 def __init__(self
, creation_timestamp
: int):
807 self
._creation
_timestamp
= creation_timestamp
810 def creation_timestamp(self
):
811 return self
._creation
_timestamp
814 # An LTTng data stream.
815 class _LttngDataStream(_LttngStream
):
817 self
, path
: str, beacons_json
: Optional
[tjson
.ArrayVal
], creation_timestamp
: int
819 super().__init
__(creation_timestamp
)
821 filename
= os
.path
.basename(path
)
822 match
= re
.match(r
"(.*)_\d+", filename
)
825 "Unexpected data stream file name pattern: {}".format(filename
)
828 self
._channel
_name
= match
.group(1)
829 trace_dir
= os
.path
.dirname(path
)
830 index_path
= os
.path
.join(trace_dir
, "index", filename
+ ".idx")
831 self
._index
= _LttngDataStreamIndex(index_path
, beacons_json
)
832 assert os
.path
.isfile(path
)
833 self
._file
= open(path
, "rb")
835 'Built data stream: path="{}", channel-name="{}"'.format(
836 path
, self
._channel
_name
845 def channel_name(self
):
846 return self
._channel
_name
852 def get_data(self
, offset_bytes
: int, len_bytes
: int):
853 self
._file
.seek(offset_bytes
)
854 return self
._file
.read(len_bytes
)
857 class _LttngMetadataStreamSection
:
858 def __init__(self
, timestamp
: int, data
: Optional
[bytes
]):
859 self
._timestamp
= timestamp
865 "Built metadata stream section: ts={}, data-len={}".format(
866 self
._timestamp
, len(self
._data
)
872 return self
._timestamp
879 # An LTTng metadata stream.
880 class _LttngMetadataStream(_LttngStream
):
883 metadata_file_path
: str,
884 config_sections
: Sequence
[_LttngMetadataStreamSection
],
885 creation_timestamp
: int,
887 super().__init
__(creation_timestamp
)
888 self
._path
= metadata_file_path
889 self
._sections
= config_sections
891 "Built metadata stream: path={}, section-len={}".format(
892 self
._path
, len(self
._sections
)
902 return self
._sections
905 class LttngMetadataConfigSection
:
906 def __init__(self
, line
: int, timestamp
: int, is_empty
: bool):
908 self
._timestamp
= timestamp
909 self
._is
_empty
= is_empty
917 return self
._timestamp
921 return self
._is
_empty
924 def _parse_metadata_sections_config(metadata_sections_json
: tjson
.ArrayVal
):
925 metadata_sections
= [] # type: list[LttngMetadataConfigSection]
926 append_empty_section
= False
930 for section
in metadata_sections_json
:
931 if isinstance(section
, tjson
.StrVal
):
932 if section
.val
== "empty":
933 # Found an empty section marker. Actually append the
934 # section at the timestamp of the next concrete section.
935 append_empty_section
= True
937 raise ValueError("Invalid string value at {}.".format(section
.path
))
938 elif isinstance(section
, tjson
.ObjVal
):
939 line
= section
.at("line", tjson
.IntVal
).val
940 ts
= section
.at("timestamp", tjson
.IntVal
).val
942 # Sections' timestamps and lines must both be increasing.
943 assert ts
> last_timestamp
946 assert line
> last_line
949 if append_empty_section
:
950 metadata_sections
.append(LttngMetadataConfigSection(line
, ts
, True))
951 append_empty_section
= False
953 metadata_sections
.append(LttngMetadataConfigSection(line
, ts
, False))
956 "`{}`: expecting a string or object value".format(section
.path
)
959 return metadata_sections
962 def _split_metadata_sections(
963 metadata_file_path
: str, metadata_sections_json
: tjson
.ArrayVal
965 metadata_sections
= _parse_metadata_sections_config(metadata_sections_json
)
967 sections
= [] # type: list[_LttngMetadataStreamSection]
968 with
open(metadata_file_path
, "r") as metadata_file
:
969 metadata_lines
= [line
for line
in metadata_file
]
971 metadata_section_idx
= 0
972 curr_metadata_section
= bytearray()
974 for idx
, line_content
in enumerate(metadata_lines
):
975 # Add one to the index to convert from the zero-indexing of the
976 # enumerate() function to the one-indexing used by humans when
977 # viewing a text file.
978 curr_line_number
= idx
+ 1
980 # If there are no more sections, simply append the line.
981 if metadata_section_idx
+ 1 >= len(metadata_sections
):
982 curr_metadata_section
+= bytearray(line_content
, "utf8")
985 next_section_line_number
= metadata_sections
[metadata_section_idx
+ 1].line
987 # If the next section begins at the current line, create a
988 # section with the metadata we gathered so far.
989 if curr_line_number
>= next_section_line_number
:
990 # Flushing the metadata of the current section.
992 _LttngMetadataStreamSection(
993 metadata_sections
[metadata_section_idx
].timestamp
,
994 bytes(curr_metadata_section
),
998 # Move to the next section.
999 metadata_section_idx
+= 1
1001 # Clear old content and append current line for the next section.
1002 curr_metadata_section
.clear()
1003 curr_metadata_section
+= bytearray(line_content
, "utf8")
1005 # Append any empty sections.
1006 while metadata_sections
[metadata_section_idx
].is_empty
:
1008 _LttngMetadataStreamSection(
1009 metadata_sections
[metadata_section_idx
].timestamp
, None
1012 metadata_section_idx
+= 1
1014 # Append line_content to the current metadata section.
1015 curr_metadata_section
+= bytearray(line_content
, "utf8")
1017 # We iterated over all the lines of the metadata file. Close the current section.
1019 _LttngMetadataStreamSection(
1020 metadata_sections
[metadata_section_idx
].timestamp
,
1021 bytes(curr_metadata_section
),
1028 _StreamBeaconsT
= Dict
[str, Iterable
[int]]
1031 # An LTTng trace, a sequence of LTTng data streams.
1032 class LttngTrace(Sequence
[_LttngDataStream
]):
1036 metadata_sections_json
: Optional
[tjson
.ArrayVal
],
1037 beacons_json
: Optional
[tjson
.ObjVal
],
1038 creation_timestamp
: int,
1040 self
._path
= trace_dir
1041 self
._creation
_timestamp
= creation_timestamp
1042 self
._create
_metadata
_stream
(trace_dir
, metadata_sections_json
)
1043 self
._create
_data
_streams
(trace_dir
, beacons_json
)
1044 logging
.info('Built trace: path="{}"'.format(trace_dir
))
1046 def _create_data_streams(
1047 self
, trace_dir
: str, beacons_json
: Optional
[tjson
.ObjVal
]
1049 data_stream_paths
= [] # type: list[str]
1051 for filename
in os
.listdir(trace_dir
):
1052 path
= os
.path
.join(trace_dir
, filename
)
1054 if not os
.path
.isfile(path
):
1057 if filename
.startswith("."):
1060 if filename
== "metadata":
1063 data_stream_paths
.append(path
)
1065 data_stream_paths
.sort()
1066 self
._data
_streams
= [] # type: list[_LttngDataStream]
1068 for data_stream_path
in data_stream_paths
:
1069 stream_name
= os
.path
.basename(data_stream_path
)
1070 this_beacons_json
= None
1071 if beacons_json
is not None and stream_name
in beacons_json
:
1072 this_beacons_json
= beacons_json
.at(stream_name
, tjson
.ArrayVal
)
1074 self
._data
_streams
.append(
1076 data_stream_path
, this_beacons_json
, self
._creation
_timestamp
1080 def _create_metadata_stream(
1081 self
, trace_dir
: str, metadata_sections_json
: Optional
[tjson
.ArrayVal
]
1083 metadata_path
= os
.path
.join(trace_dir
, "metadata")
1084 metadata_sections
= [] # type: list[_LttngMetadataStreamSection]
1086 if metadata_sections_json
is None:
1087 with
open(metadata_path
, "rb") as metadata_file
:
1088 metadata_sections
.append(
1089 _LttngMetadataStreamSection(0, metadata_file
.read())
1092 metadata_sections
= _split_metadata_sections(
1093 metadata_path
, metadata_sections_json
1096 self
._metadata
_stream
= _LttngMetadataStream(
1097 metadata_path
, metadata_sections
, self
.creation_timestamp
1105 def metadata_stream(self
):
1106 return self
._metadata
_stream
1109 def creation_timestamp(self
):
1110 return self
._creation
_timestamp
1113 def __getitem__(self
, index
: int) -> _LttngDataStream
:
1117 def __getitem__(self
, index
: slice) -> Sequence
[_LttngDataStream
]: # noqa: F811
1120 def __getitem__( # noqa: F811
1121 self
, index
: Union
[int, slice]
1122 ) -> Union
[_LttngDataStream
, Sequence
[_LttngDataStream
]]:
1123 return self
._data
_streams
[index
]
1126 return len(self
._data
_streams
)
1129 # Stream (metadata or data) state specific to the LTTng live protocol.
1130 class _LttngLiveViewerSessionStreamState
:
1133 # A stream is considered "announced" when it has been returned
1134 # to the LTTng live client in response to a "get new stream
1135 # infos" (`_LttngLiveViewerGetNewStreamInfosCommand`) command.
1136 self
._announced
= False # type: bool
1140 def is_announced(self
):
1141 return self
._announced
1143 def mark_as_announced(self
):
1144 self
._announced
= True
1148 def stream(self
) -> _LttngStream
:
1152 # The state of a single data stream.
1153 class _LttngLiveViewerSessionDataStreamState(_LttngLiveViewerSessionStreamState
):
1156 ts_state
: "_LttngLiveViewerSessionTracingSessionState",
1157 info
: _LttngLiveViewerStreamInfo
,
1158 data_stream
: _LttngDataStream
,
1159 metadata_stream_id
: int,
1162 self
._ts
_state
= ts_state
1164 self
._data
_stream
= data_stream
1165 self
._metadata
_stream
_id
= metadata_stream_id
1166 self
._cur
_index
_entry
_index
= 0
1167 fmt
= 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1171 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
1172 ts_state
.tracing_session_descriptor
.info
.name
,
1178 def tracing_session_state(self
):
1179 return self
._ts
_state
1187 return self
._data
_stream
1190 def cur_index_entry(self
):
1191 if self
._cur
_index
_entry
_index
== len(self
._data
_stream
.index
):
1194 return self
._data
_stream
.index
[self
._cur
_index
_entry
_index
]
1197 def metadata_stream_id(self
):
1198 return self
._metadata
_stream
_id
1200 def goto_next_index_entry(self
):
1201 self
._cur
_index
_entry
_index
+= 1
1204 # The state of a single metadata stream.
1205 class _LttngLiveViewerSessionMetadataStreamState(_LttngLiveViewerSessionStreamState
):
1208 ts_state
: "_LttngLiveViewerSessionTracingSessionState",
1209 info
: _LttngLiveViewerStreamInfo
,
1210 metadata_stream
: _LttngMetadataStream
,
1213 self
._ts
_state
= ts_state
1215 self
._metadata
_stream
= metadata_stream
1216 self
._cur
_metadata
_stream
_section
_index
= 0
1217 if len(metadata_stream
.sections
) > 1:
1218 self
._next
_metadata
_stream
_section
_timestamp
= metadata_stream
.sections
[
1222 self
._next
_metadata
_stream
_section
_timestamp
= None
1224 self
._all
_data
_is
_sent
= False
1225 fmt
= 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1229 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
1230 ts_state
.tracing_session_descriptor
.info
.name
,
1231 metadata_stream
.path
,
1241 return self
._metadata
_stream
1244 def all_data_is_sent(self
):
1245 return self
._all
_data
_is
_sent
1247 @all_data_is_sent.setter
1248 def all_data_is_sent(self
, value
: bool):
1249 self
._all
_data
_is
_sent
= value
1252 def cur_section(self
):
1253 fmt
= "Get current metadata section: section-idx={}"
1254 logging
.info(fmt
.format(self
._cur
_metadata
_stream
_section
_index
))
1255 if self
._cur
_metadata
_stream
_section
_index
== len(
1256 self
._metadata
_stream
.sections
1260 return self
._metadata
_stream
.sections
[self
._cur
_metadata
_stream
_section
_index
]
1262 def goto_next_section(self
):
1263 self
._cur
_metadata
_stream
_section
_index
+= 1
1264 if self
.cur_section
:
1265 self
._next
_metadata
_stream
_section
_timestamp
= self
.cur_section
.timestamp
1267 self
._next
_metadata
_stream
_section
_timestamp
= None
1270 def next_section_timestamp(self
):
1271 return self
._next
_metadata
_stream
_section
_timestamp
1274 # A tracing session descriptor.
1276 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1278 class LttngTracingSessionDescriptor
:
1282 tracing_session_id
: int,
1284 live_timer_freq
: int,
1286 traces
: Iterable
[LttngTrace
],
1288 for trace
in traces
:
1289 if name
not in trace
.path
:
1290 fmt
= "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1291 raise ValueError(fmt
.format(name
, trace
.path
))
1293 self
._traces
= traces
1294 stream_count
= sum([len(t
) + 1 for t
in traces
])
1295 self
._info
= _LttngLiveViewerTracingSessionInfo(
1313 # The state of a tracing session.
1314 class _LttngLiveViewerSessionTracingSessionState
:
1315 def __init__(self
, tc_descr
: LttngTracingSessionDescriptor
, base_stream_id
: int):
1316 self
._tc
_descr
= tc_descr
1317 self
._client
_visible
_stream
_infos
= [] # type: list[_LttngLiveViewerStreamInfo]
1318 self
._ds
_states
= {} # type: dict[int, _LttngLiveViewerSessionDataStreamState]
1321 ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
1322 self
._last
_delivered
_index
_timestamp
= 0
1323 self
._last
_allocated
_stream
_id
= base_stream_id
1325 for trace
in tc_descr
.traces
:
1326 trace_stream_infos
= [] # type: list[_LttngLiveViewerStreamInfo]
1327 trace_id
= self
._last
_allocated
_stream
_id
* 1000
1329 # Metadata stream -> stream info and metadata stream state
1330 info
= _LttngLiveViewerStreamInfo(
1331 self
._last
_allocated
_stream
_id
,
1334 trace
.metadata_stream
.path
,
1338 trace_stream_infos
.append(info
)
1340 self
._last
_allocated
_stream
_id
1341 ] = _LttngLiveViewerSessionMetadataStreamState(
1342 self
, info
, trace
.metadata_stream
1344 metadata_stream_id
= self
._last
_allocated
_stream
_id
1345 self
._last
_allocated
_stream
_id
+= 1
1347 # Data streams -> stream infos and data stream states
1348 for data_stream
in trace
:
1349 info
= _LttngLiveViewerStreamInfo(
1350 self
._last
_allocated
_stream
_id
,
1354 data_stream
.channel_name
,
1356 trace_stream_infos
.append(info
)
1358 self
._last
_allocated
_stream
_id
1359 ] = _LttngLiveViewerSessionDataStreamState(
1360 self
, info
, data_stream
, metadata_stream_id
1362 self
._last
_allocated
_stream
_id
+= 1
1364 if trace
.creation_timestamp
== 0:
1365 # Only announce streams for traces that are created at
1368 # The rest of the streams will be discovered by the
1369 # client as indexes are received with a "has new data
1370 # streams" flag set in the reply.
1371 self
._client
_visible
_stream
_infos
.extend(trace_stream_infos
)
1373 self
._is
_attached
= False
1374 fmt
= 'Built tracing session state: id={}, name="{}"'
1375 logging
.info(fmt
.format(tc_descr
.info
.tracing_session_id
, tc_descr
.info
.name
))
1378 def tracing_session_descriptor(self
):
1379 return self
._tc
_descr
1382 def data_stream_states(self
):
1383 return self
._ds
_states
1386 def metadata_stream_states(self
):
1387 return self
._ms
_states
1390 def client_visible_stream_infos(self
):
1391 return self
._client
_visible
_stream
_infos
1394 def has_new_metadata(self
):
1397 ms
.is_announced
and not ms
.all_data_is_sent
1398 for ms
in self
._ms
_states
.values()
1403 def is_attached(self
):
1404 return self
._is
_attached
1407 def is_attached(self
, value
: bool):
1408 self
._is
_attached
= value
1411 def needs_new_metadata_section(
1412 metadata_stream_state
: _LttngLiveViewerSessionMetadataStreamState
,
1413 latest_timestamp
: int,
1415 if metadata_stream_state
.next_section_timestamp
is None:
1418 if latest_timestamp
>= metadata_stream_state
.next_section_timestamp
:
1424 # An LTTng live viewer session manages a view on tracing sessions
1425 # and replies to commands accordingly.
1426 class _LttngLiveViewerSession
:
1429 viewer_session_id
: int,
1430 tracing_session_descriptors
: Iterable
[LttngTracingSessionDescriptor
],
1431 max_query_data_response_size
: Optional
[int],
1433 self
._viewer
_session
_id
= viewer_session_id
1436 ) # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
1437 self
._stream
_states
= (
1439 ) # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
1440 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1441 total_stream_infos
= 0
1443 for ts_descr
in tracing_session_descriptors
:
1444 ts_state
= _LttngLiveViewerSessionTracingSessionState(
1445 ts_descr
, total_stream_infos
1447 ts_id
= ts_state
.tracing_session_descriptor
.info
.tracing_session_id
1448 self
._ts
_states
[ts_id
] = ts_state
1449 total_stream_infos
+= len(ts_state
.client_visible_stream_infos
)
1451 # Update session's stream states to have the new states
1452 self
._stream
_states
.update(ts_state
.data_stream_states
)
1453 self
._stream
_states
.update(ts_state
.metadata_stream_states
)
1455 self
._command
_handlers
= {
1456 _LttngLiveViewerAttachToTracingSessionCommand
: self
._handle
_attach
_to
_tracing
_session
_command
,
1457 _LttngLiveViewerCreateViewerSessionCommand
: self
._handle
_create
_viewer
_session
_command
,
1458 _LttngLiveViewerDetachFromTracingSessionCommand
: self
._handle
_detach
_from
_tracing
_session
_command
,
1459 _LttngLiveViewerGetDataStreamPacketDataCommand
: self
._handle
_get
_data
_stream
_packet
_data
_command
,
1460 _LttngLiveViewerGetMetadataStreamDataCommand
: self
._handle
_get
_metadata
_stream
_data
_command
,
1461 _LttngLiveViewerGetNewStreamInfosCommand
: self
._handle
_get
_new
_stream
_infos
_command
,
1462 _LttngLiveViewerGetNextDataStreamIndexEntryCommand
: self
._handle
_get
_next
_data
_stream
_index
_entry
_command
,
1463 _LttngLiveViewerGetTracingSessionInfosCommand
: self
._handle
_get
_tracing
_session
_infos
_command
,
1464 } # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]
1467 def viewer_session_id(self
):
1468 return self
._viewer
_session
_id
1470 def _get_tracing_session_state(self
, tracing_session_id
: int):
1471 if tracing_session_id
not in self
._ts
_states
:
1473 "Unknown tracing session ID {}".format(tracing_session_id
)
1476 return self
._ts
_states
[tracing_session_id
]
1478 def _get_data_stream_state(self
, stream_id
: int):
1479 if stream_id
not in self
._stream
_states
:
1480 RuntimeError("Unknown stream ID {}".format(stream_id
))
1482 stream
= self
._stream
_states
[stream_id
]
1483 if type(stream
) is not _LttngLiveViewerSessionDataStreamState
:
1484 raise RuntimeError("Stream is not a data stream")
1488 def _get_metadata_stream_state(self
, stream_id
: int):
1489 if stream_id
not in self
._stream
_states
:
1490 RuntimeError("Unknown stream ID {}".format(stream_id
))
1492 stream
= self
._stream
_states
[stream_id
]
1493 if type(stream
) is not _LttngLiveViewerSessionMetadataStreamState
:
1494 raise RuntimeError("Stream is not a metadata stream")
1498 def handle_command(self
, cmd
: _LttngLiveViewerCommand
):
1500 "Handling command in viewer session: cmd-cls-name={}".format(
1501 cmd
.__class
__.__name
__
1504 cmd_type
= type(cmd
)
1506 if cmd_type
not in self
._command
_handlers
:
1508 "Unexpected command: cmd-cls-name={}".format(cmd
.__class
__.__name
__)
1511 return self
._command
_handlers
[cmd_type
](cmd
)
1513 def _handle_attach_to_tracing_session_command(
1514 self
, cmd
: _LttngLiveViewerAttachToTracingSessionCommand
1516 fmt
= 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1517 logging
.info(fmt
.format(cmd
.tracing_session_id
, cmd
.offset
, cmd
.seek_type
))
1518 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1519 info
= ts_state
.tracing_session_descriptor
.info
1521 if ts_state
.is_attached
:
1523 "Cannot attach to tracing session `{}`: viewer is already attached".format(
1528 ts_state
.is_attached
= True
1529 status
= _LttngLiveViewerAttachToTracingSessionReply
.Status
.OK
1530 stream_infos_to_announce
= ts_state
.client_visible_stream_infos
1532 # Mark stream infos transmitted as part of the reply as
1534 for si
in stream_infos_to_announce
:
1536 self
._get
_metadata
_stream
_state
(si
.id).mark_as_announced()
1538 self
._get
_data
_stream
_state
(si
.id).mark_as_announced()
1540 return _LttngLiveViewerAttachToTracingSessionReply(
1541 status
, stream_infos_to_announce
1544 def _handle_detach_from_tracing_session_command(
1545 self
, cmd
: _LttngLiveViewerDetachFromTracingSessionCommand
1547 fmt
= 'Handling "detach from tracing session" command: ts-id={}'
1548 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1549 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1550 info
= ts_state
.tracing_session_descriptor
.info
1552 if not ts_state
.is_attached
:
1554 "Cannot detach to tracing session `{}`: viewer is not attached".format(
1559 ts_state
.is_attached
= False
1560 status
= _LttngLiveViewerDetachFromTracingSessionReply
.Status
.OK
1561 return _LttngLiveViewerDetachFromTracingSessionReply(status
)
1564 def _stream_is_ready(
1565 stream_state
: _LttngLiveViewerSessionStreamState
, creation_timestamp
: int
1568 not stream_state
.is_announced
1569 and stream_state
.stream
.creation_timestamp
<= creation_timestamp
1572 def _needs_new_streams(self
, current_timestamp
: int):
1574 self
._stream
_is
_ready
(ss
, current_timestamp
)
1575 for ss
in self
._stream
_states
.values()
1578 def _handle_get_next_data_stream_index_entry_command(
1579 self
, cmd
: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
1581 fmt
= 'Handling "get next data stream index entry" command: stream-id={}'
1582 logging
.info(fmt
.format(cmd
.stream_id
))
1583 stream_state
= self
._get
_data
_stream
_state
(cmd
.stream_id
)
1584 metadata_stream_state
= self
._get
_metadata
_stream
_state
(
1585 stream_state
.metadata_stream_id
1588 if stream_state
.cur_index_entry
is None:
1589 # The viewer is done reading this stream
1590 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.HUP
1592 # Dummy data stream index entry to use with the `HUP` status
1593 # (the reply needs one, but the viewer ignores it)
1594 index_entry
= _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1596 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1597 status
, index_entry
, False, False
1600 timestamp_begin
= _get_entry_timestamp_begin(stream_state
.cur_index_entry
)
1602 if needs_new_metadata_section(metadata_stream_state
, timestamp_begin
):
1603 metadata_stream_state
.all_data_is_sent
= False
1604 metadata_stream_state
.goto_next_section()
1606 # The viewer only checks the `has_new_metadata` flag if the
1607 # reply's status is `OK`, so we need to provide an index here
1608 has_new_metadata
= stream_state
.tracing_session_state
.has_new_metadata
1609 if isinstance(stream_state
.cur_index_entry
, _LttngDataStreamIndexEntry
):
1610 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.OK
1612 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.INACTIVE
1614 reply
= _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1616 stream_state
.cur_index_entry
,
1618 self
._needs
_new
_streams
(timestamp_begin
),
1620 self
._last
_delivered
_index
_timestamp
_begin
= timestamp_begin
1621 stream_state
.goto_next_index_entry()
1624 def _handle_get_data_stream_packet_data_command(
1625 self
, cmd
: _LttngLiveViewerGetDataStreamPacketDataCommand
1627 fmt
= 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1628 logging
.info(fmt
.format(cmd
.stream_id
, cmd
.offset
, cmd
.req_length
))
1629 stream_state
= self
._get
_data
_stream
_state
(cmd
.stream_id
)
1630 data_response_length
= cmd
.req_length
1632 if stream_state
.tracing_session_state
.has_new_metadata
:
1633 status
= _LttngLiveViewerGetDataStreamPacketDataReply
.Status
.ERROR
1634 return _LttngLiveViewerGetDataStreamPacketDataReply(
1635 status
, bytes(), True, False
1638 if self
._max
_query
_data
_response
_size
:
1639 # Enforce a server side limit on the query requested length.
1640 # To ensure that the transaction terminate take the minimum of both
1642 data_response_length
= min(
1643 cmd
.req_length
, self
._max
_query
_data
_response
_size
1645 fmt
= 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1646 logging
.info(fmt
.format(cmd
.req_length
, data_response_length
))
1648 data
= stream_state
.stream
.get_data(cmd
.offset
, data_response_length
)
1649 status
= _LttngLiveViewerGetDataStreamPacketDataReply
.Status
.OK
1650 return _LttngLiveViewerGetDataStreamPacketDataReply(status
, data
, False, False)
1652 def _handle_get_metadata_stream_data_command(
1653 self
, cmd
: _LttngLiveViewerGetMetadataStreamDataCommand
1655 fmt
= 'Handling "get metadata stream data" command: stream-id={}'
1656 logging
.info(fmt
.format(cmd
.stream_id
))
1657 metadata_stream_state
= self
._get
_metadata
_stream
_state
(cmd
.stream_id
)
1659 if metadata_stream_state
.all_data_is_sent
:
1660 status
= _LttngLiveViewerGetMetadataStreamDataContentReply
.Status
.NO_NEW
1661 return _LttngLiveViewerGetMetadataStreamDataContentReply(status
, bytes())
1663 metadata_stream_state
.all_data_is_sent
= True
1664 status
= _LttngLiveViewerGetMetadataStreamDataContentReply
.Status
.OK
1665 metadata_section
= metadata_stream_state
.cur_section
1666 assert metadata_section
is not None
1668 # If we are sending an empty section, ready the next one right away.
1669 if len(metadata_section
.data
) == 0:
1670 metadata_stream_state
.all_data_is_sent
= False
1671 metadata_stream_state
.goto_next_section()
1673 fmt
= 'Replying to "get metadata stream data" command: metadata-size={}'
1674 logging
.info(fmt
.format(len(metadata_section
.data
)))
1675 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1676 status
, metadata_section
.data
1679 def _get_stream_infos_ready_for_announcement(self
):
1680 ready_stream_infos
= [] # type: list[_LttngLiveViewerStreamInfo]
1682 for ss
in self
._stream
_states
.values():
1683 if self
._stream
_is
_ready
(ss
, ss
.stream
.creation_timestamp
):
1684 ready_stream_infos
.append(ss
.info
)
1686 return ready_stream_infos
1688 # A stream is considered finished if it has been announced and all
1689 # of its index entries have been provided to the client.
1690 def _all_streams_finished(self
):
1692 isinstance(stream_state
, _LttngLiveViewerSessionMetadataStreamState
)
1693 or (stream_state
.cur_index_entry
is None and stream_state
.is_announced
)
1694 for stream_state
in self
._stream
_states
.values()
1697 def _handle_get_new_stream_infos_command(
1698 self
, cmd
: _LttngLiveViewerGetNewStreamInfosCommand
1700 fmt
= 'Handling "get new stream infos" command: ts-id={}'
1701 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1702 newly_announced_stream_infos
= self
._get
_stream
_infos
_ready
_for
_announcement
()
1704 # Mark stream infos transmitted as part of the reply as
1706 for si
in newly_announced_stream_infos
:
1708 self
._get
_metadata
_stream
_state
(si
.id).mark_as_announced()
1710 self
._get
_data
_stream
_state
(si
.id).mark_as_announced()
1712 status
= _LttngLiveViewerGetNewStreamInfosReply
.Status
.OK
1714 if len(newly_announced_stream_infos
) == 0:
1715 # If all streams have been transmitted and no new traces are
1716 # scheduled for creation, hang up to signal that the tracing
1717 # session is "done".
1719 _LttngLiveViewerGetNewStreamInfosReply
.Status
.HUP
1720 if self
._all
_streams
_finished
()
1721 else _LttngLiveViewerGetNewStreamInfosReply
.Status
.NO_NEW
1724 return _LttngLiveViewerGetNewStreamInfosReply(
1725 status
, newly_announced_stream_infos
1728 def _handle_get_tracing_session_infos_command(
1729 self
, cmd
: _LttngLiveViewerGetTracingSessionInfosCommand
1731 logging
.info('Handling "get tracing session infos" command.')
1733 tss
.tracing_session_descriptor
.info
for tss
in self
._ts
_states
.values()
1735 infos
.sort(key
=lambda info
: info
.name
)
1736 return _LttngLiveViewerGetTracingSessionInfosReply(infos
)
1738 def _handle_create_viewer_session_command(
1739 self
, cmd
: _LttngLiveViewerCreateViewerSessionCommand
1741 logging
.info('Handling "create viewer session" command.')
1742 status
= _LttngLiveViewerCreateViewerSessionReply
.Status
.OK
1744 # This does nothing here. In the LTTng relay daemon, it
1745 # allocates the viewer session's state.
1746 return _LttngLiveViewerCreateViewerSessionReply(status
)
1749 # An LTTng live TCP server.
1751 # On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
1752 # on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
1753 # to a temporary port file. It renames the temporary port file to
1756 # `tracing_session_descriptors` is a list of tracing session descriptors
1757 # (`LttngTracingSessionDescriptor`) to serve.
1759 # This server accepts a single viewer (client).
1761 # When the viewer closes the connection, the server's constructor
1763 class LttngLiveServer
:
1766 port
: Optional
[int],
1767 port_filename
: Optional
[str],
1768 tracing_session_descriptors
: Iterable
[LttngTracingSessionDescriptor
],
1769 max_query_data_response_size
: Optional
[int],
1771 logging
.info("Server configuration:")
1773 logging
.info(" Port file name: `{}`".format(port_filename
))
1775 if max_query_data_response_size
is not None:
1777 " Maximum response data query size: `{}`".format(
1778 max_query_data_response_size
1782 for ts_descr
in tracing_session_descriptors
:
1783 info
= ts_descr
.info
1784 fmt
= ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1788 info
.tracing_session_id
,
1790 info
.live_timer_freq
,
1796 for trace
in ts_descr
.traces
:
1797 logging
.info(' Trace: path="{}"'.format(trace
.path
))
1799 self
._ts
_descriptors
= tracing_session_descriptors
1800 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1801 self
._sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
1802 self
._codec
= _LttngLiveViewerProtocolCodec()
1804 # Port 0: OS assigns an unused port
1805 serv_addr
= ("localhost", port
if port
is not None else 0)
1806 self
._sock
.bind(serv_addr
)
1808 if port_filename
is not None:
1809 self
._write
_port
_to
_file
(port_filename
)
1811 print("Listening on port {}".format(self
._server
_port
))
1813 for ts_descr
in tracing_session_descriptors
:
1814 info
= ts_descr
.info
1816 "net://localhost:{}/host/{}/{}".format(
1817 self
._server
_port
, info
.hostname
, info
.name
1825 logging
.info("Closed connection and socket.")
1828 def _server_port(self
):
1829 return self
._sock
.getsockname()[1]
1831 def _recv_command(self
):
1835 logging
.info("Waiting for viewer command.")
1836 buf
= self
._conn
.recv(128)
1839 logging
.info("Client closed connection.")
1843 "Client closed connection after having sent {} command bytes.".format(
1850 logging
.info("Received data from viewer: length={}".format(len(buf
)))
1855 cmd
= self
._codec
.decode(data
)
1856 except struct
.error
as exc
:
1857 raise RuntimeError("Malformed command: {}".format(exc
)) from exc
1861 "Received command from viewer: cmd-cls-name={}".format(
1862 cmd
.__class
__.__name
__
1867 def _send_reply(self
, reply
: _LttngLiveViewerReply
):
1868 data
= self
._codec
.encode(reply
)
1870 "Sending reply to viewer: reply-cls-name={}, length={}".format(
1871 reply
.__class
__.__name
__, len(data
)
1874 self
._conn
.sendall(data
)
1876 def _handle_connection(self
):
1877 # First command must be "connect"
1878 cmd
= self
._recv
_command
()
1880 if type(cmd
) is not _LttngLiveViewerConnectCommand
:
1882 'First command is not "connect": cmd-cls-name={}'.format(
1883 cmd
.__class
__.__name
__
1887 # Create viewer session (arbitrary ID 23)
1889 "LTTng live viewer connected: version={}.{}".format(cmd
.major
, cmd
.minor
)
1891 viewer_session
= _LttngLiveViewerSession(
1892 23, self
._ts
_descriptors
, self
._max
_query
_data
_response
_size
1895 # Send "connect" reply
1897 _LttngLiveViewerConnectReply(viewer_session
.viewer_session_id
, 2, 10)
1900 # Make the viewer session handle the remaining commands
1902 cmd
= self
._recv
_command
()
1905 # Connection closed (at an expected location within the
1909 self
._send
_reply
(viewer_session
.handle_command(cmd
))
1912 logging
.info("Listening: port={}".format(self
._server
_port
))
1913 # Backlog must be present for Python version < 3.5.
1914 # 128 is an arbitrary number since we expect only 1 connection anyway.
1915 self
._sock
.listen(128)
1916 self
._conn
, viewer_addr
= self
._sock
.accept()
1918 "Accepted viewer: addr={}:{}".format(viewer_addr
[0], viewer_addr
[1])
1922 self
._handle
_connection
()
1926 def _write_port_to_file(self
, port_filename
: str):
1927 # Write the port number to a temporary file.
1928 with tempfile
.NamedTemporaryFile(
1929 mode
="w", delete
=False, dir=os
.path
.dirname(port_filename
)
1931 print(self
._server
_port
, end
="", file=tmp_port_file
)
1933 # Rename temporary file to real file.
1935 # For unknown reasons, on Windows, moving the port file from its
1936 # temporary location to its final location (where the user of
1937 # the server expects it to appear) may raise a `PermissionError`
1940 # We suppose it's possible that something in the Windows kernel
1941 # hasn't completely finished using the file when we try to move
1944 # Use a wait-and-retry scheme as a (bad) workaround.
1948 for attempt
in reversed(range(num_attempts
)):
1950 os
.replace(tmp_port_file
.name
, port_filename
)
1952 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1953 tmp_port_file
.name
, port_filename
1957 except PermissionError
:
1959 'Permission error while attempting to rename port file; retrying in {} second: src-path="{}", dst-path="{}"'.format(
1960 retry_delay_s
, tmp_port_file
.name
, port_filename
1967 time
.sleep(retry_delay_s
)
1970 def _session_descriptors_from_path(
1971 sessions_filename
: str, trace_path_prefix
: Optional
[str]
1977 # "name": "my-session",
1979 # "hostname": "myhost",
1980 # "live-timer-freq": 1000000,
1981 # "client-count": 23,
1985 # creation-timestamp: 12948
1988 # "path": "meow/mix",
1990 # "my_stream": [ 5235787, 728375283 ]
1992 # "metadata-sections": [
2002 with
open(sessions_filename
, "r") as sessions_file
:
2003 sessions_json
= tjson
.load(sessions_file
, tjson
.ArrayVal
)
2005 sessions
= [] # type: list[LttngTracingSessionDescriptor]
2007 for session_json
in sessions_json
.iter(tjson
.ObjVal
):
2008 name
= session_json
.at("name", tjson
.StrVal
).val
2009 tracing_session_id
= session_json
.at("id", tjson
.IntVal
).val
2010 hostname
= session_json
.at("hostname", tjson
.StrVal
).val
2011 live_timer_freq
= session_json
.at("live-timer-freq", tjson
.IntVal
).val
2012 client_count
= session_json
.at("client-count", tjson
.IntVal
).val
2013 traces_json
= session_json
.at("traces", tjson
.ArrayVal
)
2015 traces
= [] # type: list[LttngTrace]
2017 for trace_json
in traces_json
.iter(tjson
.ObjVal
):
2018 metadata_sections
= (
2019 trace_json
.at("metadata-sections", tjson
.ArrayVal
)
2020 if "metadata-sections" in trace_json
2024 trace_json
.at("beacons", tjson
.ObjVal
)
2025 if "beacons" in trace_json
2028 path
= trace_json
.at("path", tjson
.StrVal
).val
2029 creation_timestamp
= (
2030 trace_json
.at("creation-timestamp", tjson
.IntVal
).val
2031 if "creation-timestamp" in trace_json
2035 if not os
.path
.isabs(path
) and trace_path_prefix
:
2036 path
= os
.path
.join(trace_path_prefix
, path
)
2039 LttngTrace(path
, metadata_sections
, beacons
, creation_timestamp
)
2043 LttngTracingSessionDescriptor(
2056 def _loglevel_parser(string
: str):
2057 loglevels
= {"info": logging
.INFO
, "warning": logging
.WARNING
}
2058 if string
not in loglevels
:
2059 msg
= "{} is not a valid loglevel".format(string
)
2060 raise argparse
.ArgumentTypeError(msg
)
2061 return loglevels
[string
]
2064 if __name__
== "__main__":
2065 logging
.basicConfig(format
="# %(asctime)-25s%(message)s")
2066 parser
= argparse
.ArgumentParser(
2067 description
="LTTng-live protocol mocker", add_help
=False
2069 parser
.add_argument(
2072 choices
=["info", "warning"],
2073 help="The loglevel to be used.",
2076 loglevel_namespace
, remaining_args
= parser
.parse_known_args()
2077 logging
.getLogger().setLevel(_loglevel_parser(loglevel_namespace
.log_level
))
2079 parser
.add_argument(
2081 help="The port to bind to. If missing, use an OS-assigned port..",
2084 parser
.add_argument(
2086 help="The final port file. This file is present when the server is ready to receive connection.",
2088 parser
.add_argument(
2089 "--max-query-data-response-size",
2091 help="The maximum size of control data response in bytes",
2093 parser
.add_argument(
2094 "--trace-path-prefix",
2096 help="Prefix to prepend to the trace paths of session configurations",
2098 parser
.add_argument(
2099 "sessions_filename",
2101 help="Path to a session configuration file",
2102 metavar
="sessions-filename",
2104 parser
.add_argument(
2108 default
=argparse
.SUPPRESS
,
2109 help="Show this help message and exit.",
2112 args
= parser
.parse_args(args
=remaining_args
)
2113 sessions_filename
= args
.sessions_filename
# type: str
2114 trace_path_prefix
= args
.trace_path_prefix
# type: str | None
2115 sessions
= _session_descriptors_from_path(
2120 port
= args
.port
# type: int | None
2121 port_filename
= args
.port_filename
# type: str | None
2122 max_query_data_response_size
= args
.max_query_data_response_size
# type: int | None
2123 LttngLiveServer(port
, port_filename
, sessions
, max_query_data_response_size
)