1 # SPDX-License-Identifier: MIT
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
16 import collections
.abc
17 from collections
import namedtuple
20 class UnexpectedInput(RuntimeError):
24 # An entry within the index of an LTTng data stream.
25 class _LttngDataStreamIndexEntry
:
36 self
._offset
_bytes
= offset_bytes
37 self
._total
_size
_bits
= total_size_bits
38 self
._content
_size
_bits
= content_size_bits
39 self
._timestamp
_begin
= timestamp_begin
40 self
._timestamp
_end
= timestamp_end
41 self
._events
_discarded
= events_discarded
42 self
._stream
_class
_id
= stream_class_id
def offset_bytes(self):
    """Return the value held in `self._offset_bytes`."""
    stored_offset = self._offset_bytes
    return stored_offset
def total_size_bits(self):
    """Return the value held in `self._total_size_bits`."""
    stored_bits = self._total_size_bits
    return stored_bits
def total_size_bytes(self):
    """Return `self._total_size_bits` floor-divided by 8."""
    bits = self._total_size_bits
    return bits // 8
def content_size_bits(self):
    """Return the value held in `self._content_size_bits`."""
    stored_bits = self._content_size_bits
    return stored_bits
def content_size_bytes(self):
    """Return `self._content_size_bits` floor-divided by 8."""
    bits = self._content_size_bits
    return bits // 8
def timestamp_begin(self):
    """Return the value held in `self._timestamp_begin`."""
    begin = self._timestamp_begin
    return begin
def timestamp_end(self):
    """Return the value held in `self._timestamp_end`."""
    end = self._timestamp_end
    return end
def events_discarded(self):
    """Return the value held in `self._events_discarded`."""
    discarded = self._events_discarded
    return discarded
def stream_class_id(self):
    """Return the value held in `self._stream_class_id`."""
    class_id = self._stream_class_id
    return class_id
81 # An entry within the index of an LTTng data stream. While a stream beacon entry
82 # is conceptually unrelated to an index, it is sent as a reply to a
83 # LttngLiveViewerGetNextDataStreamIndexEntryCommand
84 class _LttngDataStreamBeaconEntry
:
def __init__(self, stream_class_id, timestamp):
    """Store the stream class ID and the timestamp of this beacon entry."""
    self._stream_class_id, self._timestamp = stream_class_id, timestamp
91 return self
._timestamp
def stream_class_id(self):
    """Return the value held in `self._stream_class_id`."""
    class_id = self._stream_class_id
    return class_id
98 class _LttngLiveViewerCommand
:
def __init__(self, version):
    """Store the protocol version carried by this command."""
    setattr(self, "_version", version)
107 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand
):
108 def __init__(self
, version
, viewer_session_id
, major
, minor
):
109 super().__init
__(version
)
110 self
._viewer
_session
_id
= viewer_session_id
def viewer_session_id(self):
    """Return the value held in `self._viewer_session_id`."""
    session_id = self._viewer_session_id
    return session_id
127 class _LttngLiveViewerConnectReply
:
128 def __init__(self
, viewer_session_id
, major
, minor
):
129 self
._viewer
_session
_id
= viewer_session_id
def viewer_session_id(self):
    """Return the value held in `self._viewer_session_id`."""
    session_id = self._viewer_session_id
    return session_id
146 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand
):
150 class _LttngLiveViewerTracingSessionInfo
:
160 self
._tracing
_session
_id
= tracing_session_id
161 self
._live
_timer
_freq
= live_timer_freq
162 self
._client
_count
= client_count
163 self
._stream
_count
= stream_count
164 self
._hostname
= hostname
def tracing_session_id(self):
    """Return the value held in `self._tracing_session_id`."""
    session_id = self._tracing_session_id
    return session_id
def live_timer_freq(self):
    """Return the value held in `self._live_timer_freq`."""
    freq = self._live_timer_freq
    return freq
def client_count(self):
    """Return the value held in `self._client_count`."""
    count = self._client_count
    return count
def stream_count(self):
    """Return the value held in `self._stream_count`."""
    count = self._stream_count
    return count
185 return self
._hostname
192 class _LttngLiveViewerGetTracingSessionInfosReply
:
def __init__(self, tracing_session_infos):
    """Store the tracing session infos carried by this reply."""
    setattr(self, "_tracing_session_infos", tracing_session_infos)
def tracing_session_infos(self):
    """Return the value held in `self._tracing_session_infos`."""
    infos = self._tracing_session_infos
    return infos
201 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand
):
def __init__(self, version, tracing_session_id, offset, seek_type):
    """Initialize the base command with `version`, then store the
    tracing session ID, offset and seek type of the attach request."""
    super().__init__(version)
    self._tracing_session_id, self._offset, self._seek_type = (
        tracing_session_id,
        offset,
        seek_type,
    )
def tracing_session_id(self):
    """Return the value held in `self._tracing_session_id`."""
    session_id = self._tracing_session_id
    return session_id
222 return self
._seek
_type
225 class _LttngLiveViewerStreamInfo
:
226 def __init__(self
, id, trace_id
, is_metadata
, path
, channel_name
):
228 self
._trace
_id
= trace_id
229 self
._is
_metadata
= is_metadata
231 self
._channel
_name
= channel_name
239 return self
._trace
_id
def is_metadata(self):
    """Return the value held in `self._is_metadata`."""
    flag = self._is_metadata
    return flag
def channel_name(self):
    """Return the value held in `self._channel_name`."""
    name = self._channel_name
    return name
254 class _LttngLiveViewerAttachToTracingSessionReply
:
def __init__(self, status, stream_infos):
    """Store the reply status and the stream infos to send back."""
    self._status, self._stream_infos = status, stream_infos
def stream_infos(self):
    """Return the value held in `self._stream_infos`."""
    infos = self._stream_infos
    return infos
276 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand
):
def __init__(self, version, stream_id):
    """Initialize the base command with `version`, then store the ID of
    the stream this command targets."""
    super().__init__(version)
    setattr(self, "_stream_id", stream_id)
283 return self
._stream
_id
286 class _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
    """Store the reply status, the index entry to send, and the two
    "has new metadata" / "has new data stream" flags."""
    self._status, self._index_entry = status, index_entry
    self._has_new_metadata, self._has_new_data_stream = (
        has_new_metadata,
        has_new_data_stream,
    )
