1 # The MIT License (MIT)
3 # Copyright (c) 2019 Philippe Proulx <pproulx@efficios.com>
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 import collections
.abc
36 class UnexpectedInput(RuntimeError):
40 class _LttngLiveViewerCommand
:
41 def __init__(self
, version
):
42 self
._version
= version
49 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand
):
50 def __init__(self
, version
, viewer_session_id
, major
, minor
):
51 super().__init
__(version
)
52 self
._viewer
_session
_id
= viewer_session_id
def viewer_session_id(self):
    # Viewer session ID stored by the constructor.
    return self._viewer_session_id
69 class _LttngLiveViewerConnectReply
:
70 def __init__(self
, viewer_session_id
, major
, minor
):
71 self
._viewer
_session
_id
= viewer_session_id
def viewer_session_id(self):
    # Viewer session ID which this reply carries.
    return self._viewer_session_id
88 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand
):
92 class _LttngLiveViewerTracingSessionInfo
:
102 self
._tracing
_session
_id
= tracing_session_id
103 self
._live
_timer
_freq
= live_timer_freq
104 self
._client
_count
= client_count
105 self
._stream
_count
= stream_count
106 self
._hostname
= hostname
def tracing_session_id(self):
    # Tracing session ID stored in this info object.
    return self._tracing_session_id
def live_timer_freq(self):
    # Live timer frequency stored in this info object.
    return self._live_timer_freq
def client_count(self):
    # Client count stored in this info object.
    return self._client_count
def stream_count(self):
    # Stream count stored in this info object.
    return self._stream_count
127 return self
._hostname
134 class _LttngLiveViewerGetTracingSessionInfosReply
:
135 def __init__(self
, tracing_session_infos
):
136 self
._tracing
_session
_infos
= tracing_session_infos
def tracing_session_infos(self):
    # Tracing session infos given to the constructor.
    return self._tracing_session_infos
143 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand
):
def __init__(self, version, tracing_session_id, offset, seek_type):
    # The base command keeps `version`; this class keeps the
    # "attach to tracing session" payload fields.
    super().__init__(version)
    self._tracing_session_id, self._offset, self._seek_type = (
        tracing_session_id,
        offset,
        seek_type,
    )
def tracing_session_id(self):
    # ID of the tracing session to attach to.
    return self._tracing_session_id
164 return self
._seek
_type
167 class _LttngLiveViewerStreamInfo
:
168 def __init__(self
, id, trace_id
, is_metadata
, path
, channel_name
):
170 self
._trace
_id
= trace_id
171 self
._is
_metadata
= is_metadata
173 self
._channel
_name
= channel_name
181 return self
._trace
_id
def is_metadata(self):
    # `is_metadata` value given to the constructor.
    return self._is_metadata
def channel_name(self):
    # Channel name given to the constructor.
    return self._channel_name
196 class _LttngLiveViewerAttachToTracingSessionReply
:
205 def __init__(self
, status
, stream_infos
):
206 self
._status
= status
207 self
._stream
_infos
= stream_infos
def stream_infos(self):
    # Stream infos given to the constructor.
    return self._stream_infos
218 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand
):
def __init__(self, version, stream_id):
    # The base command keeps `version`; this class keeps the ID of
    # the stream of which to get the next index entry.
    super().__init__(version)
    self._stream_id = stream_id
225 return self
._stream
_id
228 class _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
237 def __init__(self
, status
, index_entry
, has_new_metadata
, has_new_data_stream
):
238 self
._status
= status
239 self
._index
_entry
= index_entry
240 self
._has
_new
_metadata
= has_new_metadata
241 self
._has
_new
_data
_stream
= has_new_data_stream
def index_entry(self):
    # Index entry given to the constructor.
    return self._index_entry
def has_new_metadata(self):
    # `has_new_metadata` value given to the constructor.
    return self._has_new_metadata
def has_new_data_stream(self):
    # `has_new_data_stream` value given to the constructor.
    return self._has_new_data_stream
260 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand
):
def __init__(self, version, stream_id, offset, req_length):
    # The base command keeps `version`; this class keeps the
    # "get data stream packet data" payload fields.
    super().__init__(version)
    self._stream_id, self._offset, self._req_length = (
        stream_id,
        offset,
        req_length,
    )
269 return self
._stream
_id
def req_length(self):
    # Requested length given to the constructor.
    return self._req_length
280 class _LttngLiveViewerGetDataStreamPacketDataReply
:
287 def __init__(self
, status
, data
, has_new_metadata
, has_new_data_stream
):
288 self
._status
= status
290 self
._has
_new
_metadata
= has_new_metadata
291 self
._has
_new
_data
_stream
= has_new_data_stream
def has_new_metadata(self):
    # `has_new_metadata` value given to the constructor.
    return self._has_new_metadata
def has_new_data_stream(self):
    # `has_new_data_stream` value given to the constructor.
    return self._has_new_data_stream
