1 # SPDX-License-Identifier: MIT
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
17 from collections
import namedtuple
20 class UnexpectedInput(RuntimeError):
24 class _LttngLiveViewerCommand
:
25 def __init__(self
, version
):
26 self
._version
= version
33 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand
):
34 def __init__(self
, version
, viewer_session_id
, major
, minor
):
35 super().__init
__(version
)
36 self
._viewer
_session
_id
= viewer_session_id
41 def viewer_session_id(self
):
42 return self
._viewer
_session
_id
53 class _LttngLiveViewerConnectReply
:
54 def __init__(self
, viewer_session_id
, major
, minor
):
55 self
._viewer
_session
_id
= viewer_session_id
60 def viewer_session_id(self
):
61 return self
._viewer
_session
_id
72 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand
):
76 class _LttngLiveViewerTracingSessionInfo
:
86 self
._tracing
_session
_id
= tracing_session_id
87 self
._live
_timer
_freq
= live_timer_freq
88 self
._client
_count
= client_count
89 self
._stream
_count
= stream_count
90 self
._hostname
= hostname
94 def tracing_session_id(self
):
95 return self
._tracing
_session
_id
98 def live_timer_freq(self
):
99 return self
._live
_timer
_freq
102 def client_count(self
):
103 return self
._client
_count
106 def stream_count(self
):
107 return self
._stream
_count
111 return self
._hostname
118 class _LttngLiveViewerGetTracingSessionInfosReply
:
119 def __init__(self
, tracing_session_infos
):
120 self
._tracing
_session
_infos
= tracing_session_infos
123 def tracing_session_infos(self
):
124 return self
._tracing
_session
_infos
127 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand
):
132 def __init__(self
, version
, tracing_session_id
, offset
, seek_type
):
133 super().__init
__(version
)
134 self
._tracing
_session
_id
= tracing_session_id
135 self
._offset
= offset
136 self
._seek
_type
= seek_type
139 def tracing_session_id(self
):
140 return self
._tracing
_session
_id
148 return self
._seek
_type
151 class _LttngLiveViewerStreamInfo
:
152 def __init__(self
, id, trace_id
, is_metadata
, path
, channel_name
):
154 self
._trace
_id
= trace_id
155 self
._is
_metadata
= is_metadata
157 self
._channel
_name
= channel_name
165 return self
._trace
_id
168 def is_metadata(self
):
169 return self
._is
_metadata
176 def channel_name(self
):
177 return self
._channel
_name
180 class _LttngLiveViewerAttachToTracingSessionReply
:
189 def __init__(self
, status
, stream_infos
):
190 self
._status
= status
191 self
._stream
_infos
= stream_infos
198 def stream_infos(self
):
199 return self
._stream
_infos
202 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand
):
203 def __init__(self
, version
, stream_id
):
204 super().__init
__(version
)
205 self
._stream
_id
= stream_id
209 return self
._stream
_id
212 class _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
221 def __init__(self
, status
, index_entry
, has_new_metadata
, has_new_data_stream
):
222 self
._status
= status
223 self
._index
_entry
= index_entry
224 self
._has
_new
_metadata
= has_new_metadata
225 self
._has
_new
_data
_stream
= has_new_data_stream
232 def index_entry(self
):
233 return self
._index
_entry
236 def has_new_metadata(self
):
237 return self
._has
_new
_metadata
240 def has_new_data_stream(self
):
241 return self
._has
_new
_data
_stream
244 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand
):
245 def __init__(self
, version
, stream_id
, offset
, req_length
):
246 super().__init
__(version
)
247 self
._stream
_id
= stream_id
248 self
._offset
= offset
249 self
._req
_length
= req_length
253 return self
._stream
_id
260 def req_length(self
):
261 return self
._req
_length
264 class _LttngLiveViewerGetDataStreamPacketDataReply
:
271 def __init__(self
, status
, data
, has_new_metadata
, has_new_data_stream
):
272 self
._status
= status
274 self
._has
_new
_metadata
= has_new_metadata
275 self
._has
_new
_data
_stream
= has_new_data_stream
286 def has_new_metadata(self
):
287 return self
._has
_new
_metadata
290 def has_new_data_stream(self
):
291 return self
._has
_new
_data
_stream
294 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand
):
295 def __init__(self
, version
, stream_id
):
296 super().__init
__(version
)
297 self
._stream
_id
= stream_id
301 return self
._stream
_id
304 class _LttngLiveViewerGetMetadataStreamDataContentReply
:
310 def __init__(self
, status
, data
):
311 self
._status
= status
323 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand
):
324 def __init__(self
, version
, tracing_session_id
):
325 super().__init
__(version
)
326 self
._tracing
_session
_id
= tracing_session_id
329 def tracing_session_id(self
):
330 return self
._tracing
_session
_id
333 class _LttngLiveViewerGetNewStreamInfosReply
:
340 def __init__(self
, status
, stream_infos
):
341 self
._status
= status
342 self
._stream
_infos
= stream_infos
349 def stream_infos(self
):
350 return self
._stream
_infos
353 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand
):
357 class _LttngLiveViewerCreateViewerSessionReply
:
362 def __init__(self
, status
):
363 self
._status
= status
370 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand
):
371 def __init__(self
, version
, tracing_session_id
):
372 super().__init
__(version
)
373 self
._tracing
_session
_id
= tracing_session_id
376 def tracing_session_id(self
):
377 return self
._tracing
_session
_id
380 class _LttngLiveViewerDetachFromTracingSessionReply
:
386 def __init__(self
, status
):
387 self
._status
= status
394 # An LTTng live protocol codec can convert bytes to command objects and
395 # reply objects to bytes.
396 class _LttngLiveViewerProtocolCodec
:
397 _COMMAND_HEADER_STRUCT_FMT
= 'QII'
398 _COMMAND_HEADER_SIZE_BYTES
= struct
.calcsize(_COMMAND_HEADER_STRUCT_FMT
)
403 def _unpack(self
, fmt
, data
, offset
=0):
405 return struct
.unpack_from(fmt
, data
, offset
)
407 def _unpack_payload(self
, fmt
, data
):
409 fmt
, data
, _LttngLiveViewerProtocolCodec
._COMMAND
_HEADER
_SIZE
_BYTES
412 def decode(self
, data
):
413 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
:
414 # Not enough data to read the command header
417 payload_size
, cmd_type
, version
= self
._unpack
(
418 self
._COMMAND
_HEADER
_STRUCT
_FMT
, data
421 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
422 payload_size
, cmd_type
, version
426 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
+ payload_size
:
427 # Not enough data to read the whole command
431 viewer_session_id
, major
, minor
, conn_type
= self
._unpack
_payload
(
434 return _LttngLiveViewerConnectCommand(
435 version
, viewer_session_id
, major
, minor
438 return _LttngLiveViewerGetTracingSessionInfosCommand(version
)
440 tracing_session_id
, offset
, seek_type
= self
._unpack
_payload
('QQI', data
)
441 return _LttngLiveViewerAttachToTracingSessionCommand(
442 version
, tracing_session_id
, offset
, seek_type
445 (stream_id
,) = self
._unpack
_payload
('Q', data
)
446 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
450 stream_id
, offset
, req_length
= self
._unpack
_payload
('QQI', data
)
451 return _LttngLiveViewerGetDataStreamPacketDataCommand(
452 version
, stream_id
, offset
, req_length
455 (stream_id
,) = self
._unpack
_payload
('Q', data
)
456 return _LttngLiveViewerGetMetadataStreamDataCommand(version
, stream_id
)
458 (tracing_session_id
,) = self
._unpack
_payload
('Q', data
)
459 return _LttngLiveViewerGetNewStreamInfosCommand(version
, tracing_session_id
)
461 return _LttngLiveViewerCreateViewerSessionCommand(version
)
463 (tracing_session_id
,) = self
._unpack
_payload
('Q', data
)
464 return _LttngLiveViewerDetachFromTracingSessionCommand(
465 version
, tracing_session_id
468 raise UnexpectedInput('Unknown command type {}'.format(cmd_type
))
470 def _pack(self
, fmt
, *args
):
471 # Force network byte order
472 return struct
.pack('!' + fmt
, *args
)
474 def _encode_zero_padded_str(self
, string
, length
):
475 data
= string
.encode()
476 return data
.ljust(length
, b
'\x00')
478 def _encode_stream_info(self
, info
):
479 data
= self
._pack
('QQI', info
.id, info
.trace_id
, int(info
.is_metadata
))
480 data
+= self
._encode
_zero
_padded
_str
(info
.path
, 4096)
481 data
+= self
._encode
_zero
_padded
_str
(info
.channel_name
, 255)
484 def _get_has_new_stuff_flags(self
, has_new_metadata
, has_new_data_streams
):
490 if has_new_data_streams
:
495 def encode(self
, reply
):
496 if type(reply
) is _LttngLiveViewerConnectReply
:
498 'QIII', reply
.viewer_session_id
, reply
.major
, reply
.minor
, 2
500 elif type(reply
) is _LttngLiveViewerGetTracingSessionInfosReply
:
501 data
= self
._pack
('I', len(reply
.tracing_session_infos
))
503 for info
in reply
.tracing_session_infos
:
506 info
.tracing_session_id
,
507 info
.live_timer_freq
,
511 data
+= self
._encode
_zero
_padded
_str
(info
.hostname
, 64)
512 data
+= self
._encode
_zero
_padded
_str
(info
.name
, 255)
513 elif type(reply
) is _LttngLiveViewerAttachToTracingSessionReply
:
514 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
516 for info
in reply
.stream_infos
:
517 data
+= self
._encode
_stream
_info
(info
)
518 elif type(reply
) is _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
519 index_format
= 'QQQQQQQII'
520 entry
= reply
.index_entry
521 flags
= self
._get
_has
_new
_stuff
_flags
(
522 reply
.has_new_metadata
, reply
.has_new_data_stream
525 if type(entry
) is _LttngDataStreamIndexEntry
:
529 entry
.total_size_bits
,
530 entry
.content_size_bits
,
531 entry
.timestamp_begin
,
533 entry
.events_discarded
,
534 entry
.stream_class_id
,
539 assert type(entry
) is _LttngDataStreamBeaconEntry
548 entry
.stream_class_id
,
552 elif type(reply
) is _LttngLiveViewerGetDataStreamPacketDataReply
:
553 flags
= self
._get
_has
_new
_stuff
_flags
(
554 reply
.has_new_metadata
, reply
.has_new_data_stream
556 data
= self
._pack
('III', reply
.status
, len(reply
.data
), flags
)
558 elif type(reply
) is _LttngLiveViewerGetMetadataStreamDataContentReply
:
559 data
= self
._pack
('QI', len(reply
.data
), reply
.status
)
561 elif type(reply
) is _LttngLiveViewerGetNewStreamInfosReply
:
562 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
564 for info
in reply
.stream_infos
:
565 data
+= self
._encode
_stream
_info
(info
)
566 elif type(reply
) is _LttngLiveViewerCreateViewerSessionReply
:
567 data
= self
._pack
('I', reply
.status
)
568 elif type(reply
) is _LttngLiveViewerDetachFromTracingSessionReply
:
569 data
= self
._pack
('I', reply
.status
)
572 'Unknown reply object with class `{}`'.format(reply
.__class
__.__name
__)
578 # An entry within the index of an LTTng data stream.
579 class _LttngDataStreamIndexEntry
:
590 self
._offset
_bytes
= offset_bytes
591 self
._total
_size
_bits
= total_size_bits
592 self
._content
_size
_bits
= content_size_bits
593 self
._timestamp
_begin
= timestamp_begin
594 self
._timestamp
_end
= timestamp_end
595 self
._events
_discarded
= events_discarded
596 self
._stream
_class
_id
= stream_class_id
599 def offset_bytes(self
):
600 return self
._offset
_bytes
603 def total_size_bits(self
):
604 return self
._total
_size
_bits
607 def total_size_bytes(self
):
608 return self
._total
_size
_bits
// 8
611 def content_size_bits(self
):
612 return self
._content
_size
_bits
615 def content_size_bytes(self
):
616 return self
._content
_size
_bits
// 8
619 def timestamp_begin(self
):
620 return self
._timestamp
_begin
623 def timestamp_end(self
):
624 return self
._timestamp
_end
627 def events_discarded(self
):
628 return self
._events
_discarded
631 def stream_class_id(self
):
632 return self
._stream
_class
_id
635 # An entry within the index of an LTTng data stream. While a stream beacon entry
636 # is conceptually unrelated to an index, it is sent as a reply to a
637 # LttngLiveViewerGetNextDataStreamIndexEntryCommand
638 class _LttngDataStreamBeaconEntry
:
639 def __init__(self
, stream_class_id
, timestamp
):
640 self
._stream
_class
_id
= stream_class_id
641 self
._timestamp
= timestamp
645 return self
._timestamp
648 def stream_class_id(self
):
649 return self
._stream
_class
_id
652 def _get_entry_timestamp_begin(entry
):
653 if type(entry
) is _LttngDataStreamBeaconEntry
:
654 return entry
.timestamp
656 assert type(entry
) is _LttngDataStreamIndexEntry
657 return entry
.timestamp_begin
660 # The index of an LTTng data stream, a sequence of index entries.
661 class _LttngDataStreamIndex(collections
.abc
.Sequence
):
662 def __init__(self
, path
, beacons
):
667 stream_class_id
= self
._entries
[0].stream_class_id
669 _LttngDataStreamBeaconEntry(stream_class_id
, ts
) for ts
in beacons
671 self
._add
_beacons
(beacons
)
674 'Built data stream index entries: path="{}", count={}'.format(
675 path
, len(self
._entries
)
681 assert os
.path
.isfile(self
._path
)
683 with
open(self
._path
, 'rb') as f
:
686 size
= struct
.calcsize(fmt
)
688 assert len(data
) == size
689 magic
, index_major
, index_minor
, index_entry_length
= struct
.unpack(
692 assert magic
== 0xC1F1DCC1
696 size
= struct
.calcsize(fmt
)
700 'Decoding data stream index entry: path="{}", offset={}'.format(
710 assert len(data
) == size
719 ) = struct
.unpack(fmt
, data
)
721 self
._entries
.append(
722 _LttngDataStreamIndexEntry(
733 # Skip anything else before the next entry
734 f
.seek(index_entry_length
- size
, os
.SEEK_CUR
)
736 def _add_beacons(self
, beacons
):
737 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
739 if type(entry
) is _LttngDataStreamBeaconEntry
:
740 return entry
.timestamp
742 return entry
.timestamp_end
744 self
._entries
+= beacons
745 self
._entries
.sort(key
=sort_key
)
747 def __getitem__(self
, index
):
748 return self
._entries
[index
]
751 return len(self
._entries
)
758 # An LTTng data stream.
759 class _LttngDataStream
:
760 def __init__(self
, path
, beacons
):
762 filename
= os
.path
.basename(path
)
763 match
= re
.match(r
'(.*)_\d+', filename
)
764 self
._channel
_name
= match
.group(1)
765 trace_dir
= os
.path
.dirname(path
)
766 index_path
= os
.path
.join(trace_dir
, 'index', filename
+ '.idx')
767 self
._index
= _LttngDataStreamIndex(index_path
, beacons
)
768 assert os
.path
.isfile(path
)
769 self
._file
= open(path
, 'rb')
771 'Built data stream: path="{}", channel-name="{}"'.format(
772 path
, self
._channel
_name
781 def channel_name(self
):
782 return self
._channel
_name
788 def get_data(self
, offset_bytes
, len_bytes
):
789 self
._file
.seek(offset_bytes
)
790 return self
._file
.read(len_bytes
)
793 class _LttngMetadataStreamSection
:
794 def __init__(self
, timestamp
, data
):
795 self
._timestamp
= timestamp
801 'Built metadata stream section: ts={}, data-len={}'.format(
802 self
._timestamp
, len(self
._data
)
808 return self
._timestamp
815 # An LTTng metadata stream.
816 class _LttngMetadataStream
:
817 def __init__(self
, metadata_file_path
, config_sections
):
818 self
._path
= metadata_file_path
819 self
._sections
= config_sections
821 'Built metadata stream: path={}, section-len={}'.format(
822 self
._path
, len(self
._sections
)
832 return self
._sections
835 LttngMetadataConfigSection
= namedtuple(
836 'LttngMetadataConfigSection', ['line', 'timestamp', 'is_empty']
840 def _parse_metadata_sections_config(config_sections
):
841 assert config_sections
is not None
842 config_metadata_sections
= []
843 append_empty_section
= False
847 for config_section
in config_sections
:
848 if config_section
== 'empty':
849 # Found a empty section marker. Actually append the section at the
850 # timestamp of the next concrete section.
851 append_empty_section
= True
853 assert type(config_section
) is dict
854 line
= config_section
.get('line')
855 ts
= config_section
.get('timestamp')
857 # Sections' timestamps and lines must both be increasing.
858 assert ts
> last_timestamp
860 assert line
> last_line
863 if append_empty_section
:
864 config_metadata_sections
.append(
865 LttngMetadataConfigSection(line
, ts
, True)
867 append_empty_section
= False
869 config_metadata_sections
.append(LttngMetadataConfigSection(line
, ts
, False))
871 return config_metadata_sections
874 def _split_metadata_sections(metadata_file_path
, raw_config_sections
):
875 assert isinstance(raw_config_sections
, collections
.abc
.Sequence
)
877 parsed_sections
= _parse_metadata_sections_config(raw_config_sections
)
880 with
open(metadata_file_path
, 'r') as metadata_file
:
881 metadata_lines
= [line
for line
in metadata_file
]
883 config_metadata_sections_idx
= 0
884 curr_metadata_section
= bytearray()
886 for idx
, line_content
in enumerate(metadata_lines
):
887 # Add one to the index to convert from the zero-indexing of the
888 # enumerate() function to the one-indexing used by humans when
889 # viewing a text file.
890 curr_line_number
= idx
+ 1
892 # If there are no more sections, simply append the line.
893 if config_metadata_sections_idx
+ 1 >= len(parsed_sections
):
894 curr_metadata_section
+= bytearray(line_content
, 'utf8')
897 next_section_line_number
= parsed_sections
[
898 config_metadata_sections_idx
+ 1
901 # If the next section begins at the current line, create a
902 # section with the metadata we gathered so far.
903 if curr_line_number
>= next_section_line_number
:
905 # Flushing the metadata of the current section.
907 _LttngMetadataStreamSection(
908 parsed_sections
[config_metadata_sections_idx
].timestamp
,
909 bytes(curr_metadata_section
),
913 # Move to the next section.
914 config_metadata_sections_idx
+= 1
916 # Clear old content and append current line for the next section.
917 curr_metadata_section
.clear()
918 curr_metadata_section
+= bytearray(line_content
, 'utf8')
920 # Append any empty sections.
921 while parsed_sections
[config_metadata_sections_idx
].is_empty
:
923 _LttngMetadataStreamSection(
924 parsed_sections
[config_metadata_sections_idx
].timestamp
, None
927 config_metadata_sections_idx
+= 1
929 # Append line_content to the current metadata section.
930 curr_metadata_section
+= bytearray(line_content
, 'utf8')
932 # We iterated over all the lines of the metadata file. Close the current section.
934 _LttngMetadataStreamSection(
935 parsed_sections
[config_metadata_sections_idx
].timestamp
,
936 bytes(curr_metadata_section
),
943 # An LTTng trace, a sequence of LTTng data streams.
944 class LttngTrace(collections
.abc
.Sequence
):
945 def __init__(self
, trace_dir
, metadata_sections
, beacons
):
946 assert os
.path
.isdir(trace_dir
)
947 self
._path
= trace_dir
948 self
._create
_metadata
_stream
(trace_dir
, metadata_sections
)
949 self
._create
_data
_streams
(trace_dir
, beacons
)
950 logging
.info('Built trace: path="{}"'.format(trace_dir
))
952 def _create_data_streams(self
, trace_dir
, beacons
):
953 data_stream_paths
= []
955 for filename
in os
.listdir(trace_dir
):
956 path
= os
.path
.join(trace_dir
, filename
)
958 if not os
.path
.isfile(path
):
961 if filename
.startswith('.'):
964 if filename
== 'metadata':
967 data_stream_paths
.append(path
)
969 data_stream_paths
.sort()
970 self
._data
_streams
= []
972 for data_stream_path
in data_stream_paths
:
973 stream_name
= os
.path
.basename(data_stream_path
)
974 this_stream_beacons
= None
976 if beacons
is not None and stream_name
in beacons
:
977 this_stream_beacons
= beacons
[stream_name
]
979 self
._data
_streams
.append(
980 _LttngDataStream(data_stream_path
, this_stream_beacons
)
983 def _create_metadata_stream(self
, trace_dir
, config_metadata_sections
):
984 metadata_path
= os
.path
.join(trace_dir
, 'metadata')
985 metadata_sections
= []
987 if config_metadata_sections
is None:
988 with
open(metadata_path
, 'rb') as metadata_file
:
989 metadata_sections
.append(
990 _LttngMetadataStreamSection(0, metadata_file
.read())
993 metadata_sections
= _split_metadata_sections(
994 metadata_path
, config_metadata_sections
997 self
._metadata
_stream
= _LttngMetadataStream(metadata_path
, metadata_sections
)
1004 def metadata_stream(self
):
1005 return self
._metadata
_stream
1007 def __getitem__(self
, index
):
1008 return self
._data
_streams
[index
]
1011 return len(self
._data
_streams
)
1014 # The state of a single data stream.
1015 class _LttngLiveViewerSessionDataStreamState
:
1016 def __init__(self
, ts_state
, info
, data_stream
, metadata_stream_id
):
1017 self
._ts
_state
= ts_state
1019 self
._data
_stream
= data_stream
1020 self
._metadata
_stream
_id
= metadata_stream_id
1021 self
._cur
_index
_entry
_index
= 0
1022 fmt
= 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1026 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
1027 ts_state
.tracing_session_descriptor
.info
.name
,
1033 def tracing_session_state(self
):
1034 return self
._ts
_state
1041 def data_stream(self
):
1042 return self
._data
_stream
1045 def cur_index_entry(self
):
1046 if self
._cur
_index
_entry
_index
== len(self
._data
_stream
.index
):
1049 return self
._data
_stream
.index
[self
._cur
_index
_entry
_index
]
1051 def goto_next_index_entry(self
):
1052 self
._cur
_index
_entry
_index
+= 1
1055 # The state of a single metadata stream.
1056 class _LttngLiveViewerSessionMetadataStreamState
:
1057 def __init__(self
, ts_state
, info
, metadata_stream
):
1058 self
._ts
_state
= ts_state
1060 self
._metadata
_stream
= metadata_stream
1061 self
._cur
_metadata
_stream
_section
_index
= 0
1062 if len(metadata_stream
.sections
) > 1:
1063 self
._next
_metadata
_stream
_section
_timestamp
= metadata_stream
.sections
[
1067 self
._next
_metadata
_stream
_section
_timestamp
= None
1069 self
._is
_sent
= False
1070 fmt
= 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1074 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
1075 ts_state
.tracing_session_descriptor
.info
.name
,
1076 metadata_stream
.path
,
1081 def trace_session_state(self
):
1082 return self
._trace
_session
_state
1089 def metadata_stream(self
):
1090 return self
._metadata
_stream
1094 return self
._is
_sent
1097 def is_sent(self
, value
):
1098 self
._is
_sent
= value
1101 def cur_section(self
):
1102 fmt
= "Get current metadata section: section-idx={}"
1103 logging
.info(fmt
.format(self
._cur
_metadata
_stream
_section
_index
))
1104 if self
._cur
_metadata
_stream
_section
_index
== len(
1105 self
._metadata
_stream
.sections
1109 return self
._metadata
_stream
.sections
[self
._cur
_metadata
_stream
_section
_index
]
1111 def goto_next_section(self
):
1112 self
._cur
_metadata
_stream
_section
_index
+= 1
1113 if self
.cur_section
:
1114 self
._next
_metadata
_stream
_section
_timestamp
= self
.cur_section
.timestamp
1116 self
._next
_metadata
_stream
_section
_timestamp
= None
1119 def next_section_timestamp(self
):
1120 return self
._next
_metadata
_stream
_section
_timestamp
1123 # The state of a tracing session.
1124 class _LttngLiveViewerSessionTracingSessionState
:
1125 def __init__(self
, tc_descr
, base_stream_id
):
1126 self
._tc
_descr
= tc_descr
1127 self
._stream
_infos
= []
1128 self
._ds
_states
= {}
1129 self
._ms
_states
= {}
1130 stream_id
= base_stream_id
1132 for trace
in tc_descr
.traces
:
1133 trace_id
= stream_id
* 1000
1135 # Metadata stream -> stream info and metadata stream state
1136 info
= _LttngLiveViewerStreamInfo(
1137 stream_id
, trace_id
, True, trace
.metadata_stream
.path
, 'metadata'
1139 self
._stream
_infos
.append(info
)
1140 self
._ms
_states
[stream_id
] = _LttngLiveViewerSessionMetadataStreamState(
1141 self
, info
, trace
.metadata_stream
1143 metadata_stream_id
= stream_id
1146 # Data streams -> stream infos and data stream states
1147 for data_stream
in trace
:
1148 info
= _LttngLiveViewerStreamInfo(
1153 data_stream
.channel_name
,
1155 self
._stream
_infos
.append(info
)
1156 self
._ds
_states
[stream_id
] = _LttngLiveViewerSessionDataStreamState(
1157 self
, info
, data_stream
, metadata_stream_id
1161 self
._is
_attached
= False
1162 fmt
= 'Built tracing session state: id={}, name="{}"'
1163 logging
.info(fmt
.format(tc_descr
.info
.tracing_session_id
, tc_descr
.info
.name
))
1166 def tracing_session_descriptor(self
):
1167 return self
._tc
_descr
1170 def data_stream_states(self
):
1171 return self
._ds
_states
1174 def metadata_stream_states(self
):
1175 return self
._ms
_states
1178 def stream_infos(self
):
1179 return self
._stream
_infos
1182 def has_new_metadata(self
):
1183 return any([not ms
.is_sent
for ms
in self
._ms
_states
.values()])
1186 def is_attached(self
):
1187 return self
._is
_attached
1190 def is_attached(self
, value
):
1191 self
._is
_attached
= value
1194 def needs_new_metadata_section(metadata_stream_state
, latest_timestamp
):
1195 if metadata_stream_state
.next_section_timestamp
is None:
1198 if latest_timestamp
>= metadata_stream_state
.next_section_timestamp
:
1204 # An LTTng live viewer session manages a view on tracing sessions
1205 # and replies to commands accordingly.
1206 class _LttngLiveViewerSession
:
1210 tracing_session_descriptors
,
1211 max_query_data_response_size
,
1213 self
._viewer
_session
_id
= viewer_session_id
1214 self
._ts
_states
= {}
1215 self
._stream
_states
= {}
1216 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1217 total_stream_infos
= 0
1219 for ts_descr
in tracing_session_descriptors
:
1220 ts_state
= _LttngLiveViewerSessionTracingSessionState(
1221 ts_descr
, total_stream_infos
1223 ts_id
= ts_state
.tracing_session_descriptor
.info
.tracing_session_id
1224 self
._ts
_states
[ts_id
] = ts_state
1225 total_stream_infos
+= len(ts_state
.stream_infos
)
1227 # Update session's stream states to have the new states
1228 self
._stream
_states
.update(ts_state
.data_stream_states
)
1229 self
._stream
_states
.update(ts_state
.metadata_stream_states
)
1231 self
._command
_handlers
= {
1232 _LttngLiveViewerAttachToTracingSessionCommand
: self
._handle
_attach
_to
_tracing
_session
_command
,
1233 _LttngLiveViewerCreateViewerSessionCommand
: self
._handle
_create
_viewer
_session
_command
,
1234 _LttngLiveViewerDetachFromTracingSessionCommand
: self
._handle
_detach
_from
_tracing
_session
_command
,
1235 _LttngLiveViewerGetDataStreamPacketDataCommand
: self
._handle
_get
_data
_stream
_packet
_data
_command
,
1236 _LttngLiveViewerGetMetadataStreamDataCommand
: self
._handle
_get
_metadata
_stream
_data
_command
,
1237 _LttngLiveViewerGetNewStreamInfosCommand
: self
._handle
_get
_new
_stream
_infos
_command
,
1238 _LttngLiveViewerGetNextDataStreamIndexEntryCommand
: self
._handle
_get
_next
_data
_stream
_index
_entry
_command
,
1239 _LttngLiveViewerGetTracingSessionInfosCommand
: self
._handle
_get
_tracing
_session
_infos
_command
,
1243 def viewer_session_id(self
):
1244 return self
._viewer
_session
_id
1246 def _get_tracing_session_state(self
, tracing_session_id
):
1247 if tracing_session_id
not in self
._ts
_states
:
1248 raise UnexpectedInput(
1249 'Unknown tracing session ID {}'.format(tracing_session_id
)
1252 return self
._ts
_states
[tracing_session_id
]
1254 def _get_stream_state(self
, stream_id
):
1255 if stream_id
not in self
._stream
_states
:
1256 UnexpectedInput('Unknown stream ID {}'.format(stream_id
))
1258 return self
._stream
_states
[stream_id
]
1260 def handle_command(self
, cmd
):
1262 'Handling command in viewer session: cmd-cls-name={}'.format(
1263 cmd
.__class
__.__name
__
1266 cmd_type
= type(cmd
)
1268 if cmd_type
not in self
._command
_handlers
:
1269 raise UnexpectedInput(
1270 'Unexpected command: cmd-cls-name={}'.format(cmd
.__class
__.__name
__)
1273 return self
._command
_handlers
[cmd_type
](cmd
)
1275 def _handle_attach_to_tracing_session_command(self
, cmd
):
1276 fmt
= 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1277 logging
.info(fmt
.format(cmd
.tracing_session_id
, cmd
.offset
, cmd
.seek_type
))
1278 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1279 info
= ts_state
.tracing_session_descriptor
.info
1281 if ts_state
.is_attached
:
1282 raise UnexpectedInput(
1283 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1288 ts_state
.is_attached
= True
1289 status
= _LttngLiveViewerAttachToTracingSessionReply
.Status
.OK
1290 return _LttngLiveViewerAttachToTracingSessionReply(
1291 status
, ts_state
.stream_infos
1294 def _handle_detach_from_tracing_session_command(self
, cmd
):
1295 fmt
= 'Handling "detach from tracing session" command: ts-id={}'
1296 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1297 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1298 info
= ts_state
.tracing_session_descriptor
.info
1300 if not ts_state
.is_attached
:
1301 raise UnexpectedInput(
1302 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1307 ts_state
.is_attached
= False
1308 status
= _LttngLiveViewerDetachFromTracingSessionReply
.Status
.OK
1309 return _LttngLiveViewerDetachFromTracingSessionReply(status
)
1311 def _handle_get_next_data_stream_index_entry_command(self
, cmd
):
1312 fmt
= 'Handling "get next data stream index entry" command: stream-id={}'
1313 logging
.info(fmt
.format(cmd
.stream_id
))
1314 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1315 metadata_stream_state
= self
._get
_stream
_state
(stream_state
._metadata
_stream
_id
)
1317 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1318 raise UnexpectedInput(
1319 'Stream with ID {} is not a data stream'.format(cmd
.stream_id
)
1322 if stream_state
.cur_index_entry
is None:
1323 # The viewer is done reading this stream
1324 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.HUP
1326 # Dummy data stream index entry to use with the `HUP` status
1327 # (the reply needs one, but the viewer ignores it)
1328 index_entry
= _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1330 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1331 status
, index_entry
, False, False
1334 timestamp_begin
= _get_entry_timestamp_begin(stream_state
.cur_index_entry
)
1336 if needs_new_metadata_section(metadata_stream_state
, timestamp_begin
):
1337 metadata_stream_state
.is_sent
= False
1338 metadata_stream_state
.goto_next_section()
1340 # The viewer only checks the `has_new_metadata` flag if the
1341 # reply's status is `OK`, so we need to provide an index here
1342 has_new_metadata
= stream_state
.tracing_session_state
.has_new_metadata
1343 if type(stream_state
.cur_index_entry
) is _LttngDataStreamIndexEntry
:
1344 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.OK
1346 assert type(stream_state
.cur_index_entry
) is _LttngDataStreamBeaconEntry
1347 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.INACTIVE
1349 reply
= _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1350 status
, stream_state
.cur_index_entry
, has_new_metadata
, False
1352 stream_state
.goto_next_index_entry()
1355 def _handle_get_data_stream_packet_data_command(self
, cmd
):
1356 fmt
= 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1357 logging
.info(fmt
.format(cmd
.stream_id
, cmd
.offset
, cmd
.req_length
))
1358 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1359 data_response_length
= cmd
.req_length
1361 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1362 raise UnexpectedInput(
1363 'Stream with ID {} is not a data stream'.format(cmd
.stream_id
)
1366 if stream_state
.tracing_session_state
.has_new_metadata
:
1367 status
= _LttngLiveViewerGetDataStreamPacketDataReply
.Status
.ERROR
1368 return _LttngLiveViewerGetDataStreamPacketDataReply(
1369 status
, bytes(), True, False
1372 if self
._max
_query
_data
_response
_size
:
1373 # Enforce a server side limit on the query requested length.
1374 # To ensure that the transaction terminate take the minimum of both
1376 data_response_length
= min(
1377 cmd
.req_length
, self
._max
_query
_data
_response
_size
1379 fmt
= 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1380 logging
.info(fmt
.format(cmd
.req_length
, data_response_length
))
1382 data
= stream_state
.data_stream
.get_data(cmd
.offset
, data_response_length
)
1383 status
= _LttngLiveViewerGetDataStreamPacketDataReply
.Status
.OK
1384 return _LttngLiveViewerGetDataStreamPacketDataReply(status
, data
, False, False)
1386 def _handle_get_metadata_stream_data_command(self
, cmd
):
1387 fmt
= 'Handling "get metadata stream data" command: stream-id={}'
1388 logging
.info(fmt
.format(cmd
.stream_id
))
1389 metadata_stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1392 type(metadata_stream_state
)
1393 is not _LttngLiveViewerSessionMetadataStreamState
1395 raise UnexpectedInput(
1396 'Stream with ID {} is not a metadata stream'.format(cmd
.stream_id
)
1399 if metadata_stream_state
.is_sent
:
1400 status
= _LttngLiveViewerGetMetadataStreamDataContentReply
.Status
.NO_NEW
1401 return _LttngLiveViewerGetMetadataStreamDataContentReply(status
, bytes())
1403 metadata_stream_state
.is_sent
= True
1404 status
= _LttngLiveViewerGetMetadataStreamDataContentReply
.Status
.OK
1405 metadata_section
= metadata_stream_state
.cur_section
1407 # If we are sending an empty section, ready the next one right away.
1408 if len(metadata_section
.data
) == 0:
1409 metadata_stream_state
.is_sent
= False
1410 metadata_stream_state
.goto_next_section()
1412 fmt
= 'Replying to "get metadata stream data" command: metadata-size={}'
1413 logging
.info(fmt
.format(len(metadata_section
.data
)))
1414 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1415 status
, metadata_section
.data
1418 def _handle_get_new_stream_infos_command(self
, cmd
):
1419 fmt
= 'Handling "get new stream infos" command: ts-id={}'
1420 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1422 # As of this version, all the tracing session's stream infos are
1423 # always given to the viewer when sending the "attach to tracing
1424 # session" reply, so there's nothing new here. Return the `HUP`
1425 # status as, if we're handling this command, the viewer consumed
1426 # all the existing data streams.
1427 status
= _LttngLiveViewerGetNewStreamInfosReply
.Status
.HUP
1428 return _LttngLiveViewerGetNewStreamInfosReply(status
, [])
1430 def _handle_get_tracing_session_infos_command(self
, cmd
):
1431 logging
.info('Handling "get tracing session infos" command.')
1433 tss
.tracing_session_descriptor
.info
for tss
in self
._ts
_states
.values()
1435 infos
.sort(key
=lambda info
: info
.name
)
1436 return _LttngLiveViewerGetTracingSessionInfosReply(infos
)
1438 def _handle_create_viewer_session_command(self
, cmd
):
1439 logging
.info('Handling "create viewer session" command.')
1440 status
= _LttngLiveViewerCreateViewerSessionReply
.Status
.OK
1442 # This does nothing here. In the LTTng relay daemon, it
1443 # allocates the viewer session's state.
1444 return _LttngLiveViewerCreateViewerSessionReply(status
)
1447 # An LTTng live TCP server.
1449 # On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1450 # the decimal TCP port number to a temporary port file. It renames the
1451 # temporary port file to `port_filename`.
1453 # `tracing_session_descriptors` is a list of tracing session descriptors
1454 # (`LttngTracingSessionDescriptor`) to serve.
1456 # This server accepts a single viewer (client).
1458 # When the viewer closes the connection, the server's constructor
1460 class LttngLiveServer
:
1462 self
, port_filename
, tracing_session_descriptors
, max_query_data_response_size
1464 logging
.info('Server configuration:')
1466 logging
.info(' Port file name: `{}`'.format(port_filename
))
1468 if max_query_data_response_size
is not None:
1470 ' Maximum response data query size: `{}`'.format(
1471 max_query_data_response_size
1475 for ts_descr
in tracing_session_descriptors
:
1476 info
= ts_descr
.info
1477 fmt
= ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1481 info
.tracing_session_id
,
1483 info
.live_timer_freq
,
1489 for trace
in ts_descr
.traces
:
1490 logging
.info(' Trace: path="{}"'.format(trace
.path
))
1492 self
._ts
_descriptors
= tracing_session_descriptors
1493 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1494 self
._sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
1495 self
._codec
= _LttngLiveViewerProtocolCodec()
1497 # Port 0: OS assigns an unused port
1498 serv_addr
= ('localhost', 0)
1499 self
._sock
.bind(serv_addr
)
1500 self
._write
_port
_to
_file
(port_filename
)
1506 logging
.info('Closed connection and socket.')
1509 def _server_port(self
):
1510 return self
._sock
.getsockname()[1]
1512 def _recv_command(self
):
1516 logging
.info('Waiting for viewer command.')
1517 buf
= self
._conn
.recv(128)
1520 logging
.info('Client closed connection.')
1523 raise UnexpectedInput(
1524 'Client closed connection after having sent {} command bytes.'.format(
1531 logging
.info('Received data from viewer: length={}'.format(len(buf
)))
1536 cmd
= self
._codec
.decode(data
)
1537 except struct
.error
as exc
:
1538 raise UnexpectedInput('Malformed command: {}'.format(exc
)) from exc
1542 'Received command from viewer: cmd-cls-name={}'.format(
1543 cmd
.__class
__.__name
__
1548 def _send_reply(self
, reply
):
1549 data
= self
._codec
.encode(reply
)
1551 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
1552 reply
.__class
__.__name
__, len(data
)
1555 self
._conn
.sendall(data
)
1557 def _handle_connection(self
):
1558 # First command must be "connect"
1559 cmd
= self
._recv
_command
()
1561 if type(cmd
) is not _LttngLiveViewerConnectCommand
:
1562 raise UnexpectedInput(
1563 'First command is not "connect": cmd-cls-name={}'.format(
1564 cmd
.__class
__.__name
__
1568 # Create viewer session (arbitrary ID 23)
1570 'LTTng live viewer connected: version={}.{}'.format(cmd
.major
, cmd
.minor
)
1572 viewer_session
= _LttngLiveViewerSession(
1573 23, self
._ts
_descriptors
, self
._max
_query
_data
_response
_size
1576 # Send "connect" reply
1578 _LttngLiveViewerConnectReply(viewer_session
.viewer_session_id
, 2, 10)
1581 # Make the viewer session handle the remaining commands
1583 cmd
= self
._recv
_command
()
1586 # Connection closed (at an expected location within the
1590 self
._send
_reply
(viewer_session
.handle_command(cmd
))
1593 logging
.info('Listening: port={}'.format(self
._server
_port
))
1594 # Backlog must be present for Python version < 3.5.
1595 # 128 is an arbitrary number since we expect only 1 connection anyway.
1596 self
._sock
.listen(128)
1597 self
._conn
, viewer_addr
= self
._sock
.accept()
1599 'Accepted viewer: addr={}:{}'.format(viewer_addr
[0], viewer_addr
[1])
1603 self
._handle
_connection
()
1607 def _write_port_to_file(self
, port_filename
):
1608 # Write the port number to a temporary file.
1609 with tempfile
.NamedTemporaryFile(mode
='w', delete
=False) as tmp_port_file
:
1610 print(self
._server
_port
, end
='', file=tmp_port_file
)
1612 # Rename temporary file to real file
1613 os
.replace(tmp_port_file
.name
, port_filename
)
1615 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1616 tmp_port_file
.name
, port_filename
1621 # A tracing session descriptor.
1623 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1625 class LttngTracingSessionDescriptor
:
1627 self
, name
, tracing_session_id
, hostname
, live_timer_freq
, client_count
, traces
1629 for trace
in traces
:
1630 if name
not in trace
.path
:
1631 fmt
= 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
1632 raise ValueError(fmt
.format(name
, trace
.path
))
1634 self
._traces
= traces
1635 stream_count
= sum([len(t
) + 1 for t
in traces
])
1636 self
._info
= _LttngLiveViewerTracingSessionInfo(
1654 def _session_descriptors_from_path(sessions_filename
, trace_path_prefix
):
1659 # "name": "my-session",
1661 # "hostname": "myhost",
1662 # "live-timer-freq": 1000000,
1663 # "client-count": 23,
1669 # "path": "meow/mix",
1671 # "my_stream": [ 5235787, 728375283 ]
1673 # "metadata-sections": [
1683 with
open(sessions_filename
, 'r') as sessions_file
:
1684 params
= json
.load(sessions_file
)
1688 for session
in params
:
1689 name
= session
['name']
1690 tracing_session_id
= session
['id']
1691 hostname
= session
['hostname']
1692 live_timer_freq
= session
['live-timer-freq']
1693 client_count
= session
['client-count']
1696 for trace
in session
['traces']:
1697 metadata_sections
= trace
.get('metadata-sections')
1698 beacons
= trace
.get('beacons')
1699 path
= trace
['path']
1701 if not os
.path
.isabs(path
):
1702 path
= os
.path
.join(trace_path_prefix
, path
)
1704 traces
.append(LttngTrace(path
, metadata_sections
, beacons
))
1707 LttngTracingSessionDescriptor(
1720 def _loglevel_parser(string
):
1721 loglevels
= {'info': logging
.INFO
, 'warning': logging
.WARNING
}
1722 if string
not in loglevels
:
1723 msg
= "{} is not a valid loglevel".format(string
)
1724 raise argparse
.ArgumentTypeError(msg
)
1725 return loglevels
[string
]
1728 if __name__
== '__main__':
1729 logging
.basicConfig(format
='# %(asctime)-25s%(message)s')
1730 parser
= argparse
.ArgumentParser(
1731 description
='LTTng-live protocol mocker', add_help
=False
1733 parser
.add_argument(
1736 choices
=['info', 'warning'],
1737 help='The loglevel to be used.',
1740 loglevel_namespace
, remaining_args
= parser
.parse_known_args()
1741 logging
.getLogger().setLevel(_loglevel_parser(loglevel_namespace
.log_level
))
1743 parser
.add_argument(
1745 help='The final port file. This file is present when the server is ready to receive connection.',
1748 parser
.add_argument(
1749 '--max-query-data-response-size',
1751 help='The maximum size of control data response in bytes',
1753 parser
.add_argument(
1754 '--trace-path-prefix',
1756 help='Prefix to prepend to the trace paths of session configurations',
1758 parser
.add_argument(
1759 '--sessions-filename',
1761 help='Path to a session configuration file',
1763 parser
.add_argument(
1767 default
=argparse
.SUPPRESS
,
1768 help='Show this help message and exit.',
1771 args
= parser
.parse_args(args
=remaining_args
)
1773 sessions
= _session_descriptors_from_path(
1774 args
.sessions_filename
,
1775 args
.trace_path_prefix
,
1777 LttngLiveServer(args
.port_filename
, sessions
, args
.max_query_data_response_size
)
1778 except UnexpectedInput
as exc
:
1779 logging
.error(str(exc
))
1780 print(exc
, file=sys
.stderr
)