def index_entry(self):
    """Return the value held in `self._index_entry`."""
    entry = self._index_entry
    return entry
def has_new_metadata(self):
    """Return the value held in `self._has_new_metadata`."""
    flag = self._has_new_metadata
    return flag
def has_new_data_stream(self):
    """Return the value held in `self._has_new_data_stream`."""
    flag = self._has_new_data_stream
    return flag
318 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand
):
def __init__(self, version, stream_id, offset, req_length):
    """Initialize the base command with `version`, then store the stream
    ID, the offset and the requested length of the packet data query."""
    super().__init__(version)
    self._stream_id, self._offset, self._req_length = (
        stream_id,
        offset,
        req_length,
    )
327 return self
._stream
_id
def req_length(self):
    """Return the value held in `self._req_length`."""
    length = self._req_length
    return length
338 class _LttngLiveViewerGetDataStreamPacketDataReply
:
345 def __init__(self
, status
, data
, has_new_metadata
, has_new_data_stream
):
346 self
._status
= status
348 self
._has
_new
_metadata
= has_new_metadata
349 self
._has
_new
_data
_stream
= has_new_data_stream
def has_new_metadata(self):
    """Return the value held in `self._has_new_metadata`."""
    flag = self._has_new_metadata
    return flag
def has_new_data_stream(self):
    """Return the value held in `self._has_new_data_stream`."""
    flag = self._has_new_data_stream
    return flag
368 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand
):
def __init__(self, version, stream_id):
    """Initialize the base command with `version`, then store the ID of
    the stream this command targets."""
    super().__init__(version)
    setattr(self, "_stream_id", stream_id)
375 return self
._stream
_id
378 class _LttngLiveViewerGetMetadataStreamDataContentReply
:
384 def __init__(self
, status
, data
):
385 self
._status
= status
397 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand
):
def __init__(self, version, tracing_session_id):
    """Initialize the base command with `version`, then store the ID of
    the tracing session this command targets."""
    super().__init__(version)
    setattr(self, "_tracing_session_id", tracing_session_id)
def tracing_session_id(self):
    """Return the value held in `self._tracing_session_id`."""
    session_id = self._tracing_session_id
    return session_id
407 class _LttngLiveViewerGetNewStreamInfosReply
:
def __init__(self, status, stream_infos):
    """Store the reply status and the stream infos to send back."""
    self._status, self._stream_infos = status, stream_infos
def stream_infos(self):
    """Return the value held in `self._stream_infos`."""
    infos = self._stream_infos
    return infos
427 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand
):
431 class _LttngLiveViewerCreateViewerSessionReply
:
def __init__(self, status):
    """Store the status carried by this reply."""
    setattr(self, "_status", status)
444 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand
):
def __init__(self, version, tracing_session_id):
    """Initialize the base command with `version`, then store the ID of
    the tracing session to detach from."""
    super().__init__(version)
    setattr(self, "_tracing_session_id", tracing_session_id)
def tracing_session_id(self):
    """Return the value held in `self._tracing_session_id`."""
    session_id = self._tracing_session_id
    return session_id
454 class _LttngLiveViewerDetachFromTracingSessionReply
:
def __init__(self, status):
    """Store the status carried by this reply."""
    setattr(self, "_status", status)