310 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand
):
def __init__(self, version, stream_id):
    # The base command keeps `version`; this class keeps the ID of
    # the metadata stream of which to get the data.
    super().__init__(version)
    self._stream_id = stream_id
317 return self
._stream
_id
320 class _LttngLiveViewerGetMetadataStreamDataContentReply
:
326 def __init__(self
, status
, data
):
327 self
._status
= status
339 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand
):
def __init__(self, version, tracing_session_id):
    # The base command keeps `version`; this class keeps the ID of
    # the tracing session of which to get the new stream infos.
    super().__init__(version)
    self._tracing_session_id = tracing_session_id
def tracing_session_id(self):
    # Tracing session ID given to the constructor.
    return self._tracing_session_id
349 class _LttngLiveViewerGetNewStreamInfosReply
:
356 def __init__(self
, status
, stream_infos
):
357 self
._status
= status
358 self
._stream
_infos
= stream_infos
def stream_infos(self):
    # Stream infos given to the constructor.
    return self._stream_infos
369 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand
):
373 class _LttngLiveViewerCreateViewerSessionReply
:
378 def __init__(self
, status
):
379 self
._status
= status
386 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand
):
def __init__(self, version, tracing_session_id):
    # The base command keeps `version`; this class keeps the ID of
    # the tracing session to detach from.
    super().__init__(version)
    self._tracing_session_id = tracing_session_id
def tracing_session_id(self):
    # ID of the tracing session to detach from.
    return self._tracing_session_id
