1 # SPDX-License-Identifier: MIT
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
19 class UnexpectedInput(RuntimeError):
23 class _LttngLiveViewerCommand
:
24 def __init__(self
, version
):
25 self
._version
= version
32 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand
):
33 def __init__(self
, version
, viewer_session_id
, major
, minor
):
34 super().__init
__(version
)
35 self
._viewer
_session
_id
= viewer_session_id
40 def viewer_session_id(self
):
41 return self
._viewer
_session
_id
52 class _LttngLiveViewerConnectReply
:
53 def __init__(self
, viewer_session_id
, major
, minor
):
54 self
._viewer
_session
_id
= viewer_session_id
59 def viewer_session_id(self
):
60 return self
._viewer
_session
_id
71 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand
):
75 class _LttngLiveViewerTracingSessionInfo
:
85 self
._tracing
_session
_id
= tracing_session_id
86 self
._live
_timer
_freq
= live_timer_freq
87 self
._client
_count
= client_count
88 self
._stream
_count
= stream_count
89 self
._hostname
= hostname
93 def tracing_session_id(self
):
94 return self
._tracing
_session
_id
97 def live_timer_freq(self
):
98 return self
._live
_timer
_freq
101 def client_count(self
):
102 return self
._client
_count
105 def stream_count(self
):
106 return self
._stream
_count
110 return self
._hostname
117 class _LttngLiveViewerGetTracingSessionInfosReply
:
118 def __init__(self
, tracing_session_infos
):
119 self
._tracing
_session
_infos
= tracing_session_infos
122 def tracing_session_infos(self
):
123 return self
._tracing
_session
_infos
126 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand
):
131 def __init__(self
, version
, tracing_session_id
, offset
, seek_type
):
132 super().__init
__(version
)
133 self
._tracing
_session
_id
= tracing_session_id
134 self
._offset
= offset
135 self
._seek
_type
= seek_type
138 def tracing_session_id(self
):
139 return self
._tracing
_session
_id
147 return self
._seek
_type
150 class _LttngLiveViewerStreamInfo
:
151 def __init__(self
, id, trace_id
, is_metadata
, path
, channel_name
):
153 self
._trace
_id
= trace_id
154 self
._is
_metadata
= is_metadata
156 self
._channel
_name
= channel_name
164 return self
._trace
_id
167 def is_metadata(self
):
168 return self
._is
_metadata
175 def channel_name(self
):
176 return self
._channel
_name
179 class _LttngLiveViewerAttachToTracingSessionReply
:
188 def __init__(self
, status
, stream_infos
):
189 self
._status
= status
190 self
._stream
_infos
= stream_infos
197 def stream_infos(self
):
198 return self
._stream
_infos
201 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand
):
202 def __init__(self
, version
, stream_id
):
203 super().__init
__(version
)
204 self
._stream
_id
= stream_id
208 return self
._stream
_id
211 class _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
220 def __init__(self
, status
, index_entry
, has_new_metadata
, has_new_data_stream
):
221 self
._status
= status
222 self
._index
_entry
= index_entry
223 self
._has
_new
_metadata
= has_new_metadata
224 self
._has
_new
_data
_stream
= has_new_data_stream
231 def index_entry(self
):
232 return self
._index
_entry
235 def has_new_metadata(self
):
236 return self
._has
_new
_metadata
239 def has_new_data_stream(self
):
240 return self
._has
_new
_data
_stream
243 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand
):
244 def __init__(self
, version
, stream_id
, offset
, req_length
):
245 super().__init
__(version
)
246 self
._stream
_id
= stream_id
247 self
._offset
= offset
248 self
._req
_length
= req_length
252 return self
._stream
_id
259 def req_length(self
):
260 return self
._req
_length
263 class _LttngLiveViewerGetDataStreamPacketDataReply
:
270 def __init__(self
, status
, data
, has_new_metadata
, has_new_data_stream
):
271 self
._status
= status
273 self
._has
_new
_metadata
= has_new_metadata
274 self
._has
_new
_data
_stream
= has_new_data_stream
285 def has_new_metadata(self
):
286 return self
._has
_new
_metadata
289 def has_new_data_stream(self
):
290 return self
._has
_new
_data
_stream
293 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand
):
294 def __init__(self
, version
, stream_id
):
295 super().__init
__(version
)
296 self
._stream
_id
= stream_id
300 return self
._stream
_id
303 class _LttngLiveViewerGetMetadataStreamDataContentReply
:
309 def __init__(self
, status
, data
):
310 self
._status
= status
322 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand
):
323 def __init__(self
, version
, tracing_session_id
):
324 super().__init
__(version
)
325 self
._tracing
_session
_id
= tracing_session_id
328 def tracing_session_id(self
):
329 return self
._tracing
_session
_id
332 class _LttngLiveViewerGetNewStreamInfosReply
:
339 def __init__(self
, status
, stream_infos
):
340 self
._status
= status
341 self
._stream
_infos
= stream_infos
348 def stream_infos(self
):
349 return self
._stream
_infos
352 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand
):
356 class _LttngLiveViewerCreateViewerSessionReply
:
361 def __init__(self
, status
):
362 self
._status
= status
369 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand
):
370 def __init__(self
, version
, tracing_session_id
):
371 super().__init
__(version
)
372 self
._tracing
_session
_id
= tracing_session_id
375 def tracing_session_id(self
):
376 return self
._tracing
_session
_id
379 class _LttngLiveViewerDetachFromTracingSessionReply
:
385 def __init__(self
, status
):
386 self
._status
= status
393 # An LTTng live protocol codec can convert bytes to command objects and
394 # reply objects to bytes.
395 class _LttngLiveViewerProtocolCodec
:
396 _COMMAND_HEADER_STRUCT_FMT
= 'QII'
397 _COMMAND_HEADER_SIZE_BYTES
= struct
.calcsize(_COMMAND_HEADER_STRUCT_FMT
)
402 def _unpack(self
, fmt
, data
, offset
=0):
404 return struct
.unpack_from(fmt
, data
, offset
)
406 def _unpack_payload(self
, fmt
, data
):
408 fmt
, data
, _LttngLiveViewerProtocolCodec
._COMMAND
_HEADER
_SIZE
_BYTES
411 def decode(self
, data
):
412 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
:
413 # Not enough data to read the command header
416 payload_size
, cmd_type
, version
= self
._unpack
(
417 self
._COMMAND
_HEADER
_STRUCT
_FMT
, data
420 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
421 payload_size
, cmd_type
, version
425 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
+ payload_size
:
426 # Not enough data to read the whole command
430 viewer_session_id
, major
, minor
, conn_type
= self
._unpack
_payload
(
433 return _LttngLiveViewerConnectCommand(
434 version
, viewer_session_id
, major
, minor
437 return _LttngLiveViewerGetTracingSessionInfosCommand(version
)
439 tracing_session_id
, offset
, seek_type
= self
._unpack
_payload
('QQI', data
)
440 return _LttngLiveViewerAttachToTracingSessionCommand(
441 version
, tracing_session_id
, offset
, seek_type
444 (stream_id
,) = self
._unpack
_payload
('Q', data
)
445 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
449 stream_id
, offset
, req_length
= self
._unpack
_payload
('QQI', data
)
450 return _LttngLiveViewerGetDataStreamPacketDataCommand(
451 version
, stream_id
, offset
, req_length
454 (stream_id
,) = self
._unpack
_payload
('Q', data
)
455 return _LttngLiveViewerGetMetadataStreamDataCommand(version
, stream_id
)
457 (tracing_session_id
,) = self
._unpack
_payload
('Q', data
)
458 return _LttngLiveViewerGetNewStreamInfosCommand(version
, tracing_session_id
)
460 return _LttngLiveViewerCreateViewerSessionCommand(version
)
462 (tracing_session_id
,) = self
._unpack
_payload
('Q', data
)
463 return _LttngLiveViewerDetachFromTracingSessionCommand(
464 version
, tracing_session_id
467 raise UnexpectedInput('Unknown command type {}'.format(cmd_type
))
469 def _pack(self
, fmt
, *args
):
470 # Force network byte order
471 return struct
.pack('!' + fmt
, *args
)
473 def _encode_zero_padded_str(self
, string
, length
):
474 data
= string
.encode()
475 return data
.ljust(length
, b
'\x00')
477 def _encode_stream_info(self
, info
):
478 data
= self
._pack
('QQI', info
.id, info
.trace_id
, int(info
.is_metadata
))
479 data
+= self
._encode
_zero
_padded
_str
(info
.path
, 4096)
480 data
+= self
._encode
_zero
_padded
_str
(info
.channel_name
, 255)
483 def _get_has_new_stuff_flags(self
, has_new_metadata
, has_new_data_streams
):
489 if has_new_data_streams
:
494 def encode(self
, reply
):
495 if type(reply
) is _LttngLiveViewerConnectReply
:
497 'QIII', reply
.viewer_session_id
, reply
.major
, reply
.minor
, 2
499 elif type(reply
) is _LttngLiveViewerGetTracingSessionInfosReply
:
500 data
= self
._pack
('I', len(reply
.tracing_session_infos
))
502 for info
in reply
.tracing_session_infos
:
505 info
.tracing_session_id
,
506 info
.live_timer_freq
,
510 data
+= self
._encode
_zero
_padded
_str
(info
.hostname
, 64)
511 data
+= self
._encode
_zero
_padded
_str
(info
.name
, 255)
512 elif type(reply
) is _LttngLiveViewerAttachToTracingSessionReply
:
513 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
515 for info
in reply
.stream_infos
:
516 data
+= self
._encode
_stream
_info
(info
)
517 elif type(reply
) is _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
518 index_format
= 'QQQQQQQII'
519 entry
= reply
.index_entry
520 flags
= self
._get
_has
_new
_stuff
_flags
(
521 reply
.has_new_metadata
, reply
.has_new_data_stream
524 if type(entry
) is _LttngDataStreamIndexEntry
:
528 entry
.total_size_bits
,
529 entry
.content_size_bits
,
530 entry
.timestamp_begin
,
532 entry
.events_discarded
,
533 entry
.stream_class_id
,
538 assert type(entry
) is _LttngDataStreamBeaconEntry
547 entry
.stream_class_id
,
551 elif type(reply
) is _LttngLiveViewerGetDataStreamPacketDataReply
:
552 flags
= self
._get
_has
_new
_stuff
_flags
(
553 reply
.has_new_metadata
, reply
.has_new_data_stream
555 data
= self
._pack
('III', reply
.status
, len(reply
.data
), flags
)
557 elif type(reply
) is _LttngLiveViewerGetMetadataStreamDataContentReply
:
558 data
= self
._pack
('QI', len(reply
.data
), reply
.status
)
560 elif type(reply
) is _LttngLiveViewerGetNewStreamInfosReply
:
561 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
563 for info
in reply
.stream_infos
:
564 data
+= self
._encode
_stream
_info
(info
)
565 elif type(reply
) is _LttngLiveViewerCreateViewerSessionReply
:
566 data
= self
._pack
('I', reply
.status
)
567 elif type(reply
) is _LttngLiveViewerDetachFromTracingSessionReply
:
568 data
= self
._pack
('I', reply
.status
)
571 'Unknown reply object with class `{}`'.format(reply
.__class
__.__name
__)
577 # An entry within the index of an LTTng data stream.
578 class _LttngDataStreamIndexEntry
:
589 self
._offset
_bytes
= offset_bytes
590 self
._total
_size
_bits
= total_size_bits
591 self
._content
_size
_bits
= content_size_bits
592 self
._timestamp
_begin
= timestamp_begin
593 self
._timestamp
_end
= timestamp_end
594 self
._events
_discarded
= events_discarded
595 self
._stream
_class
_id
= stream_class_id
598 def offset_bytes(self
):
599 return self
._offset
_bytes
602 def total_size_bits(self
):
603 return self
._total
_size
_bits
606 def total_size_bytes(self
):
607 return self
._total
_size
_bits
// 8
610 def content_size_bits(self
):
611 return self
._content
_size
_bits
614 def content_size_bytes(self
):
615 return self
._content
_size
_bits
// 8
618 def timestamp_begin(self
):
619 return self
._timestamp
_begin
622 def timestamp_end(self
):
623 return self
._timestamp
_end
626 def events_discarded(self
):
627 return self
._events
_discarded
630 def stream_class_id(self
):
631 return self
._stream
_class
_id
634 # An entry within the index of an LTTng data stream. While a stream beacon entry
635 # is conceptually unrelated to an index, it is sent as a reply to a
636 # LttngLiveViewerGetNextDataStreamIndexEntryCommand
637 class _LttngDataStreamBeaconEntry
:
638 def __init__(self
, stream_class_id
, timestamp
):
639 self
._stream
_class
_id
= stream_class_id
640 self
._timestamp
= timestamp
644 return self
._timestamp
647 def stream_class_id(self
):
648 return self
._stream
_class
_id
651 # The index of an LTTng data stream, a sequence of index entries.
652 class _LttngDataStreamIndex(collections
.abc
.Sequence
):
653 def __init__(self
, path
, beacons
):
658 stream_class_id
= self
._entries
[0].stream_class_id
660 _LttngDataStreamBeaconEntry(stream_class_id
, ts
) for ts
in beacons
662 self
._add
_beacons
(beacons
)
665 'Built data stream index entries: path="{}", count={}'.format(
666 path
, len(self
._entries
)
672 assert os
.path
.isfile(self
._path
)
674 with
open(self
._path
, 'rb') as f
:
677 size
= struct
.calcsize(fmt
)
679 assert len(data
) == size
680 magic
, index_major
, index_minor
, index_entry_length
= struct
.unpack(
683 assert magic
== 0xC1F1DCC1
687 size
= struct
.calcsize(fmt
)
691 'Decoding data stream index entry: path="{}", offset={}'.format(
701 assert len(data
) == size
710 ) = struct
.unpack(fmt
, data
)
712 self
._entries
.append(
713 _LttngDataStreamIndexEntry(
724 # Skip anything else before the next entry
725 f
.seek(index_entry_length
- size
, os
.SEEK_CUR
)
727 def _add_beacons(self
, beacons
):
728 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
730 if type(entry
) is _LttngDataStreamBeaconEntry
:
731 return entry
.timestamp
733 return entry
.timestamp_end
735 self
._entries
+= beacons
736 self
._entries
.sort(key
=sort_key
)
738 def __getitem__(self
, index
):
739 return self
._entries
[index
]
742 return len(self
._entries
)
749 # An LTTng data stream.
750 class _LttngDataStream
:
751 def __init__(self
, path
, beacons
):
753 filename
= os
.path
.basename(path
)
754 match
= re
.match(r
'(.*)_\d+', filename
)
755 self
._channel
_name
= match
.group(1)
756 trace_dir
= os
.path
.dirname(path
)
757 index_path
= os
.path
.join(trace_dir
, 'index', filename
+ '.idx')
758 self
._index
= _LttngDataStreamIndex(index_path
, beacons
)
759 assert os
.path
.isfile(path
)
760 self
._file
= open(path
, 'rb')
762 'Built data stream: path="{}", channel-name="{}"'.format(
763 path
, self
._channel
_name
772 def channel_name(self
):
773 return self
._channel
_name
779 def get_data(self
, offset_bytes
, len_bytes
):
780 self
._file
.seek(offset_bytes
)
781 return self
._file
.read(len_bytes
)
784 # An LTTng metadata stream.
785 class _LttngMetadataStream
:
786 def __init__(self
, path
):
788 logging
.info('Built metadata stream: path="{}"'.format(path
))
796 assert os
.path
.isfile(self
._path
)
798 with
open(self
._path
, 'rb') as f
:
802 # An LTTng trace, a sequence of LTTng data streams.
803 class LttngTrace(collections
.abc
.Sequence
):
804 def __init__(self
, trace_dir
, beacons
):
805 assert os
.path
.isdir(trace_dir
)
806 self
._path
= trace_dir
807 self
._metadata
_stream
= _LttngMetadataStream(
808 os
.path
.join(trace_dir
, 'metadata')
810 self
._create
_data
_streams
(trace_dir
, beacons
)
811 logging
.info('Built trace: path="{}"'.format(trace_dir
))
813 def _create_data_streams(self
, trace_dir
, beacons
):
814 data_stream_paths
= []
816 for filename
in os
.listdir(trace_dir
):
817 path
= os
.path
.join(trace_dir
, filename
)
819 if not os
.path
.isfile(path
):
822 if filename
.startswith('.'):
825 if filename
== 'metadata':
828 data_stream_paths
.append(path
)
830 data_stream_paths
.sort()
831 self
._data
_streams
= []
833 for data_stream_path
in data_stream_paths
:
834 stream_name
= os
.path
.basename(data_stream_path
)
835 this_stream_beacons
= None
837 if beacons
is not None and stream_name
in beacons
:
838 this_stream_beacons
= beacons
[stream_name
]
840 self
._data
_streams
.append(
841 _LttngDataStream(data_stream_path
, this_stream_beacons
)
849 def metadata_stream(self
):
850 return self
._metadata
_stream
852 def __getitem__(self
, index
):
853 return self
._data
_streams
[index
]
856 return len(self
._data
_streams
)
859 # The state of a single data stream.
860 class _LttngLiveViewerSessionDataStreamState
:
861 def __init__(self
, ts_state
, info
, data_stream
):
862 self
._ts
_state
= ts_state
864 self
._data
_stream
= data_stream
865 self
._cur
_index
_entry
_index
= 0
866 fmt
= 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
870 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
871 ts_state
.tracing_session_descriptor
.info
.name
,
877 def tracing_session_state(self
):
878 return self
._ts
_state
885 def data_stream(self
):
886 return self
._data
_stream
889 def cur_index_entry(self
):
890 if self
._cur
_index
_entry
_index
== len(self
._data
_stream
.index
):
893 return self
._data
_stream
.index
[self
._cur
_index
_entry
_index
]
895 def goto_next_index_entry(self
):
896 self
._cur
_index
_entry
_index
+= 1
899 # The state of a single metadata stream.
900 class _LttngLiveViewerSessionMetadataStreamState
:
901 def __init__(self
, ts_state
, info
, metadata_stream
):
902 self
._ts
_state
= ts_state
904 self
._metadata
_stream
= metadata_stream
905 self
._is
_sent
= False
906 fmt
= 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
910 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
911 ts_state
.tracing_session_descriptor
.info
.name
,
912 metadata_stream
.path
,
917 def trace_session_state(self
):
918 return self
._trace
_session
_state
925 def metadata_stream(self
):
926 return self
._metadata
_stream
933 def is_sent(self
, value
):
934 self
._is
_sent
= value
937 # The state of a tracing session.
938 class _LttngLiveViewerSessionTracingSessionState
:
939 def __init__(self
, tc_descr
, base_stream_id
):
940 self
._tc
_descr
= tc_descr
941 self
._stream
_infos
= []
944 stream_id
= base_stream_id
946 for trace
in tc_descr
.traces
:
947 trace_id
= stream_id
* 1000
949 # Data streams -> stream infos and data stream states
950 for data_stream
in trace
:
951 info
= _LttngLiveViewerStreamInfo(
956 data_stream
.channel_name
,
958 self
._stream
_infos
.append(info
)
959 self
._ds
_states
[stream_id
] = _LttngLiveViewerSessionDataStreamState(
960 self
, info
, data_stream
964 # Metadata stream -> stream info and metadata stream state
965 info
= _LttngLiveViewerStreamInfo(
966 stream_id
, trace_id
, True, trace
.metadata_stream
.path
, 'metadata'
968 self
._stream
_infos
.append(info
)
969 self
._ms
_states
[stream_id
] = _LttngLiveViewerSessionMetadataStreamState(
970 self
, info
, trace
.metadata_stream
974 self
._is
_attached
= False
975 fmt
= 'Built tracing session state: id={}, name="{}"'
976 logging
.info(fmt
.format(tc_descr
.info
.tracing_session_id
, tc_descr
.info
.name
))
979 def tracing_session_descriptor(self
):
980 return self
._tc
_descr
983 def data_stream_states(self
):
984 return self
._ds
_states
987 def metadata_stream_states(self
):
988 return self
._ms
_states
991 def stream_infos(self
):
992 return self
._stream
_infos
995 def has_new_metadata(self
):
996 return any([not ms
.is_sent
for ms
in self
._ms
_states
.values()])
999 def is_attached(self
):
1000 return self
._is
_attached
1003 def is_attached(self
, value
):
1004 self
._is
_attached
= value
1007 # An LTTng live viewer session manages a view on tracing sessions
1008 # and replies to commands accordingly.
1009 class _LttngLiveViewerSession
:
1013 tracing_session_descriptors
,
1014 max_query_data_response_size
,
1016 self
._viewer
_session
_id
= viewer_session_id
1017 self
._ts
_states
= {}
1018 self
._stream
_states
= {}
1019 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1020 total_stream_infos
= 0
1022 for ts_descr
in tracing_session_descriptors
:
1023 ts_state
= _LttngLiveViewerSessionTracingSessionState(
1024 ts_descr
, total_stream_infos
1026 ts_id
= ts_state
.tracing_session_descriptor
.info
.tracing_session_id
1027 self
._ts
_states
[ts_id
] = ts_state
1028 total_stream_infos
+= len(ts_state
.stream_infos
)
1030 # Update session's stream states to have the new states
1031 self
._stream
_states
.update(ts_state
.data_stream_states
)
1032 self
._stream
_states
.update(ts_state
.metadata_stream_states
)
1034 self
._command
_handlers
= {
1035 _LttngLiveViewerAttachToTracingSessionCommand
: self
._handle
_attach
_to
_tracing
_session
_command
,
1036 _LttngLiveViewerCreateViewerSessionCommand
: self
._handle
_create
_viewer
_session
_command
,
1037 _LttngLiveViewerDetachFromTracingSessionCommand
: self
._handle
_detach
_from
_tracing
_session
_command
,
1038 _LttngLiveViewerGetDataStreamPacketDataCommand
: self
._handle
_get
_data
_stream
_packet
_data
_command
,
1039 _LttngLiveViewerGetMetadataStreamDataCommand
: self
._handle
_get
_metadata
_stream
_data
_command
,
1040 _LttngLiveViewerGetNewStreamInfosCommand
: self
._handle
_get
_new
_stream
_infos
_command
,
1041 _LttngLiveViewerGetNextDataStreamIndexEntryCommand
: self
._handle
_get
_next
_data
_stream
_index
_entry
_command
,
1042 _LttngLiveViewerGetTracingSessionInfosCommand
: self
._handle
_get
_tracing
_session
_infos
_command
,
1046 def viewer_session_id(self
):
1047 return self
._viewer
_session
_id
1049 def _get_tracing_session_state(self
, tracing_session_id
):
1050 if tracing_session_id
not in self
._ts
_states
:
1051 raise UnexpectedInput(
1052 'Unknown tracing session ID {}'.format(tracing_session_id
)
1055 return self
._ts
_states
[tracing_session_id
]
1057 def _get_stream_state(self
, stream_id
):
1058 if stream_id
not in self
._stream
_states
:
1059 UnexpectedInput('Unknown stream ID {}'.format(stream_id
))
1061 return self
._stream
_states
[stream_id
]
1063 def handle_command(self
, cmd
):
1065 'Handling command in viewer session: cmd-cls-name={}'.format(
1066 cmd
.__class
__.__name
__
1069 cmd_type
= type(cmd
)
1071 if cmd_type
not in self
._command
_handlers
:
1072 raise UnexpectedInput(
1073 'Unexpected command: cmd-cls-name={}'.format(cmd
.__class
__.__name
__)
1076 return self
._command
_handlers
[cmd_type
](cmd
)
1078 def _handle_attach_to_tracing_session_command(self
, cmd
):
1079 fmt
= 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1080 logging
.info(fmt
.format(cmd
.tracing_session_id
, cmd
.offset
, cmd
.seek_type
))
1081 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1082 info
= ts_state
.tracing_session_descriptor
.info
1084 if ts_state
.is_attached
:
1085 raise UnexpectedInput(
1086 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1091 ts_state
.is_attached
= True
1092 status
= _LttngLiveViewerAttachToTracingSessionReply
.Status
.OK
1093 return _LttngLiveViewerAttachToTracingSessionReply(
1094 status
, ts_state
.stream_infos
1097 def _handle_detach_from_tracing_session_command(self
, cmd
):
1098 fmt
= 'Handling "detach from tracing session" command: ts-id={}'
1099 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1100 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1101 info
= ts_state
.tracing_session_descriptor
.info
1103 if not ts_state
.is_attached
:
1104 raise UnexpectedInput(
1105 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1110 ts_state
.is_attached
= False
1111 status
= _LttngLiveViewerDetachFromTracingSessionReply
.Status
.OK
1112 return _LttngLiveViewerDetachFromTracingSessionReply(status
)
1114 def _handle_get_next_data_stream_index_entry_command(self
, cmd
):
1115 fmt
= 'Handling "get next data stream index entry" command: stream-id={}'
1116 logging
.info(fmt
.format(cmd
.stream_id
))
1117 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1119 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1120 raise UnexpectedInput(
1121 'Stream with ID {} is not a data stream'.format(cmd
.stream_id
)
1124 if stream_state
.cur_index_entry
is None:
1125 # The viewer is done reading this stream
1126 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.HUP
1128 # Dummy data stream index entry to use with the `HUP` status
1129 # (the reply needs one, but the viewer ignores it)
1130 index_entry
= _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1132 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1133 status
, index_entry
, False, False
1136 # The viewer only checks the `has_new_metadata` flag if the
1137 # reply's status is `OK`, so we need to provide an index here
1138 has_new_metadata
= stream_state
.tracing_session_state
.has_new_metadata
1139 if type(stream_state
.cur_index_entry
) is _LttngDataStreamIndexEntry
:
1140 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.OK
1142 assert type(stream_state
.cur_index_entry
) is _LttngDataStreamBeaconEntry
1143 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.INACTIVE
1145 reply
= _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1146 status
, stream_state
.cur_index_entry
, has_new_metadata
, False
1148 stream_state
.goto_next_index_entry()
1151 def _handle_get_data_stream_packet_data_command(self
, cmd
):
1152 fmt
= 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1153 logging
.info(fmt
.format(cmd
.stream_id
, cmd
.offset
, cmd
.req_length
))
1154 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1155 data_response_length
= cmd
.req_length
1157 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1158 raise UnexpectedInput(
1159 'Stream with ID {} is not a data stream'.format(cmd
.stream_id
)
1162 if stream_state
.tracing_session_state
.has_new_metadata
:
1163 status
= _LttngLiveViewerGetDataStreamPacketDataReply
.Status
.ERROR
1164 return _LttngLiveViewerGetDataStreamPacketDataReply(
1165 status
, bytes(), True, False
1168 if self
._max
_query
_data
_response
_size
:
1169 # Enforce a server side limit on the query requested length.
1170 # To ensure that the transaction terminate take the minimum of both
1172 data_response_length
= min(
1173 cmd
.req_length
, self
._max
_query
_data
_response
_size
1175 fmt
= 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1176 logging
.info(fmt
.format(cmd
.req_length
, data_response_length
))
1178 data
= stream_state
.data_stream
.get_data(cmd
.offset
, data_response_length
)
1179 status
= _LttngLiveViewerGetDataStreamPacketDataReply
.Status
.OK
1180 return _LttngLiveViewerGetDataStreamPacketDataReply(status
, data
, False, False)
1182 def _handle_get_metadata_stream_data_command(self
, cmd
):
1183 fmt
= 'Handling "get metadata stream data" command: stream-id={}'
1184 logging
.info(fmt
.format(cmd
.stream_id
))
1185 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1187 if type(stream_state
) is not _LttngLiveViewerSessionMetadataStreamState
:
1188 raise UnexpectedInput(
1189 'Stream with ID {} is not a metadata stream'.format(cmd
.stream_id
)
1192 if stream_state
.is_sent
:
1193 status
= _LttngLiveViewerGetMetadataStreamDataContentReply
.Status
.NO_NEW
1194 return _LttngLiveViewerGetMetadataStreamDataContentReply(status
, bytes())
1196 stream_state
.is_sent
= True
1197 status
= _LttngLiveViewerGetMetadataStreamDataContentReply
.Status
.OK
1198 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1199 status
, stream_state
.metadata_stream
.data
1202 def _handle_get_new_stream_infos_command(self
, cmd
):
1203 fmt
= 'Handling "get new stream infos" command: ts-id={}'
1204 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1206 # As of this version, all the tracing session's stream infos are
1207 # always given to the viewer when sending the "attach to tracing
1208 # session" reply, so there's nothing new here. Return the `HUP`
1209 # status as, if we're handling this command, the viewer consumed
1210 # all the existing data streams.
1211 status
= _LttngLiveViewerGetNewStreamInfosReply
.Status
.HUP
1212 return _LttngLiveViewerGetNewStreamInfosReply(status
, [])
1214 def _handle_get_tracing_session_infos_command(self
, cmd
):
1215 logging
.info('Handling "get tracing session infos" command.')
1217 tss
.tracing_session_descriptor
.info
for tss
in self
._ts
_states
.values()
1219 infos
.sort(key
=lambda info
: info
.name
)
1220 return _LttngLiveViewerGetTracingSessionInfosReply(infos
)
1222 def _handle_create_viewer_session_command(self
, cmd
):
1223 logging
.info('Handling "create viewer session" command.')
1224 status
= _LttngLiveViewerCreateViewerSessionReply
.Status
.OK
1226 # This does nothing here. In the LTTng relay daemon, it
1227 # allocates the viewer session's state.
1228 return _LttngLiveViewerCreateViewerSessionReply(status
)
1231 # An LTTng live TCP server.
1233 # On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1234 # the decimal TCP port number to a temporary port file. It renames the
1235 # temporary port file to `port_filename`.
1237 # `tracing_session_descriptors` is a list of tracing session descriptors
1238 # (`LttngTracingSessionDescriptor`) to serve.
1240 # This server accepts a single viewer (client).
1242 # When the viewer closes the connection, the server's constructor
1244 class LttngLiveServer
:
1246 self
, port_filename
, tracing_session_descriptors
, max_query_data_response_size
1248 logging
.info('Server configuration:')
1250 logging
.info(' Port file name: `{}`'.format(port_filename
))
1252 if max_query_data_response_size
is not None:
1254 ' Maximum response data query size: `{}`'.format(
1255 max_query_data_response_size
1259 for ts_descr
in tracing_session_descriptors
:
1260 info
= ts_descr
.info
1261 fmt
= ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1265 info
.tracing_session_id
,
1267 info
.live_timer_freq
,
1273 for trace
in ts_descr
.traces
:
1274 logging
.info(' Trace: path="{}"'.format(trace
.path
))
1276 self
._ts
_descriptors
= tracing_session_descriptors
1277 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1278 self
._sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
1279 self
._codec
= _LttngLiveViewerProtocolCodec()
1281 # Port 0: OS assigns an unused port
1282 serv_addr
= ('localhost', 0)
1283 self
._sock
.bind(serv_addr
)
1284 self
._write
_port
_to
_file
(port_filename
)
1290 logging
.info('Closed connection and socket.')
1293 def _server_port(self
):
1294 return self
._sock
.getsockname()[1]
1296 def _recv_command(self
):
1300 logging
.info('Waiting for viewer command.')
1301 buf
= self
._conn
.recv(128)
1304 logging
.info('Client closed connection.')
1307 raise UnexpectedInput(
1308 'Client closed connection after having sent {} command bytes.'.format(
1315 logging
.info('Received data from viewer: length={}'.format(len(buf
)))
1320 cmd
= self
._codec
.decode(data
)
1321 except struct
.error
as exc
:
1322 raise UnexpectedInput('Malformed command: {}'.format(exc
)) from exc
1326 'Received command from viewer: cmd-cls-name={}'.format(
1327 cmd
.__class
__.__name
__
1332 def _send_reply(self
, reply
):
1333 data
= self
._codec
.encode(reply
)
1335 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
1336 reply
.__class
__.__name
__, len(data
)
1339 self
._conn
.sendall(data
)
1341 def _handle_connection(self
):
1342 # First command must be "connect"
1343 cmd
= self
._recv
_command
()
1345 if type(cmd
) is not _LttngLiveViewerConnectCommand
:
1346 raise UnexpectedInput(
1347 'First command is not "connect": cmd-cls-name={}'.format(
1348 cmd
.__class
__.__name
__
1352 # Create viewer session (arbitrary ID 23)
1354 'LTTng live viewer connected: version={}.{}'.format(cmd
.major
, cmd
.minor
)
1356 viewer_session
= _LttngLiveViewerSession(
1357 23, self
._ts
_descriptors
, self
._max
_query
_data
_response
_size
1360 # Send "connect" reply
1362 _LttngLiveViewerConnectReply(viewer_session
.viewer_session_id
, 2, 10)
1365 # Make the viewer session handle the remaining commands
1367 cmd
= self
._recv
_command
()
1370 # Connection closed (at an expected location within the
1374 self
._send
_reply
(viewer_session
.handle_command(cmd
))
1377 logging
.info('Listening: port={}'.format(self
._server
_port
))
1378 # Backlog must be present for Python version < 3.5.
1379 # 128 is an arbitrary number since we expect only 1 connection anyway.
1380 self
._sock
.listen(128)
1381 self
._conn
, viewer_addr
= self
._sock
.accept()
1383 'Accepted viewer: addr={}:{}'.format(viewer_addr
[0], viewer_addr
[1])
1387 self
._handle
_connection
()
1391 def _write_port_to_file(self
, port_filename
):
1392 # Write the port number to a temporary file.
1393 with tempfile
.NamedTemporaryFile(mode
='w', delete
=False) as tmp_port_file
:
1394 print(self
._server
_port
, end
='', file=tmp_port_file
)
1396 # Rename temporary file to real file
1397 os
.replace(tmp_port_file
.name
, port_filename
)
1399 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1400 tmp_port_file
.name
, port_filename
1405 # A tracing session descriptor.
1407 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1409 class LttngTracingSessionDescriptor
:
1411 self
, name
, tracing_session_id
, hostname
, live_timer_freq
, client_count
, traces
1413 for trace
in traces
:
1414 if name
not in trace
.path
:
1415 fmt
= 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
1416 raise ValueError(fmt
.format(name
, trace
.path
))
1418 self
._traces
= traces
1419 stream_count
= sum([len(t
) + 1 for t
in traces
])
1420 self
._info
= _LttngLiveViewerTracingSessionInfo(
1438 def _session_descriptors_from_path(sessions_filename
, trace_path_prefix
):
1443 # "name": "my-session",
1445 # "hostname": "myhost",
1446 # "live-timer-freq": 1000000,
1447 # "client-count": 23,
1453 # "path": "meow/mix",
1455 # "my_stream": [ 5235787, 728375283 ]
1461 with
open(sessions_filename
, 'r') as sessions_file
:
1462 params
= json
.load(sessions_file
)
1466 for session
in params
:
1467 name
= session
['name']
1468 tracing_session_id
= session
['id']
1469 hostname
= session
['hostname']
1470 live_timer_freq
= session
['live-timer-freq']
1471 client_count
= session
['client-count']
1474 for trace
in session
['traces']:
1475 beacons
= trace
.get('beacons')
1476 path
= trace
['path']
1478 if not os
.path
.isabs(path
):
1479 path
= os
.path
.join(trace_path_prefix
, path
)
1481 traces
.append(LttngTrace(path
, beacons
))
1484 LttngTracingSessionDescriptor(
1497 def _loglevel_parser(string
):
1498 loglevels
= {'info': logging
.INFO
, 'warning': logging
.WARNING
}
1499 if string
not in loglevels
:
1500 msg
= "{} is not a valid loglevel".format(string
)
1501 raise argparse
.ArgumentTypeError(msg
)
1502 return loglevels
[string
]
1505 if __name__
== '__main__':
1506 logging
.basicConfig(format
='# %(asctime)-25s%(message)s')
1507 parser
= argparse
.ArgumentParser(
1508 description
='LTTng-live protocol mocker', add_help
=False
1510 parser
.add_argument(
1513 choices
=['info', 'warning'],
1514 help='The loglevel to be used.',
1517 loglevel_namespace
, remaining_args
= parser
.parse_known_args()
1518 logging
.getLogger().setLevel(_loglevel_parser(loglevel_namespace
.log_level
))
1520 parser
.add_argument(
1522 help='The final port file. This file is present when the server is ready to receive connection.',
1525 parser
.add_argument(
1526 '--max-query-data-response-size',
1528 help='The maximum size of control data response in bytes',
1530 parser
.add_argument(
1531 '--trace-path-prefix',
1533 help='Prefix to prepend to the trace paths of session configurations',
1535 parser
.add_argument(
1536 '--sessions-filename',
1538 help='Path to a session configuration file',
1540 parser
.add_argument(
1544 default
=argparse
.SUPPRESS
,
1545 help='Show this help message and exit.',
1548 args
= parser
.parse_args(args
=remaining_args
)
1550 sessions
= _session_descriptors_from_path(
1551 args
.sessions_filename
, args
.trace_path_prefix
1553 LttngLiveServer(args
.port_filename
, sessions
, args
.max_query_data_response_size
)
1554 except UnexpectedInput
as exc
:
1555 logging
.error(str(exc
))
1556 print(exc
, file=sys
.stderr
)