468 # An LTTng live protocol codec can convert bytes to command objects and
469 # reply objects to bytes.
470 class _LttngLiveViewerProtocolCodec
:
471 _COMMAND_HEADER_STRUCT_FMT
= "QII"
472 _COMMAND_HEADER_SIZE_BYTES
= struct
.calcsize(_COMMAND_HEADER_STRUCT_FMT
)
477 def _unpack(self
, fmt
, data
, offset
=0):
479 return struct
.unpack_from(fmt
, data
, offset
)
481 def _unpack_payload(self
, fmt
, data
):
483 fmt
, data
, _LttngLiveViewerProtocolCodec
._COMMAND
_HEADER
_SIZE
_BYTES
486 def decode(self
, data
):
487 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
:
488 # Not enough data to read the command header
491 payload_size
, cmd_type
, version
= self
._unpack
(
492 self
._COMMAND
_HEADER
_STRUCT
_FMT
, data
495 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
496 payload_size
, cmd_type
, version
500 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
+ payload_size
:
501 # Not enough data to read the whole command
505 viewer_session_id
, major
, minor
, conn_type
= self
._unpack
_payload
(
508 return _LttngLiveViewerConnectCommand(
509 version
, viewer_session_id
, major
, minor
512 return _LttngLiveViewerGetTracingSessionInfosCommand(version
)
514 tracing_session_id
, offset
, seek_type
= self
._unpack
_payload
("QQI", data
)
515 return _LttngLiveViewerAttachToTracingSessionCommand(
516 version
, tracing_session_id
, offset
, seek_type
519 (stream_id
,) = self
._unpack
_payload
("Q", data
)
520 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
524 stream_id
, offset
, req_length
= self
._unpack
_payload
("QQI", data
)
525 return _LttngLiveViewerGetDataStreamPacketDataCommand(
526 version
, stream_id
, offset
, req_length
529 (stream_id
,) = self
._unpack
_payload
("Q", data
)
530 return _LttngLiveViewerGetMetadataStreamDataCommand(version
, stream_id
)
532 (tracing_session_id
,) = self
._unpack
_payload
("Q", data
)
533 return _LttngLiveViewerGetNewStreamInfosCommand(version
, tracing_session_id
)
535 return _LttngLiveViewerCreateViewerSessionCommand(version
)
537 (tracing_session_id
,) = self
._unpack
_payload
("Q", data
)
538 return _LttngLiveViewerDetachFromTracingSessionCommand(
539 version
, tracing_session_id
542 raise UnexpectedInput("Unknown command type {}".format(cmd_type
))
def _pack(self, fmt, *args):
    """Pack `args` according to the `struct` format `fmt`.

    The format is prefixed with "!" so that the values are always encoded
    in network byte order with standard sizes and no alignment padding.
    """
    network_fmt = "!" + fmt
    packed = struct.pack(network_fmt, *args)
    return packed
def _encode_zero_padded_str(self, string, length):
    """Encode `string` to bytes and right-pad it with NUL bytes.

    The result is at least `length` bytes long. An encoded string that is
    already `length` bytes or longer is returned as is (no truncation).
    """
    return string.encode().ljust(length, b"\x00")
552 def _encode_stream_info(self
, info
):
553 data
= self
._pack
("QQI", info
.id, info
.trace_id
, int(info
.is_metadata
))
554 data
+= self
._encode
_zero
_padded
_str
(info
.path
, 4096)
555 data
+= self
._encode
_zero
_padded
_str
(info
.channel_name
, 255)
558 def _get_has_new_stuff_flags(self
, has_new_metadata
, has_new_data_streams
):
564 if has_new_data_streams
:
569 def encode(self
, reply
):
570 if type(reply
) is _LttngLiveViewerConnectReply
:
572 "QIII", reply
.viewer_session_id
, reply
.major
, reply
.minor
, 2
574 elif type(reply
) is _LttngLiveViewerGetTracingSessionInfosReply
:
575 data
= self
._pack
("I", len(reply
.tracing_session_infos
))
577 for info
in reply
.tracing_session_infos
:
580 info
.tracing_session_id
,
581 info
.live_timer_freq
,
585 data
+= self
._encode
_zero
_padded
_str
(info
.hostname
, 64)
586 data
+= self
._encode
_zero
_padded
_str
(info
.name
, 255)
587 elif type(reply
) is _LttngLiveViewerAttachToTracingSessionReply
:
588 data
= self
._pack
("II", reply
.status
, len(reply
.stream_infos
))
590 for info
in reply
.stream_infos
:
591 data
+= self
._encode
_stream
_info
(info
)
592 elif type(reply
) is _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
593 index_format
= "QQQQQQQII"
594 entry
= reply
.index_entry
595 flags
= self
._get
_has
_new
_stuff
_flags
(
596 reply
.has_new_metadata
, reply
.has_new_data_stream
599 if type(entry
) is _LttngDataStreamIndexEntry
:
603 entry
.total_size_bits
,
604 entry
.content_size_bits
,
605 entry
.timestamp_begin
,
607 entry
.events_discarded
,
608 entry
.stream_class_id
,
613 assert type(entry
) is _LttngDataStreamBeaconEntry
622 entry
.stream_class_id
,
626 elif type(reply
) is _LttngLiveViewerGetDataStreamPacketDataReply
:
627 flags
= self
._get
_has
_new
_stuff
_flags
(
628 reply
.has_new_metadata
, reply
.has_new_data_stream
630 data
= self
._pack
("III", reply
.status
, len(reply
.data
), flags
)
632 elif type(reply
) is _LttngLiveViewerGetMetadataStreamDataContentReply
:
633 data
= self
._pack
("QI", len(reply
.data
), reply
.status
)
635 elif type(reply
) is _LttngLiveViewerGetNewStreamInfosReply
:
636 data
= self
._pack
("II", reply
.status
, len(reply
.stream_infos
))
638 for info
in reply
.stream_infos
:
639 data
+= self
._encode
_stream
_info
(info
)
640 elif type(reply
) is _LttngLiveViewerCreateViewerSessionReply
:
641 data
= self
._pack
("I", reply
.status
)
642 elif type(reply
) is _LttngLiveViewerDetachFromTracingSessionReply
:
643 data
= self
._pack
("I", reply
.status
)
646 "Unknown reply object with class `{}`".format(reply
.__class
__.__name
__)
def _get_entry_timestamp_begin(entry):
    # Return the timestamp at which `entry` begins: a beacon entry only
    # has a single `timestamp`, while an index entry has an explicit
    # `timestamp_begin`. Any other entry type is a programming error.
    if type(entry) is not _LttngDataStreamBeaconEntry:
        assert type(entry) is _LttngDataStreamIndexEntry
        return entry.timestamp_begin

    return entry.timestamp
660 # The index of an LTTng data stream, a sequence of index entries.
661 class _LttngDataStreamIndex(collections
.abc
.Sequence
):
662 def __init__(self
, path
, beacons
):
667 stream_class_id
= self
._entries
[0].stream_class_id
669 _LttngDataStreamBeaconEntry(stream_class_id
, ts
) for ts
in beacons
671 self
._add
_beacons
(beacons
)
674 'Built data stream index entries: path="{}", count={}'.format(
675 path
, len(self
._entries
)
681 assert os
.path
.isfile(self
._path
)
683 with
open(self
._path
, "rb") as f
:
686 size
= struct
.calcsize(fmt
)
688 assert len(data
) == size
689 magic
, index_major
, index_minor
, index_entry_length
= struct
.unpack(
692 assert magic
== 0xC1F1DCC1
696 size
= struct
.calcsize(fmt
)
700 'Decoding data stream index entry: path="{}", offset={}'.format(
710 assert len(data
) == size
719 ) = struct
.unpack(fmt
, data
)
721 self
._entries
.append(
722 _LttngDataStreamIndexEntry(
733 # Skip anything else before the next entry
734 f
.seek(index_entry_length
- size
, os
.SEEK_CUR
)
736 def _add_beacons(self
, beacons
):
737 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
739 if type(entry
) is _LttngDataStreamBeaconEntry
:
740 return entry
.timestamp
742 return entry
.timestamp_end
744 self
._entries
+= beacons
745 self
._entries
.sort(key
=sort_key
)
def __getitem__(self, index):
    """Return the item of `self._entries` selected by `index`."""
    entries = self._entries
    return entries[index]
751 return len(self
._entries
)
758 # An LTTng data stream.
759 class _LttngDataStream
:
760 def __init__(self
, path
, beacons
):
762 filename
= os
.path
.basename(path
)
763 match
= re
.match(r
"(.*)_\d+", filename
)
766 "Unexpected data stream file name pattern: {}".format(filename
)
769 self
._channel
_name
= match
.group(1)
770 trace_dir
= os
.path
.dirname(path
)
771 index_path
= os
.path
.join(trace_dir
, "index", filename
+ ".idx")
772 self
._index
= _LttngDataStreamIndex(index_path
, beacons
)
773 assert os
.path
.isfile(path
)
774 self
._file
= open(path
, "rb")
776 'Built data stream: path="{}", channel-name="{}"'.format(
777 path
, self
._channel
_name
def channel_name(self):
    """Return the value held in `self._channel_name`."""
    name = self._channel_name
    return name
def get_data(self, offset_bytes, len_bytes):
    """Read up to `len_bytes` bytes from `self._file`, starting at the
    absolute position `offset_bytes`, and return them."""
    data_file = self._file
    data_file.seek(offset_bytes)
    chunk = data_file.read(len_bytes)
    return chunk
798 class _LttngMetadataStreamSection
:
799 def __init__(self
, timestamp
, data
):
800 self
._timestamp
= timestamp
806 "Built metadata stream section: ts={}, data-len={}".format(
807 self
._timestamp
, len(self
._data
)
813 return self
._timestamp
820 # An LTTng metadata stream.
821 class _LttngMetadataStream
:
822 def __init__(self
, metadata_file_path
, config_sections
):
823 self
._path
= metadata_file_path
824 self
._sections
= config_sections
826 "Built metadata stream: path={}, section-len={}".format(
827 self
._path
, len(self
._sections
)
837 return self
._sections
840 LttngMetadataConfigSection
= namedtuple(
841 "LttngMetadataConfigSection", ["line", "timestamp", "is_empty"]
845 def _parse_metadata_sections_config(config_sections
):
846 assert config_sections
is not None
847 config_metadata_sections
= []
848 append_empty_section
= False
852 for config_section
in config_sections
:
853 if config_section
== "empty":
854 # Found an empty section marker. Actually append the section at the
855 # timestamp of the next concrete section.
856 append_empty_section
= True
858 assert type(config_section
) is dict
859 line
= config_section
.get("line")
860 ts
= config_section
.get("timestamp")
862 if type(line
) is not int:
863 raise RuntimeError("`line` is not an integer")
865 if type(ts
) is not int:
866 raise RuntimeError("`timestamp` is not an integer")
868 # Sections' timestamps and lines must both be increasing.
869 assert ts
> last_timestamp
871 assert line
> last_line
874 if append_empty_section
:
875 config_metadata_sections
.append(
876 LttngMetadataConfigSection(line
, ts
, True)
878 append_empty_section
= False
880 config_metadata_sections
.append(LttngMetadataConfigSection(line
, ts
, False))
882 return config_metadata_sections
885 def _split_metadata_sections(metadata_file_path
, raw_config_sections
):
886 assert isinstance(raw_config_sections
, collections
.abc
.Sequence
)
888 parsed_sections
= _parse_metadata_sections_config(raw_config_sections
)
891 with
open(metadata_file_path
, "r") as metadata_file
:
892 metadata_lines
= [line
for line
in metadata_file
]
894 config_metadata_sections_idx
= 0
895 curr_metadata_section
= bytearray()
897 for idx
, line_content
in enumerate(metadata_lines
):
898 # Add one to the index to convert from the zero-indexing of the
899 # enumerate() function to the one-indexing used by humans when
900 # viewing a text file.
901 curr_line_number
= idx
+ 1
903 # If there are no more sections, simply append the line.
904 if config_metadata_sections_idx
+ 1 >= len(parsed_sections
):
905 curr_metadata_section
+= bytearray(line_content
, "utf8")
908 next_section_line_number
= parsed_sections
[
909 config_metadata_sections_idx
+ 1
912 # If the next section begins at the current line, create a
913 # section with the metadata we gathered so far.
914 if curr_line_number
>= next_section_line_number
:
915 # Flushing the metadata of the current section.
917 _LttngMetadataStreamSection(
918 parsed_sections
[config_metadata_sections_idx
].timestamp
,
919 bytes(curr_metadata_section
),
923 # Move to the next section.
924 config_metadata_sections_idx
+= 1
926 # Clear old content and append current line for the next section.
927 curr_metadata_section
.clear()
928 curr_metadata_section
+= bytearray(line_content
, "utf8")
930 # Append any empty sections.
931 while parsed_sections
[config_metadata_sections_idx
].is_empty
:
933 _LttngMetadataStreamSection(
934 parsed_sections
[config_metadata_sections_idx
].timestamp
, None
937 config_metadata_sections_idx
+= 1
939 # Append line_content to the current metadata section.
940 curr_metadata_section
+= bytearray(line_content
, "utf8")
942 # We iterated over all the lines of the metadata file. Close the current section.
944 _LttngMetadataStreamSection(
945 parsed_sections
[config_metadata_sections_idx
].timestamp
,
946 bytes(curr_metadata_section
),
953 # An LTTng trace, a sequence of LTTng data streams.
954 class LttngTrace(collections
.abc
.Sequence
):
def __init__(self, trace_dir, metadata_sections, beacons):
    """Build the trace found in the directory `trace_dir`.

    `trace_dir` must be an existing directory. `metadata_sections` is
    forwarded to `_create_metadata_stream()` and `beacons` is forwarded
    to `_create_data_streams()`; the metadata stream is created first.
    """
    assert os.path.isdir(trace_dir)
    self._path = trace_dir
    self._create_metadata_stream(trace_dir, metadata_sections)
    self._create_data_streams(trace_dir, beacons)
    built_msg = 'Built trace: path="{}"'.format(trace_dir)
    logging.info(built_msg)
962 def _create_data_streams(self
, trace_dir
, beacons
):
963 data_stream_paths
= []
965 for filename
in os
.listdir(trace_dir
):
966 path
= os
.path
.join(trace_dir
, filename
)
968 if not os
.path
.isfile(path
):
971 if filename
.startswith("."):
974 if filename
== "metadata":
977 data_stream_paths
.append(path
)
979 data_stream_paths
.sort()
980 self
._data
_streams
= []
982 for data_stream_path
in data_stream_paths
:
983 stream_name
= os
.path
.basename(data_stream_path
)
984 this_stream_beacons
= None
986 if beacons
is not None and stream_name
in beacons
:
987 this_stream_beacons
= beacons
[stream_name
]
989 self
._data
_streams
.append(
990 _LttngDataStream(data_stream_path
, this_stream_beacons
)
993 def _create_metadata_stream(self
, trace_dir
, config_metadata_sections
):
994 metadata_path
= os
.path
.join(trace_dir
, "metadata")
995 metadata_sections
= []
997 if config_metadata_sections
is None:
998 with
open(metadata_path
, "rb") as metadata_file
:
999 metadata_sections
.append(
1000 _LttngMetadataStreamSection(0, metadata_file
.read())
1003 metadata_sections
= _split_metadata_sections(
1004 metadata_path
, config_metadata_sections
1007 self
._metadata
_stream
= _LttngMetadataStream(metadata_path
, metadata_sections
)
def metadata_stream(self):
    """Return the value held in `self._metadata_stream`."""
    stream = self._metadata_stream
    return stream
def __getitem__(self, index):
    """Return the item of `self._data_streams` selected by `index`."""
    streams = self._data_streams
    return streams[index]
1021 return len(self
._data
_streams
)
1024 # The state of a single data stream.
1025 class _LttngLiveViewerSessionDataStreamState
:
1026 def __init__(self
, ts_state
, info
, data_stream
, metadata_stream_id
):
1027 self
._ts
_state
= ts_state
1029 self
._data
_stream
= data_stream
1030 self
._metadata
_stream
_id
= metadata_stream_id
1031 self
._cur
_index
_entry
_index
= 0
1032 fmt
= 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1036 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
1037 ts_state
.tracing_session_descriptor
.info
.name
,
def tracing_session_state(self):
    """Return the value held in `self._ts_state`."""
    state = self._ts_state
    return state
def data_stream(self):
    """Return the value held in `self._data_stream`."""
    stream = self._data_stream
    return stream
1055 def cur_index_entry(self
):
1056 if self
._cur
_index
_entry
_index
== len(self
._data
_stream
.index
):
1059 return self
._data
_stream
.index
[self
._cur
_index
_entry
_index
]
def goto_next_index_entry(self):
    """Advance the current index entry position by one."""
    self._cur_index_entry_index = self._cur_index_entry_index + 1
1065 # The state of a single metadata stream.
1066 class _LttngLiveViewerSessionMetadataStreamState
:
1067 def __init__(self
, ts_state
, info
, metadata_stream
):
1068 self
._ts
_state
= ts_state
1070 self
._metadata
_stream
= metadata_stream
1071 self
._cur
_metadata
_stream
_section
_index
= 0
1072 if len(metadata_stream
.sections
) > 1:
1073 self
._next
_metadata
_stream
_section
_timestamp
= metadata_stream
.sections
[
1077 self
._next
_metadata
_stream
_section
_timestamp
= None
1079 self
._is
_sent
= False
1080 fmt
= 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1084 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
1085 ts_state
.tracing_session_descriptor
.info
.name
,
1086 metadata_stream
.path
,
def metadata_stream(self):
    """Return the value held in `self._metadata_stream`."""
    stream = self._metadata_stream
    return stream
1100 return self
._is
_sent
def is_sent(self, value):
    """Store `value` in `self._is_sent`."""
    setattr(self, "_is_sent", value)
1107 def cur_section(self
):
1108 fmt
= "Get current metadata section: section-idx={}"
1109 logging
.info(fmt
.format(self
._cur
_metadata
_stream
_section
_index
))
1110 if self
._cur
_metadata
_stream
_section
_index
== len(
1111 self
._metadata
_stream
.sections
1115 return self
._metadata
_stream
.sections
[self
._cur
_metadata
_stream
_section
_index
]
1117 def goto_next_section(self
):
1118 self
._cur
_metadata
_stream
_section
_index
+= 1
1119 if self
.cur_section
:
1120 self
._next
_metadata
_stream
_section
_timestamp
= self
.cur_section
.timestamp
1122 self
._next
_metadata
_stream
_section
_timestamp
= None
def next_section_timestamp(self):
    """Return the value held in
    `self._next_metadata_stream_section_timestamp`."""
    next_ts = self._next_metadata_stream_section_timestamp
    return next_ts
1129 # A tracing session descriptor.
1131 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1133 class LttngTracingSessionDescriptor
:
1135 self
, name
, tracing_session_id
, hostname
, live_timer_freq
, client_count
, traces
1137 for trace
in traces
:
1138 if name
not in trace
.path
:
1139 fmt
= "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1140 raise ValueError(fmt
.format(name
, trace
.path
))
1142 self
._traces
= traces
1143 stream_count
= sum([len(t
) + 1 for t
in traces
])
1144 self
._info
= _LttngLiveViewerTracingSessionInfo(
1162 # The state of a tracing session.
1163 class _LttngLiveViewerSessionTracingSessionState
:
1164 def __init__(self
, tc_descr
, base_stream_id
):
1165 self
._tc
_descr
= tc_descr
1166 self
._stream
_infos
= []
1167 self
._ds
_states
= {}
1168 self
._ms
_states
= {}
1169 stream_id
= base_stream_id
1171 for trace
in tc_descr
.traces
:
1172 trace_id
= stream_id
* 1000
1174 # Metadata stream -> stream info and metadata stream state
1175 info
= _LttngLiveViewerStreamInfo(
1176 stream_id
, trace_id
, True, trace
.metadata_stream
.path
, "metadata"
1178 self
._stream
_infos
.append(info
)
1179 self
._ms
_states
[stream_id
] = _LttngLiveViewerSessionMetadataStreamState(
1180 self
, info
, trace
.metadata_stream
1182 metadata_stream_id
= stream_id
1185 # Data streams -> stream infos and data stream states
1186 for data_stream
in trace
:
1187 info
= _LttngLiveViewerStreamInfo(
1192 data_stream
.channel_name
,
1194 self
._stream
_infos
.append(info
)
1195 self
._ds
_states
[stream_id
] = _LttngLiveViewerSessionDataStreamState(
1196 self
, info
, data_stream
, metadata_stream_id
1200 self
._is
_attached
= False
1201 fmt
= 'Built tracing session state: id={}, name="{}"'
1202 logging
.info(fmt
.format(tc_descr
.info
.tracing_session_id
, tc_descr
.info
.name
))
def tracing_session_descriptor(self):
    """Return the value held in `self._tc_descr`."""
    descriptor = self._tc_descr
    return descriptor
def data_stream_states(self):
    """Return the value held in `self._ds_states`."""
    states = self._ds_states
    return states
def metadata_stream_states(self):
    """Return the value held in `self._ms_states`."""
    states = self._ms_states
    return states
def stream_infos(self):
    """Return the value held in `self._stream_infos`."""
    infos = self._stream_infos
    return infos
def has_new_metadata(self):
    """Return True when at least one state in `self._ms_states` has a
    falsy `is_sent`, and False otherwise (including when there are no
    metadata stream states at all)."""
    pending = [ms_state for ms_state in self._ms_states.values() if not ms_state.is_sent]
    return len(pending) > 0
def is_attached(self):
    """Return the value held in `self._is_attached`."""
    attached = self._is_attached
    return attached
def is_attached(self, value):
    """Store `value` in `self._is_attached`."""
    setattr(self, "_is_attached", value)
1233 def needs_new_metadata_section(metadata_stream_state
, latest_timestamp
):
1234 if metadata_stream_state
.next_section_timestamp
is None:
1237 if latest_timestamp
>= metadata_stream_state
.next_section_timestamp
:
1243 # An LTTng live viewer session manages a view on tracing sessions
1244 # and replies to commands accordingly.
1245 class _LttngLiveViewerSession
:
1249 tracing_session_descriptors
,
1250 max_query_data_response_size
,
1252 self
._viewer
_session
_id
= viewer_session_id
1253 self
._ts
_states
= {}
1254 self
._stream
_states
= {}
1255 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1256 total_stream_infos
= 0
1258 for ts_descr
in tracing_session_descriptors
:
1259 ts_state
= _LttngLiveViewerSessionTracingSessionState(
1260 ts_descr
, total_stream_infos
1262 ts_id
= ts_state
.tracing_session_descriptor
.info
.tracing_session_id
1263 self
._ts
_states
[ts_id
] = ts_state
1264 total_stream_infos
+= len(ts_state
.stream_infos
)
1266 # Update session's stream states to have the new states
1267 self
._stream
_states
.update(ts_state
.data_stream_states
)
1268 self
._stream
_states
.update(ts_state
.metadata_stream_states
)
1270 self
._command
_handlers
= {
1271 _LttngLiveViewerAttachToTracingSessionCommand
: self
._handle
_attach
_to
_tracing
_session
_command
,
1272 _LttngLiveViewerCreateViewerSessionCommand
: self
._handle
_create
_viewer
_session
_command
,
1273 _LttngLiveViewerDetachFromTracingSessionCommand
: self
._handle
_detach
_from
_tracing
_session
_command
,
1274 _LttngLiveViewerGetDataStreamPacketDataCommand
: self
._handle
_get
_data
_stream
_packet
_data
_command
,
1275 _LttngLiveViewerGetMetadataStreamDataCommand
: self
._handle
_get
_metadata
_stream
_data
_command
,
1276 _LttngLiveViewerGetNewStreamInfosCommand
: self
._handle
_get
_new
_stream
_infos
_command
,
1277 _LttngLiveViewerGetNextDataStreamIndexEntryCommand
: self
._handle
_get
_next
_data
_stream
_index
_entry
_command
,
1278 _LttngLiveViewerGetTracingSessionInfosCommand
: self
._handle
_get
_tracing
_session
_infos
_command
,
def viewer_session_id(self):
    """Return the value held in `self._viewer_session_id`."""
    session_id = self._viewer_session_id
    return session_id
def _get_tracing_session_state(self, tracing_session_id):
    """Return the tracing session state registered in `self._ts_states`
    under `tracing_session_id`.

    Raises `UnexpectedInput` when no state has this ID.
    """
    known_states = self._ts_states

    if tracing_session_id in known_states:
        return known_states[tracing_session_id]

    raise UnexpectedInput(
        "Unknown tracing session ID {}".format(tracing_session_id)
    )
def _get_stream_state(self, stream_id):
    """Return the stream state registered in `self._stream_states` under
    `stream_id`.

    Raises `UnexpectedInput` when no stream state has this ID.
    """
    if stream_id not in self._stream_states:
        # The exception must actually be raised: merely instantiating it
        # let the lookup below fail with a bare `KeyError` instead of the
        # `UnexpectedInput` that an unknown ID is meant to produce.
        raise UnexpectedInput("Unknown stream ID {}".format(stream_id))

    return self._stream_states[stream_id]
1299 def handle_command(self
, cmd
):
1301 "Handling command in viewer session: cmd-cls-name={}".format(
1302 cmd
.__class
__.__name
__
1305 cmd_type
= type(cmd
)
1307 if cmd_type
not in self
._command
_handlers
:
1308 raise UnexpectedInput(
1309 "Unexpected command: cmd-cls-name={}".format(cmd
.__class
__.__name
__)
1312 return self
._command
_handlers
[cmd_type
](cmd
)
1314 def _handle_attach_to_tracing_session_command(self
, cmd
):
1315 fmt
= 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1316 logging
.info(fmt
.format(cmd
.tracing_session_id
, cmd
.offset
, cmd
.seek_type
))
1317 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1318 info
= ts_state
.tracing_session_descriptor
.info
1320 if ts_state
.is_attached
:
1321 raise UnexpectedInput(
1322 "Cannot attach to tracing session `{}`: viewer is already attached".format(
1327 ts_state
.is_attached
= True
1328 status
= _LttngLiveViewerAttachToTracingSessionReply
.Status
.OK
1329 return _LttngLiveViewerAttachToTracingSessionReply(
1330 status
, ts_state
.stream_infos
1333 def _handle_detach_from_tracing_session_command(self
, cmd
):
1334 fmt
= 'Handling "detach from tracing session" command: ts-id={}'
1335 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1336 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1337 info
= ts_state
.tracing_session_descriptor
.info
1339 if not ts_state
.is_attached
:
1340 raise UnexpectedInput(
1341 "Cannot detach to tracing session `{}`: viewer is not attached".format(
1346 ts_state
.is_attached
= False
1347 status
= _LttngLiveViewerDetachFromTracingSessionReply
.Status
.OK
1348 return _LttngLiveViewerDetachFromTracingSessionReply(status
)
1350 def _handle_get_next_data_stream_index_entry_command(self
, cmd
):
1351 fmt
= 'Handling "get next data stream index entry" command: stream-id={}'
1352 logging
.info(fmt
.format(cmd
.stream_id
))
1353 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1354 metadata_stream_state
= self
._get
_stream
_state
(stream_state
._metadata
_stream
_id
)
1356 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1357 raise UnexpectedInput(
1358 "Stream with ID {} is not a data stream".format(cmd
.stream_id
)
1361 if stream_state
.cur_index_entry
is None:
1362 # The viewer is done reading this stream
1363 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.HUP
1365 # Dummy data stream index entry to use with the `HUP` status
1366 # (the reply needs one, but the viewer ignores it)
1367 index_entry
= _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1369 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1370 status
, index_entry
, False, False
1373 timestamp_begin
= _get_entry_timestamp_begin(stream_state
.cur_index_entry
)
1375 if needs_new_metadata_section(metadata_stream_state
, timestamp_begin
):
1376 metadata_stream_state
.is_sent
= False
1377 metadata_stream_state
.goto_next_section()
1379 # The viewer only checks the `has_new_metadata` flag if the
1380 # reply's status is `OK`, so we need to provide an index here
1381 has_new_metadata
= stream_state
.tracing_session_state
.has_new_metadata
1382 if type(stream_state
.cur_index_entry
) is _LttngDataStreamIndexEntry
:
1383 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.OK
1385 assert type(stream_state
.cur_index_entry
) is _LttngDataStreamBeaconEntry
1386 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.INACTIVE
1388 reply
= _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1389 status
, stream_state
.cur_index_entry
, has_new_metadata
, False
1391 stream_state
.goto_next_index_entry()
# Handles the "get data stream packet data" command `cmd` and returns
# the `_LttngLiveViewerGetDataStreamPacketDataReply` to send back.
#
# Raises `UnexpectedInput` if `cmd.stream_id` doesn't identify a data
# stream.
def _handle_get_data_stream_packet_data_command(self, cmd):
    logging.info(
        'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'.format(
            cmd.stream_id, cmd.offset, cmd.req_length
        )
    )
    stream_state = self._get_stream_state(cmd.stream_id)

    if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
        msg = "Stream with ID {} is not a data stream".format(cmd.stream_id)
        raise UnexpectedInput(msg)

    reply_cls = _LttngLiveViewerGetDataStreamPacketDataReply

    if stream_state.tracing_session_state.has_new_metadata:
        # No packet data is returned while the tracing session state
        # reports new metadata: reply with the `ERROR` status, an empty
        # payload, and the first flag set (presumably "has new
        # metadata"; confirm against the reply class).
        return reply_cls(reply_cls.Status.ERROR, bytes(), True, False)

    size_limit = self._max_query_data_response_size

    if size_limit:
        # Enforce the server-side limit on the requested length: reply
        # with at most `size_limit` bytes so that the viewer has to
        # issue further queries for the remainder.
        length = min(cmd.req_length, size_limit)
        logging.info(
            'Limiting "get data stream packet data" command: req-length={} actual response size={}'.format(
                cmd.req_length, length
            )
        )
    else:
        # No limit configured: honour the requested length as is.
        length = cmd.req_length

    data = stream_state.data_stream.get_data(cmd.offset, length)
    return reply_cls(reply_cls.Status.OK, data, False, False)
# Handles the "get metadata stream data" command `cmd` and returns the
# `_LttngLiveViewerGetMetadataStreamDataContentReply` to send back.
#
# Replies with the `NO_NEW` status and no data when the current
# metadata section was already sent; otherwise replies with the `OK`
# status and the data of the current section.
#
# Raises `UnexpectedInput` if `cmd.stream_id` doesn't identify a
# metadata stream.
def _handle_get_metadata_stream_data_command(self, cmd):
    logging.info(
        'Handling "get metadata stream data" command: stream-id={}'.format(
            cmd.stream_id
        )
    )
    md_stream_state = self._get_stream_state(cmd.stream_id)

    if type(md_stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
        msg = "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
        raise UnexpectedInput(msg)

    reply_cls = _LttngLiveViewerGetMetadataStreamDataContentReply

    if md_stream_state.is_sent:
        return reply_cls(reply_cls.Status.NO_NEW, bytes())

    md_stream_state.is_sent = True
    section = md_stream_state.cur_section
    assert section is not None
    section_size = len(section.data)

    if section_size == 0:
        # An empty section is about to be sent: make the next section
        # ready right away so that the following request delivers it.
        md_stream_state.is_sent = False
        md_stream_state.goto_next_section()

    logging.info(
        'Replying to "get metadata stream data" command: metadata-size={}'.format(
            section_size
        )
    )
    return reply_cls(reply_cls.Status.OK, section.data)
# Handles the "get new stream infos" command `cmd`: always replies with
# the `HUP` status and an empty list of stream infos.
def _handle_get_new_stream_infos_command(self, cmd):
    logging.info(
        'Handling "get new stream infos" command: ts-id={}'.format(
            cmd.tracing_session_id
        )
    )

    # As of this version, the viewer already received all the stream
    # infos of the tracing session with the "attach to tracing session"
    # reply, so there's never anything new to announce. If this command
    # is being handled, then the viewer consumed all the existing data
    # streams: `HUP` is the status to return.
    reply_cls = _LttngLiveViewerGetNewStreamInfosReply
    return reply_cls(reply_cls.Status.HUP, [])
# Handles the "get tracing session infos" command `cmd`: replies with
# the infos of all the known tracing session states, sorted by name.
def _handle_get_tracing_session_infos_command(self, cmd):
    logging.info('Handling "get tracing session infos" command.')
    sorted_infos = sorted(
        (tss.tracing_session_descriptor.info for tss in self._ts_states.values()),
        key=lambda info: info.name,
    )
    return _LttngLiveViewerGetTracingSessionInfosReply(sorted_infos)
# Handles the "create viewer session" command `cmd`: always replies
# with the `OK` status.
def _handle_create_viewer_session_command(self, cmd):
    logging.info('Handling "create viewer session" command.')

    # There's nothing to do on this side. The LTTng relay daemon
    # allocates the state of the viewer session at this point.
    return _LttngLiveViewerCreateViewerSessionReply(
        _LttngLiveViewerCreateViewerSessionReply.Status.OK
    )
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
# to a temporary port file. It renames the temporary port file to
# `port_filename`.
#
# `tracing_session_descriptors` is a list of tracing session descriptors
# (`LttngTracingSessionDescriptor`) to serve.
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns.
1501 class LttngLiveServer
:
1506 tracing_session_descriptors
,
1507 max_query_data_response_size
,
1509 logging
.info("Server configuration:")
1511 logging
.info(" Port file name: `{}`".format(port_filename
))
1513 if max_query_data_response_size
is not None:
1515 " Maximum response data query size: `{}`".format(
1516 max_query_data_response_size
1520 for ts_descr
in tracing_session_descriptors
:
1521 info
= ts_descr
.info
1522 fmt
= ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1526 info
.tracing_session_id
,
1528 info
.live_timer_freq
,
1534 for trace
in ts_descr
.traces
:
1535 logging
.info(' Trace: path="{}"'.format(trace
.path
))
1537 self
._ts
_descriptors
= tracing_session_descriptors
1538 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1539 self
._sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
1540 self
._codec
= _LttngLiveViewerProtocolCodec()
1542 # Port 0: OS assigns an unused port
1543 serv_addr
= ("localhost", port
if port
is not None else 0)
1544 self
._sock
.bind(serv_addr
)
1545 self
._write
_port
_to
_file
(port_filename
)
1551 logging
.info("Closed connection and socket.")
1554 def _server_port(self
):
1555 return self
._sock
.getsockname()[1]
1557 def _recv_command(self
):
1561 logging
.info("Waiting for viewer command.")
1562 buf
= self
._conn
.recv(128)
1565 logging
.info("Client closed connection.")
1568 raise UnexpectedInput(
1569 "Client closed connection after having sent {} command bytes.".format(
1576 logging
.info("Received data from viewer: length={}".format(len(buf
)))
1581 cmd
= self
._codec
.decode(data
)
1582 except struct
.error
as exc
:
1583 raise UnexpectedInput("Malformed command: {}".format(exc
)) from exc
1587 "Received command from viewer: cmd-cls-name={}".format(
1588 cmd
.__class
__.__name
__
1593 def _send_reply(self
, reply
):
1594 data
= self
._codec
.encode(reply
)
1596 "Sending reply to viewer: reply-cls-name={}, length={}".format(
1597 reply
.__class
__.__name
__, len(data
)
1600 self
._conn
.sendall(data
)
1602 def _handle_connection(self
):
1603 # First command must be "connect"
1604 cmd
= self
._recv
_command
()
1606 if type(cmd
) is not _LttngLiveViewerConnectCommand
:
1607 raise UnexpectedInput(
1608 'First command is not "connect": cmd-cls-name={}'.format(
1609 cmd
.__class
__.__name
__
1613 # Create viewer session (arbitrary ID 23)
1615 "LTTng live viewer connected: version={}.{}".format(cmd
.major
, cmd
.minor
)
1617 viewer_session
= _LttngLiveViewerSession(
1618 23, self
._ts
_descriptors
, self
._max
_query
_data
_response
_size
1621 # Send "connect" reply
1623 _LttngLiveViewerConnectReply(viewer_session
.viewer_session_id
, 2, 10)
1626 # Make the viewer session handle the remaining commands
1628 cmd
= self
._recv
_command
()
1631 # Connection closed (at an expected location within the
1635 self
._send
_reply
(viewer_session
.handle_command(cmd
))
1638 logging
.info("Listening: port={}".format(self
._server
_port
))
1639 # Backlog must be present for Python version < 3.5.
1640 # 128 is an arbitrary number since we expect only 1 connection anyway.
1641 self
._sock
.listen(128)
1642 self
._conn
, viewer_addr
= self
._sock
.accept()
1644 "Accepted viewer: addr={}:{}".format(viewer_addr
[0], viewer_addr
[1])
1648 self
._handle
_connection
()
1652 def _write_port_to_file(self
, port_filename
):
1653 # Write the port number to a temporary file.
1654 with tempfile
.NamedTemporaryFile(mode
="w", delete
=False) as tmp_port_file
:
1655 print(self
._server
_port
, end
="", file=tmp_port_file
)
1657 # Rename temporary file to real file
1658 os
.replace(tmp_port_file
.name
, port_filename
)
1660 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1661 tmp_port_file
.name
, port_filename
1666 def _session_descriptors_from_path(sessions_filename
, trace_path_prefix
):
1671 # "name": "my-session",
1673 # "hostname": "myhost",
1674 # "live-timer-freq": 1000000,
1675 # "client-count": 23,
1681 # "path": "meow/mix",
1683 # "my_stream": [ 5235787, 728375283 ]
1685 # "metadata-sections": [
1695 with
open(sessions_filename
, "r") as sessions_file
:
1696 params
= json
.load(sessions_file
)
1700 for session
in params
:
1701 name
= session
["name"]
1702 tracing_session_id
= session
["id"]
1703 hostname
= session
["hostname"]
1704 live_timer_freq
= session
["live-timer-freq"]
1705 client_count
= session
["client-count"]
1708 for trace
in session
["traces"]:
1709 metadata_sections
= trace
.get("metadata-sections")
1710 beacons
= trace
.get("beacons")
1711 path
= trace
["path"]
1713 if not os
.path
.isabs(path
):
1714 path
= os
.path
.join(trace_path_prefix
, path
)
1716 traces
.append(LttngTrace(path
, metadata_sections
, beacons
))
1719 LttngTracingSessionDescriptor(
1732 def _loglevel_parser(string
):
1733 loglevels
= {"info": logging
.INFO
, "warning": logging
.WARNING
}
1734 if string
not in loglevels
:
1735 msg
= "{} is not a valid loglevel".format(string
)
1736 raise argparse
.ArgumentTypeError(msg
)
1737 return loglevels
[string
]
1740 if __name__
== "__main__":
1741 logging
.basicConfig(format
="# %(asctime)-25s%(message)s")
1742 parser
= argparse
.ArgumentParser(
1743 description
="LTTng-live protocol mocker", add_help
=False
1745 parser
.add_argument(
1748 choices
=["info", "warning"],
1749 help="The loglevel to be used.",
1752 loglevel_namespace
, remaining_args
= parser
.parse_known_args()
1753 logging
.getLogger().setLevel(_loglevel_parser(loglevel_namespace
.log_level
))
1755 parser
.add_argument(
1757 help="The port to bind to. If missing, use an OS-assigned port..",
1760 parser
.add_argument(
1762 help="The final port file. This file is present when the server is ready to receive connection.",
1765 parser
.add_argument(
1766 "--max-query-data-response-size",
1768 help="The maximum size of control data response in bytes",
1770 parser
.add_argument(
1771 "--trace-path-prefix",
1773 help="Prefix to prepend to the trace paths of session configurations",
1775 parser
.add_argument(
1776 "--sessions-filename",
1778 help="Path to a session configuration file",
1780 parser
.add_argument(
1784 default
=argparse
.SUPPRESS
,
1785 help="Show this help message and exit.",
1788 args
= parser
.parse_args(args
=remaining_args
)
1790 sessions
= _session_descriptors_from_path(
1791 args
.sessions_filename
,
1792 args
.trace_path_prefix
,
1795 args
.port
, args
.port_filename
, sessions
, args
.max_query_data_response_size
1797 except UnexpectedInput
as exc
:
1798 logging
.error(str(exc
))
1799 print(exc
, file=sys
.stderr
)