396 class _LttngLiveViewerDetachFromTracingSessionReply
:
402 def __init__(self
, status
):
403 self
._status
= status
410 # An LTTng live protocol codec can convert bytes to command objects and
411 # reply objects to bytes.
412 class _LttngLiveViewerProtocolCodec
:
413 _COMMAND_HEADER_STRUCT_FMT
= 'QII'
414 _COMMAND_HEADER_SIZE_BYTES
= struct
.calcsize(_COMMAND_HEADER_STRUCT_FMT
)
419 def _unpack(self
, fmt
, data
, offset
=0):
421 return struct
.unpack_from(fmt
, data
, offset
)
423 def _unpack_payload(self
, fmt
, data
):
425 fmt
, data
, _LttngLiveViewerProtocolCodec
._COMMAND
_HEADER
_SIZE
_BYTES
428 def decode(self
, data
):
429 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
:
430 # Not enough data to read the command header
433 payload_size
, cmd_type
, version
= self
._unpack
(
434 self
._COMMAND
_HEADER
_STRUCT
_FMT
, data
437 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
438 payload_size
, cmd_type
, version
442 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
+ payload_size
:
443 # Not enough data to read the whole command
447 viewer_session_id
, major
, minor
, conn_type
= self
._unpack
_payload
(
450 return _LttngLiveViewerConnectCommand(
451 version
, viewer_session_id
, major
, minor
454 return _LttngLiveViewerGetTracingSessionInfosCommand(version
)
456 tracing_session_id
, offset
, seek_type
= self
._unpack
_payload
('QQI', data
)
457 return _LttngLiveViewerAttachToTracingSessionCommand(
458 version
, tracing_session_id
, offset
, seek_type
461 (stream_id
,) = self
._unpack
_payload
('Q', data
)
462 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
466 stream_id
, offset
, req_length
= self
._unpack
_payload
('QQI', data
)
467 return _LttngLiveViewerGetDataStreamPacketDataCommand(
468 version
, stream_id
, offset
, req_length
471 (stream_id
,) = self
._unpack
_payload
('Q', data
)
472 return _LttngLiveViewerGetMetadataStreamDataCommand(version
, stream_id
)
474 (tracing_session_id
,) = self
._unpack
_payload
('Q', data
)
475 return _LttngLiveViewerGetNewStreamInfosCommand(version
, tracing_session_id
)
477 return _LttngLiveViewerCreateViewerSessionCommand(version
)
479 (tracing_session_id
,) = self
._unpack
_payload
('Q', data
)
480 return _LttngLiveViewerDetachFromTracingSessionCommand(
481 version
, tracing_session_id
484 raise UnexpectedInput('Unknown command type {}'.format(cmd_type
))
486 def _pack(self
, fmt
, *args
):
487 # Force network byte order
488 return struct
.pack('!' + fmt
, *args
)
490 def _encode_zero_padded_str(self
, string
, length
):
491 data
= string
.encode()
492 return data
.ljust(length
, b
'\x00')
494 def _encode_stream_info(self
, info
):
495 data
= self
._pack
('QQI', info
.id, info
.trace_id
, int(info
.is_metadata
))
496 data
+= self
._encode
_zero
_padded
_str
(info
.path
, 4096)
497 data
+= self
._encode
_zero
_padded
_str
(info
.channel_name
, 255)
500 def _get_has_new_stuff_flags(self
, has_new_metadata
, has_new_data_streams
):
506 if has_new_data_streams
:
511 def encode(self
, reply
):
512 if type(reply
) is _LttngLiveViewerConnectReply
:
514 'QIII', reply
.viewer_session_id
, reply
.major
, reply
.minor
, 2
516 elif type(reply
) is _LttngLiveViewerGetTracingSessionInfosReply
:
517 data
= self
._pack
('I', len(reply
.tracing_session_infos
))
519 for info
in reply
.tracing_session_infos
:
522 info
.tracing_session_id
,
523 info
.live_timer_freq
,
527 data
+= self
._encode
_zero
_padded
_str
(info
.hostname
, 64)
528 data
+= self
._encode
_zero
_padded
_str
(info
.name
, 255)
529 elif type(reply
) is _LttngLiveViewerAttachToTracingSessionReply
:
530 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
532 for info
in reply
.stream_infos
:
533 data
+= self
._encode
_stream
_info
(info
)
534 elif type(reply
) is _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
535 entry
= reply
.index_entry
536 flags
= self
._get
_has
_new
_stuff
_flags
(
537 reply
.has_new_metadata
, reply
.has_new_data_stream
543 entry
.total_size_bits
,
544 entry
.content_size_bits
,
545 entry
.timestamp_begin
,
547 entry
.events_discarded
,
548 entry
.stream_class_id
,
552 elif type(reply
) is _LttngLiveViewerGetDataStreamPacketDataReply
:
553 flags
= self
._get
_has
_new
_stuff
_flags
(
554 reply
.has_new_metadata
, reply
.has_new_data_stream
556 data
= self
._pack
('III', reply
.status
, len(reply
.data
), flags
)
558 elif type(reply
) is _LttngLiveViewerGetMetadataStreamDataContentReply
:
559 data
= self
._pack
('QI', len(reply
.data
), reply
.status
)
561 elif type(reply
) is _LttngLiveViewerGetNewStreamInfosReply
:
562 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
564 for info
in reply
.stream_infos
:
565 data
+= self
._encode
_stream
_info
(info
)
566 elif type(reply
) is _LttngLiveViewerCreateViewerSessionReply
:
567 data
= self
._pack
('I', reply
.status
)
568 elif type(reply
) is _LttngLiveViewerDetachFromTracingSessionReply
:
569 data
= self
._pack
('I', reply
.status
)
572 'Unknown reply object with class `{}`'.format(reply
.__class
__.__name
__)
578 # An entry within the index of an LTTng data stream.
579 class _LttngDataStreamIndexEntry
:
590 self
._offset
_bytes
= offset_bytes
591 self
._total
_size
_bits
= total_size_bits
592 self
._content
_size
_bits
= content_size_bits
593 self
._timestamp
_begin
= timestamp_begin
594 self
._timestamp
_end
= timestamp_end
595 self
._events
_discarded
= events_discarded
596 self
._stream
_class
_id
= stream_class_id
def offset_bytes(self):
    # Offset (bytes) stored in this index entry.
    return self._offset_bytes
def total_size_bits(self):
    # Total size (bits) stored in this index entry.
    return self._total_size_bits
def total_size_bytes(self):
    # Total size converted from bits to whole bytes (floor division).
    size_bits = self._total_size_bits
    return size_bits // 8
def content_size_bits(self):
    # Content size (bits) stored in this index entry.
    return self._content_size_bits
def content_size_bytes(self):
    # Content size converted from bits to whole bytes (floor division).
    size_bits = self._content_size_bits
    return size_bits // 8
def timestamp_begin(self):
    # Beginning timestamp stored in this index entry.
    return self._timestamp_begin
def timestamp_end(self):
    # End timestamp stored in this index entry.
    return self._timestamp_end
def events_discarded(self):
    # Discarded event count stored in this index entry.
    return self._events_discarded
def stream_class_id(self):
    # Stream class ID stored in this index entry.
    return self._stream_class_id
635 # The index of an LTTng data stream, a sequence of index entries.
636 class _LttngDataStreamIndex(collections
.abc
.Sequence
):
637 def __init__(self
, path
):
641 'Built data stream index entries: path="{}", count={}'.format(
642 path
, len(self
._entries
)
648 assert os
.path
.isfile(self
._path
)
650 with
open(self
._path
, 'rb') as f
:
653 size
= struct
.calcsize(fmt
)
655 assert len(data
) == size
656 magic
, index_major
, index_minor
, index_entry_length
= struct
.unpack(
659 assert magic
== 0xC1F1DCC1
663 size
= struct
.calcsize(fmt
)
667 'Decoding data stream index entry: path="{}", offset={}'.format(
677 assert len(data
) == size
686 ) = struct
.unpack(fmt
, data
)
688 self
._entries
.append(
689 _LttngDataStreamIndexEntry(
700 # Skip anything else before the next entry
701 f
.seek(index_entry_length
- size
, os
.SEEK_CUR
)
703 def __getitem__(self
, index
):
704 return self
._entries
[index
]
707 return len(self
._entries
)
714 # An LTTng data stream.
715 class _LttngDataStream
:
716 def __init__(self
, path
):
718 filename
= os
.path
.basename(path
)
719 match
= re
.match(r
'(.*)_\d+', filename
)
720 self
._channel
_name
= match
.group(1)
721 trace_dir
= os
.path
.dirname(path
)
722 index_path
= os
.path
.join(trace_dir
, 'index', filename
+ '.idx')
723 self
._index
= _LttngDataStreamIndex(index_path
)
724 assert os
.path
.isfile(path
)
725 self
._file
= open(path
, 'rb')
727 'Built data stream: path="{}", channel-name="{}"'.format(
728 path
, self
._channel
_name
def channel_name(self):
    # Channel name which the constructor extracted from the data
    # stream file name.
    return self._channel_name
def get_data(self, offset_bytes, len_bytes):
    # Read at most `len_bytes` bytes of the data stream file, starting
    # at the absolute offset `offset_bytes`. The result is shorter
    # than `len_bytes` when the file ends before.
    stream_file = self._file
    stream_file.seek(offset_bytes)
    return stream_file.read(len_bytes)
749 # An LTTng metadata stream.
750 class _LttngMetadataStream
:
751 def __init__(self
, path
):
753 logging
.info('Built metadata stream: path="{}"'.format(path
))
761 assert os
.path
.isfile(self
._path
)
763 with
open(self
._path
, 'rb') as f
:
767 # An LTTng trace, a sequence of LTTng data streams.
768 class LttngTrace(collections
.abc
.Sequence
):
def __init__(self, trace_dir):
    # Build the trace found in the existing directory `trace_dir`:
    # its metadata stream is the `metadata` file within this
    # directory, and _create_data_streams() builds its data streams.
    assert os.path.isdir(trace_dir)
    self._path = trace_dir
    metadata_path = os.path.join(trace_dir, 'metadata')
    self._metadata_stream = _LttngMetadataStream(metadata_path)
    self._create_data_streams(trace_dir)
    logging.info('Built trace: path="{}"'.format(trace_dir))
778 def _create_data_streams(self
, trace_dir
):
779 data_stream_paths
= []
781 for filename
in os
.listdir(trace_dir
):
782 path
= os
.path
.join(trace_dir
, filename
)
784 if not os
.path
.isfile(path
):
787 if filename
.startswith('.'):
790 if filename
== 'metadata':
793 data_stream_paths
.append(path
)
795 data_stream_paths
.sort()
796 self
._data
_streams
= []
798 for data_stream_path
in data_stream_paths
:
799 self
._data
_streams
.append(_LttngDataStream(data_stream_path
))
def metadata_stream(self):
    # Metadata stream which the constructor built for this trace.
    return self._metadata_stream
809 def __getitem__(self
, index
):
810 return self
._data
_streams
[index
]
813 return len(self
._data
_streams
)
816 # The state of a single data stream.
817 class _LttngLiveViewerSessionDataStreamState
:
818 def __init__(self
, ts_state
, info
, data_stream
):
819 self
._ts
_state
= ts_state
821 self
._data
_stream
= data_stream
822 self
._cur
_index
_entry
_index
= 0
823 fmt
= 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
827 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
828 ts_state
.tracing_session_descriptor
.info
.name
,
def tracing_session_state(self):
    # Tracing session state given to the constructor.
    return self._ts_state
def data_stream(self):
    # Data stream given to the constructor.
    return self._data_stream
846 def cur_index_entry(self
):
847 if self
._cur
_index
_entry
_index
== len(self
._data
_stream
.index
):
850 return self
._data
_stream
.index
[self
._cur
_index
_entry
_index
]
def goto_next_index_entry(self):
    # Move the current position within the data stream's index one
    # entry forward. There's no bound check here.
    self._cur_index_entry_index = self._cur_index_entry_index + 1
856 # The state of a single metadata stream.
857 class _LttngLiveViewerSessionMetadataStreamState
:
858 def __init__(self
, ts_state
, info
, metadata_stream
):
859 self
._ts
_state
= ts_state
861 self
._metadata
_stream
= metadata_stream
862 self
._is
_sent
= False
863 fmt
= 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
867 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
868 ts_state
.tracing_session_descriptor
.info
.name
,
869 metadata_stream
.path
,
def trace_session_state(self):
    # Tracing session state given to the constructor.
    #
    # The constructor keeps it as `self._ts_state`. No visible code
    # assigns the `_trace_session_state` attribute that this accessor
    # used to read, so reading it would raise `AttributeError`.
    return self._ts_state
def metadata_stream(self):
    # Metadata stream given to the constructor.
    return self._metadata_stream
def is_sent(self, value):
    # Record whether or not this metadata stream was sent (the
    # constructor starts with `False`).
    self._is_sent = value
894 # The state of a tracing session.
895 class _LttngLiveViewerSessionTracingSessionState
:
896 def __init__(self
, tc_descr
, base_stream_id
):
897 self
._tc
_descr
= tc_descr
898 self
._stream
_infos
= []
901 stream_id
= base_stream_id
903 for trace
in tc_descr
.traces
:
904 trace_id
= stream_id
* 1000
906 # Data streams -> stream infos and data stream states
907 for data_stream
in trace
:
908 info
= _LttngLiveViewerStreamInfo(
913 data_stream
.channel_name
,
915 self
._stream
_infos
.append(info
)
916 self
._ds
_states
[stream_id
] = _LttngLiveViewerSessionDataStreamState(
917 self
, info
, data_stream
921 # Metadata stream -> stream info and metadata stream state
922 info
= _LttngLiveViewerStreamInfo(
923 stream_id
, trace_id
, True, trace
.metadata_stream
.path
, 'metadata'
925 self
._stream
_infos
.append(info
)
926 self
._ms
_states
[stream_id
] = _LttngLiveViewerSessionMetadataStreamState(
927 self
, info
, trace
.metadata_stream
931 self
._is
_attached
= False
932 fmt
= 'Built tracing session state: id={}, name="{}"'
933 logging
.info(fmt
.format(tc_descr
.info
.tracing_session_id
, tc_descr
.info
.name
))
def tracing_session_descriptor(self):
    # Tracing session descriptor given to the constructor.
    return self._tc_descr
def data_stream_states(self):
    # Data stream states of this tracing session, keyed by stream ID.
    return self._ds_states
def metadata_stream_states(self):
    # Metadata stream states of this tracing session, keyed by
    # stream ID.
    return self._ms_states
def stream_infos(self):
    # Stream infos which the constructor collected for this tracing
    # session.
    return self._stream_infos
def has_new_metadata(self):
    # There's new metadata as long as at least one metadata stream
    # state of this tracing session isn't marked as sent.
    return not all(ms_state.is_sent for ms_state in self._ms_states.values())
def is_attached(self):
    # Whether or not a viewer is attached to this tracing session.
    return self._is_attached
def is_attached(self, value):
    # Record whether or not a viewer is attached to this tracing
    # session.
    self._is_attached = value
964 # An LTTng live viewer session manages a view on tracing sessions
965 # and replies to commands accordingly.
966 class _LttngLiveViewerSession
:
970 tracing_session_descriptors
,
971 max_query_data_response_size
,
973 self
._viewer
_session
_id
= viewer_session_id
975 self
._stream
_states
= {}
976 self
._max
_query
_data
_response
_size
= max_query_data_response_size
977 total_stream_infos
= 0
979 for ts_descr
in tracing_session_descriptors
:
980 ts_state
= _LttngLiveViewerSessionTracingSessionState(
981 ts_descr
, total_stream_infos
983 ts_id
= ts_state
.tracing_session_descriptor
.info
.tracing_session_id
984 self
._ts
_states
[ts_id
] = ts_state
985 total_stream_infos
+= len(ts_state
.stream_infos
)
987 # Update session's stream states to have the new states
988 self
._stream
_states
.update(ts_state
.data_stream_states
)
989 self
._stream
_states
.update(ts_state
.metadata_stream_states
)
991 self
._command
_handlers
= {
992 _LttngLiveViewerAttachToTracingSessionCommand
: self
._handle
_attach
_to
_tracing
_session
_command
,
993 _LttngLiveViewerCreateViewerSessionCommand
: self
._handle
_create
_viewer
_session
_command
,
994 _LttngLiveViewerDetachFromTracingSessionCommand
: self
._handle
_detach
_from
_tracing
_session
_command
,
995 _LttngLiveViewerGetDataStreamPacketDataCommand
: self
._handle
_get
_data
_stream
_packet
_data
_command
,
996 _LttngLiveViewerGetMetadataStreamDataCommand
: self
._handle
_get
_metadata
_stream
_data
_command
,
997 _LttngLiveViewerGetNewStreamInfosCommand
: self
._handle
_get
_new
_stream
_infos
_command
,
998 _LttngLiveViewerGetNextDataStreamIndexEntryCommand
: self
._handle
_get
_next
_data
_stream
_index
_entry
_command
,
999 _LttngLiveViewerGetTracingSessionInfosCommand
: self
._handle
_get
_tracing
_session
_infos
_command
,
def viewer_session_id(self):
    # Viewer session ID given to the constructor.
    return self._viewer_session_id
1006 def _get_tracing_session_state(self
, tracing_session_id
):
1007 if tracing_session_id
not in self
._ts
_states
:
1008 raise UnexpectedInput(
1009 'Unknown tracing session ID {}'.format(tracing_session_id
)
1012 return self
._ts
_states
[tracing_session_id
]
1014 def _get_stream_state(self
, stream_id
):
1015 if stream_id
not in self
._stream
_states
:
1016 UnexpectedInput('Unknown stream ID {}'.format(stream_id
))
1018 return self
._stream
_states
[stream_id
]
1020 def handle_command(self
, cmd
):
1022 'Handling command in viewer session: cmd-cls-name={}'.format(
1023 cmd
.__class
__.__name
__
1026 cmd_type
= type(cmd
)
1028 if cmd_type
not in self
._command
_handlers
:
1029 raise UnexpectedInput(
1030 'Unexpected command: cmd-cls-name={}'.format(cmd
.__class
__.__name
__)
1033 return self
._command
_handlers
[cmd_type
](cmd
)
1035 def _handle_attach_to_tracing_session_command(self
, cmd
):
1036 fmt
= 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1037 logging
.info(fmt
.format(cmd
.tracing_session_id
, cmd
.offset
, cmd
.seek_type
))
1038 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1039 info
= ts_state
.tracing_session_descriptor
.info
1041 if ts_state
.is_attached
:
1042 raise UnexpectedInput(
1043 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1048 ts_state
.is_attached
= True
1049 status
= _LttngLiveViewerAttachToTracingSessionReply
.Status
.OK
1050 return _LttngLiveViewerAttachToTracingSessionReply(
1051 status
, ts_state
.stream_infos
1054 def _handle_detach_from_tracing_session_command(self
, cmd
):
1055 fmt
= 'Handling "detach from tracing session" command: ts-id={}'
1056 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1057 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1058 info
= ts_state
.tracing_session_descriptor
.info
1060 if not ts_state
.is_attached
:
1061 raise UnexpectedInput(
1062 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1067 ts_state
.is_attached
= False
1068 status
= _LttngLiveViewerDetachFromTracingSessionReply
.Status
.OK
1069 return _LttngLiveViewerDetachFromTracingSessionReply(status
)
1071 def _handle_get_next_data_stream_index_entry_command(self
, cmd
):
1072 fmt
= 'Handling "get next data stream index entry" command: stream-id={}'
1073 logging
.info(fmt
.format(cmd
.stream_id
))
1074 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1076 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1077 raise UnexpectedInput(
1078 'Stream with ID {} is not a data stream'.format(cmd
.stream_id
)
1081 if stream_state
.cur_index_entry
is None:
1082 # The viewer is done reading this stream
1083 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.HUP
1085 # Dummy data stream index entry to use with the `HUP` status
1086 # (the reply needs one, but the viewer ignores it)
1087 index_entry
= _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1089 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1090 status
, index_entry
, False, False
1093 # The viewer only checks the `has_new_metadata` flag if the
1094 # reply's status is `OK`, so we need to provide an index here
1095 has_new_metadata
= stream_state
.tracing_session_state
.has_new_metadata
1096 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.OK
1097 reply
= _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1098 status
, stream_state
.cur_index_entry
, has_new_metadata
, False
1100 stream_state
.goto_next_index_entry()
1103 def _handle_get_data_stream_packet_data_command(self
, cmd
):
1104 fmt
= 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1105 logging
.info(fmt
.format(cmd
.stream_id
, cmd
.offset
, cmd
.req_length
))
1106 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1107 data_response_length
= cmd
.req_length
1109 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1110 raise UnexpectedInput(
1111 'Stream with ID {} is not a data stream'.format(cmd
.stream_id
)
1114 if stream_state
.tracing_session_state
.has_new_metadata
:
1115 status
= _LttngLiveViewerGetDataStreamPacketDataReply
.Status
.ERROR
1116 return _LttngLiveViewerGetDataStreamPacketDataReply(
1117 status
, bytes(), True, False
1120 if self
._max
_query
_data
_response
_size
:
1121 # Enforce a server side limit on the query requested length.
1122 # To ensure that the transaction terminate take the minimum of both
1124 data_response_length
= min(
1125 cmd
.req_length
, self
._max
_query
_data
_response
_size
1127 fmt
= 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1128 logging
.info(fmt
.format(cmd
.req_length
, data_response_length
))
1130 data
= stream_state
.data_stream
.get_data(cmd
.offset
, data_response_length
)
1131 status
= _LttngLiveViewerGetDataStreamPacketDataReply
.Status
.OK
1132 return _LttngLiveViewerGetDataStreamPacketDataReply(status
, data
, False, False)
def _handle_get_metadata_stream_data_command(self, cmd):
    # Reply with the whole data of the requested metadata stream the
    # first time, and with the `NO_NEW` status (empty data) once the
    # stream is marked as sent.
    #
    # Raises `UnexpectedInput` if the requested stream isn't a
    # metadata stream.
    stream_id = cmd.stream_id
    logging.info(
        'Handling "get metadata stream data" command: stream-id={}'.format(stream_id)
    )
    stream_state = self._get_stream_state(stream_id)

    if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
        raise UnexpectedInput(
            'Stream with ID {} is not a metadata stream'.format(stream_id)
        )

    reply_cls = _LttngLiveViewerGetMetadataStreamDataContentReply

    if stream_state.is_sent:
        # Nothing new since the last time
        return reply_cls(reply_cls.Status.NO_NEW, bytes())

    stream_state.is_sent = True
    return reply_cls(reply_cls.Status.OK, stream_state.metadata_stream.data)
def _handle_get_new_stream_infos_command(self, cmd):
    logging.info(
        'Handling "get new stream infos" command: ts-id={}'.format(
            cmd.tracing_session_id
        )
    )

    # The "attach to tracing session" reply already contains all the
    # stream infos of the tracing session, so there's never anything
    # new to report here. Reply with the `HUP` status and no stream
    # infos: if the viewer sends this command, it consumed all the
    # existing data streams.
    reply_cls = _LttngLiveViewerGetNewStreamInfosReply
    return reply_cls(reply_cls.Status.HUP, [])
def _handle_get_tracing_session_infos_command(self, cmd):
    # Reply with the infos of all the tracing sessions of this viewer
    # session, sorted by tracing session name.
    logging.info('Handling "get tracing session infos" command.')
    infos = sorted(
        (tss.tracing_session_descriptor.info for tss in self._ts_states.values()),
        key=lambda info: info.name,
    )
    return _LttngLiveViewerGetTracingSessionInfosReply(infos)
def _handle_create_viewer_session_command(self, cmd):
    logging.info('Handling "create viewer session" command.')

    # There's nothing to do here, whereas the LTTng relay daemon
    # allocates the state of the viewer session at this point: always
    # reply with the `OK` status.
    reply_cls = _LttngLiveViewerCreateViewerSessionReply
    return reply_cls(reply_cls.Status.OK)
1183 # An LTTng live TCP server.
1185 # On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1186 # the decimal TCP port number to a temporary port file. It renames the
1187 # temporary port file to `port_filename`.
1189 # `tracing_session_descriptors` is a list of tracing session descriptors
1190 # (`LttngTracingSessionDescriptor`) to serve.
1192 # This server accepts a single viewer (client).
1194 # When the viewer closes the connection, the server's constructor
1196 class LttngLiveServer
:
1198 self
, port_filename
, tracing_session_descriptors
, max_query_data_response_size
1200 logging
.info('Server configuration:')
1202 logging
.info(' Port file name: `{}`'.format(port_filename
))
1204 if max_query_data_response_size
is not None:
1206 ' Maximum response data query size: `{}`'.format(
1207 max_query_data_response_size
1211 for ts_descr
in tracing_session_descriptors
:
1212 info
= ts_descr
.info
1213 fmt
= ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1217 info
.tracing_session_id
,
1219 info
.live_timer_freq
,
1225 for trace
in ts_descr
.traces
:
1226 logging
.info(' Trace: path="{}"'.format(trace
.path
))
1228 self
._ts
_descriptors
= tracing_session_descriptors
1229 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1230 self
._sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
1231 self
._codec
= _LttngLiveViewerProtocolCodec()
1233 # Port 0: OS assigns an unused port
1234 serv_addr
= ('localhost', 0)
1235 self
._sock
.bind(serv_addr
)
1236 self
._write
_port
_to
_file
(port_filename
)
1242 logging
.info('Closed connection and socket.')
1245 def _server_port(self
):
1246 return self
._sock
.getsockname()[1]
1248 def _recv_command(self
):
1252 logging
.info('Waiting for viewer command.')
1253 buf
= self
._conn
.recv(128)
1256 logging
.info('Client closed connection.')
1259 raise UnexpectedInput(
1260 'Client closed connection after having sent {} command bytes.'.format(
1267 logging
.info('Received data from viewer: length={}'.format(len(buf
)))
1272 cmd
= self
._codec
.decode(data
)
1273 except struct
.error
as exc
:
1274 raise UnexpectedInput('Malformed command: {}'.format(exc
)) from exc
1278 'Received command from viewer: cmd-cls-name={}'.format(
1279 cmd
.__class
__.__name
__
1284 def _send_reply(self
, reply
):
1285 data
= self
._codec
.encode(reply
)
1287 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
1288 reply
.__class
__.__name
__, len(data
)
1291 self
._conn
.sendall(data
)
1293 def _handle_connection(self
):
1294 # First command must be "connect"
1295 cmd
= self
._recv
_command
()
1297 if type(cmd
) is not _LttngLiveViewerConnectCommand
:
1298 raise UnexpectedInput(
1299 'First command is not "connect": cmd-cls-name={}'.format(
1300 cmd
.__class
__.__name
__
1304 # Create viewer session (arbitrary ID 23)
1306 'LTTng live viewer connected: version={}.{}'.format(cmd
.major
, cmd
.minor
)
1308 viewer_session
= _LttngLiveViewerSession(
1309 23, self
._ts
_descriptors
, self
._max
_query
_data
_response
_size
1312 # Send "connect" reply
1314 _LttngLiveViewerConnectReply(viewer_session
.viewer_session_id
, 2, 10)
1317 # Make the viewer session handle the remaining commands
1319 cmd
= self
._recv
_command
()
1322 # Connection closed (at an expected location within the
1326 self
._send
_reply
(viewer_session
.handle_command(cmd
))
1329 logging
.info('Listening: port={}'.format(self
._server
_port
))
1330 # Backlog must be present for Python version < 3.5.
1331 # 128 is an arbitrary number since we expect only 1 connection anyway.
1332 self
._sock
.listen(128)
1333 self
._conn
, viewer_addr
= self
._sock
.accept()
1335 'Accepted viewer: addr={}:{}'.format(viewer_addr
[0], viewer_addr
[1])
1339 self
._handle
_connection
()
1343 def _write_port_to_file(self
, port_filename
):
1344 # Write the port number to a temporary file.
1345 with tempfile
.NamedTemporaryFile(mode
='w', delete
=False) as tmp_port_file
:
1346 print(self
._server
_port
, end
='', file=tmp_port_file
)
1348 # Rename temporary file to real file.
1350 # For unknown reasons, on Windows, moving the port file from its
1351 # temporary location to its final location (where the user of
1352 # the server expects it to appear) may raise a `PermissionError`
1355 # We suppose it's possible that something in the Windows kernel
1356 # hasn't completely finished using the file when we try to move
1359 # Use a wait-and-retry scheme as a (bad) workaround.
1363 for attempt
in reversed(range(num_attempts
)):
1365 os
.replace(tmp_port_file
.name
, port_filename
)
1367 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1368 tmp_port_file
.name
, port_filename
1372 except PermissionError
:
1374 'Permission error while attempting to rename port file; retrying in {} second: src-path="{}", dst-path="{}"'.format(
1375 retry_delay_s
, tmp_port_file
.name
, port_filename
1382 time
.sleep(retry_delay_s
)
1385 # A tracing session descriptor.
1387 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1389 class LttngTracingSessionDescriptor
:
1391 self
, name
, tracing_session_id
, hostname
, live_timer_freq
, client_count
, traces
1393 for trace
in traces
:
1394 if name
not in trace
.path
:
1395 fmt
= 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
1396 raise ValueError(fmt
.format(name
, trace
.path
))
1398 self
._traces
= traces
1399 stream_count
= sum([len(t
) + 1 for t
in traces
])
1400 self
._info
= _LttngLiveViewerTracingSessionInfo(
1418 def _tracing_session_descriptors_from_arg(string
):
1420 # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
1421 parts
= string
.split(',')
1423 tracing_session_id
= int(parts
[1])
1425 live_timer_freq
= int(parts
[3])
1426 client_count
= int(parts
[4])
1427 traces
= [LttngTrace(path
) for path
in parts
[5:]]
1428 return LttngTracingSessionDescriptor(
1429 name
, tracing_session_id
, hostname
, live_timer_freq
, client_count
, traces
1433 def _loglevel_parser(string
):
1434 loglevels
= {'info': logging
.INFO
, 'warning': logging
.WARNING
}
1435 if string
not in loglevels
:
1436 msg
= "{} is not a valid loglevel".format(string
)
1437 raise argparse
.ArgumentTypeError(msg
)
1438 return loglevels
[string
]
1441 if __name__
== '__main__':
1442 logging
.basicConfig(format
='# %(asctime)-25s%(message)s')
1443 parser
= argparse
.ArgumentParser(
1444 description
='LTTng-live protocol mocker', add_help
=False
1446 parser
.add_argument(
1449 choices
=['info', 'warning'],
1450 help='The loglevel to be used.',
1453 loglevel_namespace
, remaining_args
= parser
.parse_known_args()
1454 logging
.getLogger().setLevel(_loglevel_parser(loglevel_namespace
.log_level
))
1456 parser
.add_argument(
1458 help='The final port file. This file is present when the server is ready to receive connection.',
1461 parser
.add_argument(
1462 '--max-query-data-response-size',
1464 help='The maximum size of control data response in bytes',
1466 parser
.add_argument(
1470 type=_tracing_session_descriptors_from_arg
,
1471 help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
1473 parser
.add_argument(
1477 default
=argparse
.SUPPRESS
,
1478 help='Show this help message and exit.',
1481 args
= parser
.parse_args(args
=remaining_args
)
1484 args
.port_filename
, args
.sessions
, args
.max_query_data_response_size
1486 except UnexpectedInput
as exc
:
1487 logging
.error(str(exc
))
1488 print(exc
, file=sys
.stderr
)