1 # The MIT License (MIT)
3 # Copyright (c) 2019 Philippe Proulx <pproulx@efficios.com>
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 import collections
.abc
35 class UnexpectedInput(RuntimeError):
39 class _LttngLiveViewerCommand
:
40 def __init__(self
, version
):
41 self
._version
= version
48 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand
):
49 def __init__(self
, version
, viewer_session_id
, major
, minor
):
50 super().__init
__(version
)
51 self
._viewer
_session
_id
= viewer_session_id
56 def viewer_session_id(self
):
57 return self
._viewer
_session
_id
68 class _LttngLiveViewerConnectReply
:
69 def __init__(self
, viewer_session_id
, major
, minor
):
70 self
._viewer
_session
_id
= viewer_session_id
75 def viewer_session_id(self
):
76 return self
._viewer
_session
_id
87 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand
):
91 class _LttngLiveViewerTracingSessionInfo
:
101 self
._tracing
_session
_id
= tracing_session_id
102 self
._live
_timer
_freq
= live_timer_freq
103 self
._client
_count
= client_count
104 self
._stream
_count
= stream_count
105 self
._hostname
= hostname
109 def tracing_session_id(self
):
110 return self
._tracing
_session
_id
113 def live_timer_freq(self
):
114 return self
._live
_timer
_freq
117 def client_count(self
):
118 return self
._client
_count
121 def stream_count(self
):
122 return self
._stream
_count
126 return self
._hostname
133 class _LttngLiveViewerGetTracingSessionInfosReply
:
134 def __init__(self
, tracing_session_infos
):
135 self
._tracing
_session
_infos
= tracing_session_infos
138 def tracing_session_infos(self
):
139 return self
._tracing
_session
_infos
142 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand
):
147 def __init__(self
, version
, tracing_session_id
, offset
, seek_type
):
148 super().__init
__(version
)
149 self
._tracing
_session
_id
= tracing_session_id
150 self
._offset
= offset
151 self
._seek
_type
= seek_type
154 def tracing_session_id(self
):
155 return self
._tracing
_session
_id
163 return self
._seek
_type
166 class _LttngLiveViewerStreamInfo
:
167 def __init__(self
, id, trace_id
, is_metadata
, path
, channel_name
):
169 self
._trace
_id
= trace_id
170 self
._is
_metadata
= is_metadata
172 self
._channel
_name
= channel_name
180 return self
._trace
_id
183 def is_metadata(self
):
184 return self
._is
_metadata
191 def channel_name(self
):
192 return self
._channel
_name
195 class _LttngLiveViewerAttachToTracingSessionReply
:
204 def __init__(self
, status
, stream_infos
):
205 self
._status
= status
206 self
._stream
_infos
= stream_infos
213 def stream_infos(self
):
214 return self
._stream
_infos
217 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand
):
218 def __init__(self
, version
, stream_id
):
219 super().__init
__(version
)
220 self
._stream
_id
= stream_id
224 return self
._stream
_id
227 class _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
236 def __init__(self
, status
, index_entry
, has_new_metadata
, has_new_data_stream
):
237 self
._status
= status
238 self
._index
_entry
= index_entry
239 self
._has
_new
_metadata
= has_new_metadata
240 self
._has
_new
_data
_stream
= has_new_data_stream
247 def index_entry(self
):
248 return self
._index
_entry
251 def has_new_metadata(self
):
252 return self
._has
_new
_metadata
255 def has_new_data_stream(self
):
256 return self
._has
_new
_data
_stream
259 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand
):
260 def __init__(self
, version
, stream_id
, offset
, req_length
):
261 super().__init
__(version
)
262 self
._stream
_id
= stream_id
263 self
._offset
= offset
264 self
._req
_length
= req_length
268 return self
._stream
_id
275 def req_length(self
):
276 return self
._req
_length
279 class _LttngLiveViewerGetDataStreamPacketDataReply
:
286 def __init__(self
, status
, data
, has_new_metadata
, has_new_data_stream
):
287 self
._status
= status
289 self
._has
_new
_metadata
= has_new_metadata
290 self
._has
_new
_data
_stream
= has_new_data_stream
301 def has_new_metadata(self
):
302 return self
._has
_new
_metadata
305 def has_new_data_stream(self
):
306 return self
._has
_new
_data
_stream
309 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand
):
310 def __init__(self
, version
, stream_id
):
311 super().__init
__(version
)
312 self
._stream
_id
= stream_id
316 return self
._stream
_id
319 class _LttngLiveViewerGetMetadataStreamDataContentReply
:
325 def __init__(self
, status
, data
):
326 self
._status
= status
338 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand
):
339 def __init__(self
, version
, tracing_session_id
):
340 super().__init
__(version
)
341 self
._tracing
_session
_id
= tracing_session_id
344 def tracing_session_id(self
):
345 return self
._tracing
_session
_id
348 class _LttngLiveViewerGetNewStreamInfosReply
:
355 def __init__(self
, status
, stream_infos
):
356 self
._status
= status
357 self
._stream
_infos
= stream_infos
364 def stream_infos(self
):
365 return self
._stream
_infos
368 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand
):
372 class _LttngLiveViewerCreateViewerSessionReply
:
377 def __init__(self
, status
):
378 self
._status
= status
385 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand
):
386 def __init__(self
, version
, tracing_session_id
):
387 super().__init
__(version
)
388 self
._tracing
_session
_id
= tracing_session_id
391 def tracing_session_id(self
):
392 return self
._tracing
_session
_id
395 class _LttngLiveViewerDetachFromTracingSessionReply
:
401 def __init__(self
, status
):
402 self
._status
= status
409 # An LTTng live protocol codec can convert bytes to command objects and
410 # reply objects to bytes.
411 class _LttngLiveViewerProtocolCodec
:
412 _COMMAND_HEADER_STRUCT_FMT
= 'QII'
413 _COMMAND_HEADER_SIZE_BYTES
= struct
.calcsize(_COMMAND_HEADER_STRUCT_FMT
)
418 def _unpack(self
, fmt
, data
, offset
=0):
420 return struct
.unpack_from(fmt
, data
, offset
)
422 def _unpack_payload(self
, fmt
, data
):
424 fmt
, data
, _LttngLiveViewerProtocolCodec
._COMMAND
_HEADER
_SIZE
_BYTES
427 def decode(self
, data
):
428 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
:
429 # Not enough data to read the command header
432 payload_size
, cmd_type
, version
= self
._unpack
(
433 self
._COMMAND
_HEADER
_STRUCT
_FMT
, data
436 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
437 payload_size
, cmd_type
, version
441 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
+ payload_size
:
442 # Not enough data to read the whole command
446 viewer_session_id
, major
, minor
, conn_type
= self
._unpack
_payload
(
449 return _LttngLiveViewerConnectCommand(
450 version
, viewer_session_id
, major
, minor
453 return _LttngLiveViewerGetTracingSessionInfosCommand(version
)
455 tracing_session_id
, offset
, seek_type
= self
._unpack
_payload
('QQI', data
)
456 return _LttngLiveViewerAttachToTracingSessionCommand(
457 version
, tracing_session_id
, offset
, seek_type
460 (stream_id
,) = self
._unpack
_payload
('Q', data
)
461 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
465 stream_id
, offset
, req_length
= self
._unpack
_payload
('QQI', data
)
466 return _LttngLiveViewerGetDataStreamPacketDataCommand(
467 version
, stream_id
, offset
, req_length
470 (stream_id
,) = self
._unpack
_payload
('Q', data
)
471 return _LttngLiveViewerGetMetadataStreamDataCommand(version
, stream_id
)
473 (tracing_session_id
,) = self
._unpack
_payload
('Q', data
)
474 return _LttngLiveViewerGetNewStreamInfosCommand(version
, tracing_session_id
)
476 return _LttngLiveViewerCreateViewerSessionCommand(version
)
478 (tracing_session_id
,) = self
._unpack
_payload
('Q', data
)
479 return _LttngLiveViewerDetachFromTracingSessionCommand(
480 version
, tracing_session_id
483 raise UnexpectedInput('Unknown command type {}'.format(cmd_type
))
485 def _pack(self
, fmt
, *args
):
486 # Force network byte order
487 return struct
.pack('!' + fmt
, *args
)
489 def _encode_zero_padded_str(self
, string
, length
):
490 data
= string
.encode()
491 return data
.ljust(length
, b
'\x00')
493 def _encode_stream_info(self
, info
):
494 data
= self
._pack
('QQI', info
.id, info
.trace_id
, int(info
.is_metadata
))
495 data
+= self
._encode
_zero
_padded
_str
(info
.path
, 4096)
496 data
+= self
._encode
_zero
_padded
_str
(info
.channel_name
, 255)
499 def _get_has_new_stuff_flags(self
, has_new_metadata
, has_new_data_streams
):
505 if has_new_data_streams
:
510 def encode(self
, reply
):
511 if type(reply
) is _LttngLiveViewerConnectReply
:
513 'QIII', reply
.viewer_session_id
, reply
.major
, reply
.minor
, 2
515 elif type(reply
) is _LttngLiveViewerGetTracingSessionInfosReply
:
516 data
= self
._pack
('I', len(reply
.tracing_session_infos
))
518 for info
in reply
.tracing_session_infos
:
521 info
.tracing_session_id
,
522 info
.live_timer_freq
,
526 data
+= self
._encode
_zero
_padded
_str
(info
.hostname
, 64)
527 data
+= self
._encode
_zero
_padded
_str
(info
.name
, 255)
528 elif type(reply
) is _LttngLiveViewerAttachToTracingSessionReply
:
529 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
531 for info
in reply
.stream_infos
:
532 data
+= self
._encode
_stream
_info
(info
)
533 elif type(reply
) is _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
534 entry
= reply
.index_entry
535 flags
= self
._get
_has
_new
_stuff
_flags
(
536 reply
.has_new_metadata
, reply
.has_new_data_stream
542 entry
.total_size_bits
,
543 entry
.content_size_bits
,
544 entry
.timestamp_begin
,
546 entry
.events_discarded
,
547 entry
.stream_class_id
,
551 elif type(reply
) is _LttngLiveViewerGetDataStreamPacketDataReply
:
552 flags
= self
._get
_has
_new
_stuff
_flags
(
553 reply
.has_new_metadata
, reply
.has_new_data_stream
555 data
= self
._pack
('III', reply
.status
, len(reply
.data
), flags
)
557 elif type(reply
) is _LttngLiveViewerGetMetadataStreamDataContentReply
:
558 data
= self
._pack
('QI', len(reply
.data
), reply
.status
)
560 elif type(reply
) is _LttngLiveViewerGetNewStreamInfosReply
:
561 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
563 for info
in reply
.stream_infos
:
564 data
+= self
._encode
_stream
_info
(info
)
565 elif type(reply
) is _LttngLiveViewerCreateViewerSessionReply
:
566 data
= self
._pack
('I', reply
.status
)
567 elif type(reply
) is _LttngLiveViewerDetachFromTracingSessionReply
:
568 data
= self
._pack
('I', reply
.status
)
571 'Unknown reply object with class `{}`'.format(reply
.__class
__.__name
__)
577 # An entry within the index of an LTTng data stream.
578 class _LttngDataStreamIndexEntry
:
589 self
._offset
_bytes
= offset_bytes
590 self
._total
_size
_bits
= total_size_bits
591 self
._content
_size
_bits
= content_size_bits
592 self
._timestamp
_begin
= timestamp_begin
593 self
._timestamp
_end
= timestamp_end
594 self
._events
_discarded
= events_discarded
595 self
._stream
_class
_id
= stream_class_id
598 def offset_bytes(self
):
599 return self
._offset
_bytes
602 def total_size_bits(self
):
603 return self
._total
_size
_bits
606 def total_size_bytes(self
):
607 return self
._total
_size
_bits
// 8
610 def content_size_bits(self
):
611 return self
._content
_size
_bits
614 def content_size_bytes(self
):
615 return self
._content
_size
_bits
// 8
618 def timestamp_begin(self
):
619 return self
._timestamp
_begin
622 def timestamp_end(self
):
623 return self
._timestamp
_end
626 def events_discarded(self
):
627 return self
._events
_discarded
630 def stream_class_id(self
):
631 return self
._stream
_class
_id
634 # The index of an LTTng data stream, a sequence of index entries.
635 class _LttngDataStreamIndex(collections
.abc
.Sequence
):
636 def __init__(self
, path
):
640 'Built data stream index entries: path="{}", count={}'.format(
641 path
, len(self
._entries
)
647 assert os
.path
.isfile(self
._path
)
649 with
open(self
._path
, 'rb') as f
:
652 size
= struct
.calcsize(fmt
)
654 assert len(data
) == size
655 magic
, index_major
, index_minor
, index_entry_length
= struct
.unpack(
658 assert magic
== 0xC1F1DCC1
662 size
= struct
.calcsize(fmt
)
666 'Decoding data stream index entry: path="{}", offset={}'.format(
676 assert len(data
) == size
685 ) = struct
.unpack(fmt
, data
)
687 self
._entries
.append(
688 _LttngDataStreamIndexEntry(
699 # Skip anything else before the next entry
700 f
.seek(index_entry_length
- size
, os
.SEEK_CUR
)
702 def __getitem__(self
, index
):
703 return self
._entries
[index
]
706 return len(self
._entries
)
713 # An LTTng data stream.
714 class _LttngDataStream
:
715 def __init__(self
, path
):
717 filename
= os
.path
.basename(path
)
718 match
= re
.match(r
'(.*)_\d+', filename
)
719 self
._channel
_name
= match
.group(1)
720 trace_dir
= os
.path
.dirname(path
)
721 index_path
= os
.path
.join(trace_dir
, 'index', filename
+ '.idx')
722 self
._index
= _LttngDataStreamIndex(index_path
)
723 assert os
.path
.isfile(path
)
724 self
._file
= open(path
, 'rb')
726 'Built data stream: path="{}", channel-name="{}"'.format(
727 path
, self
._channel
_name
736 def channel_name(self
):
737 return self
._channel
_name
def get_data(self, offset_bytes, len_bytes):
    """Read up to `len_bytes` bytes of the data stream file from `offset_bytes`.

    The read position of the underlying file object is moved as a side
    effect.
    """
    stream_file = self._file
    stream_file.seek(offset_bytes)
    chunk = stream_file.read(len_bytes)
    return chunk
748 # An LTTng metadata stream.
749 class _LttngMetadataStream
:
750 def __init__(self
, path
):
752 logging
.info('Built metadata stream: path="{}"'.format(path
))
760 assert os
.path
.isfile(self
._path
)
762 with
open(self
._path
, 'rb') as f
:
766 # An LTTng trace, a sequence of LTTng data streams.
767 class LttngTrace(collections
.abc
.Sequence
):
768 def __init__(self
, trace_dir
):
769 assert os
.path
.isdir(trace_dir
)
770 self
._path
= trace_dir
771 self
._metadata
_stream
= _LttngMetadataStream(
772 os
.path
.join(trace_dir
, 'metadata')
774 self
._create
_data
_streams
(trace_dir
)
775 logging
.info('Built trace: path="{}"'.format(trace_dir
))
777 def _create_data_streams(self
, trace_dir
):
778 data_stream_paths
= []
780 for filename
in os
.listdir(trace_dir
):
781 path
= os
.path
.join(trace_dir
, filename
)
783 if not os
.path
.isfile(path
):
786 if filename
.startswith('.'):
789 if filename
== 'metadata':
792 data_stream_paths
.append(path
)
794 data_stream_paths
.sort()
795 self
._data
_streams
= []
797 for data_stream_path
in data_stream_paths
:
798 self
._data
_streams
.append(_LttngDataStream(data_stream_path
))
805 def metadata_stream(self
):
806 return self
._metadata
_stream
808 def __getitem__(self
, index
):
809 return self
._data
_streams
[index
]
812 return len(self
._data
_streams
)
815 # The state of a single data stream.
816 class _LttngLiveViewerSessionDataStreamState
:
817 def __init__(self
, ts_state
, info
, data_stream
):
818 self
._ts
_state
= ts_state
820 self
._data
_stream
= data_stream
821 self
._cur
_index
_entry
_index
= 0
822 fmt
= 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
826 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
827 ts_state
.tracing_session_descriptor
.info
.name
,
833 def tracing_session_state(self
):
834 return self
._ts
_state
841 def data_stream(self
):
842 return self
._data
_stream
845 def cur_index_entry(self
):
846 if self
._cur
_index
_entry
_index
== len(self
._data
_stream
.index
):
849 return self
._data
_stream
.index
[self
._cur
_index
_entry
_index
]
851 def goto_next_index_entry(self
):
852 self
._cur
_index
_entry
_index
+= 1
855 # The state of a single metadata stream.
856 class _LttngLiveViewerSessionMetadataStreamState
:
857 def __init__(self
, ts_state
, info
, metadata_stream
):
858 self
._ts
_state
= ts_state
860 self
._metadata
_stream
= metadata_stream
861 self
._is
_sent
= False
862 fmt
= 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
866 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
867 ts_state
.tracing_session_descriptor
.info
.name
,
868 metadata_stream
.path
,
def trace_session_state(self):
    """Return the tracing session state object given to the constructor."""
    # The constructor stores this object as `_ts_state`; no
    # `_trace_session_state` attribute is ever assigned, so reading it
    # would raise `AttributeError`.
    return self._ts_state
881 def metadata_stream(self
):
882 return self
._metadata
_stream
889 def is_sent(self
, value
):
890 self
._is
_sent
= value
893 # The state of a tracing session.
894 class _LttngLiveViewerSessionTracingSessionState
:
895 def __init__(self
, tc_descr
, base_stream_id
):
896 self
._tc
_descr
= tc_descr
897 self
._stream
_infos
= []
900 stream_id
= base_stream_id
902 for trace
in tc_descr
.traces
:
903 trace_id
= stream_id
* 1000
905 # Data streams -> stream infos and data stream states
906 for data_stream
in trace
:
907 info
= _LttngLiveViewerStreamInfo(
912 data_stream
.channel_name
,
914 self
._stream
_infos
.append(info
)
915 self
._ds
_states
[stream_id
] = _LttngLiveViewerSessionDataStreamState(
916 self
, info
, data_stream
920 # Metadata stream -> stream info and metadata stream state
921 info
= _LttngLiveViewerStreamInfo(
922 stream_id
, trace_id
, True, trace
.metadata_stream
.path
, 'metadata'
924 self
._stream
_infos
.append(info
)
925 self
._ms
_states
[stream_id
] = _LttngLiveViewerSessionMetadataStreamState(
926 self
, info
, trace
.metadata_stream
930 self
._is
_attached
= False
931 fmt
= 'Built tracing session state: id={}, name="{}"'
932 logging
.info(fmt
.format(tc_descr
.info
.tracing_session_id
, tc_descr
.info
.name
))
935 def tracing_session_descriptor(self
):
936 return self
._tc
_descr
939 def data_stream_states(self
):
940 return self
._ds
_states
943 def metadata_stream_states(self
):
944 return self
._ms
_states
947 def stream_infos(self
):
948 return self
._stream
_infos
def has_new_metadata(self):
    """Tell whether at least one metadata stream state is not marked as sent."""
    return not all(ms_state.is_sent for ms_state in self._ms_states.values())
955 def is_attached(self
):
956 return self
._is
_attached
959 def is_attached(self
, value
):
960 self
._is
_attached
= value
963 # An LTTng live viewer session manages a view on tracing sessions
964 # and replies to commands accordingly.
965 class _LttngLiveViewerSession
:
969 tracing_session_descriptors
,
970 max_query_data_response_size
,
972 self
._viewer
_session
_id
= viewer_session_id
974 self
._stream
_states
= {}
975 self
._max
_query
_data
_response
_size
= max_query_data_response_size
976 total_stream_infos
= 0
978 for ts_descr
in tracing_session_descriptors
:
979 ts_state
= _LttngLiveViewerSessionTracingSessionState(
980 ts_descr
, total_stream_infos
982 ts_id
= ts_state
.tracing_session_descriptor
.info
.tracing_session_id
983 self
._ts
_states
[ts_id
] = ts_state
984 total_stream_infos
+= len(ts_state
.stream_infos
)
986 # Update session's stream states to have the new states
987 self
._stream
_states
.update(ts_state
.data_stream_states
)
988 self
._stream
_states
.update(ts_state
.metadata_stream_states
)
990 self
._command
_handlers
= {
991 _LttngLiveViewerAttachToTracingSessionCommand
: self
._handle
_attach
_to
_tracing
_session
_command
,
992 _LttngLiveViewerCreateViewerSessionCommand
: self
._handle
_create
_viewer
_session
_command
,
993 _LttngLiveViewerDetachFromTracingSessionCommand
: self
._handle
_detach
_from
_tracing
_session
_command
,
994 _LttngLiveViewerGetDataStreamPacketDataCommand
: self
._handle
_get
_data
_stream
_packet
_data
_command
,
995 _LttngLiveViewerGetMetadataStreamDataCommand
: self
._handle
_get
_metadata
_stream
_data
_command
,
996 _LttngLiveViewerGetNewStreamInfosCommand
: self
._handle
_get
_new
_stream
_infos
_command
,
997 _LttngLiveViewerGetNextDataStreamIndexEntryCommand
: self
._handle
_get
_next
_data
_stream
_index
_entry
_command
,
998 _LttngLiveViewerGetTracingSessionInfosCommand
: self
._handle
_get
_tracing
_session
_infos
_command
,
1002 def viewer_session_id(self
):
1003 return self
._viewer
_session
_id
1005 def _get_tracing_session_state(self
, tracing_session_id
):
1006 if tracing_session_id
not in self
._ts
_states
:
1007 raise UnexpectedInput(
1008 'Unknown tracing session ID {}'.format(tracing_session_id
)
1011 return self
._ts
_states
[tracing_session_id
]
1013 def _get_stream_state(self
, stream_id
):
1014 if stream_id
not in self
._stream
_states
:
1015 UnexpectedInput('Unknown stream ID {}'.format(stream_id
))
1017 return self
._stream
_states
[stream_id
]
1019 def handle_command(self
, cmd
):
1021 'Handling command in viewer session: cmd-cls-name={}'.format(
1022 cmd
.__class
__.__name
__
1025 cmd_type
= type(cmd
)
1027 if cmd_type
not in self
._command
_handlers
:
1028 raise UnexpectedInput(
1029 'Unexpected command: cmd-cls-name={}'.format(cmd
.__class
__.__name
__)
1032 return self
._command
_handlers
[cmd_type
](cmd
)
1034 def _handle_attach_to_tracing_session_command(self
, cmd
):
1035 fmt
= 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1036 logging
.info(fmt
.format(cmd
.tracing_session_id
, cmd
.offset
, cmd
.seek_type
))
1037 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1038 info
= ts_state
.tracing_session_descriptor
.info
1040 if ts_state
.is_attached
:
1041 raise UnexpectedInput(
1042 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1047 ts_state
.is_attached
= True
1048 status
= _LttngLiveViewerAttachToTracingSessionReply
.Status
.OK
1049 return _LttngLiveViewerAttachToTracingSessionReply(
1050 status
, ts_state
.stream_infos
1053 def _handle_detach_from_tracing_session_command(self
, cmd
):
1054 fmt
= 'Handling "detach from tracing session" command: ts-id={}'
1055 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1056 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1057 info
= ts_state
.tracing_session_descriptor
.info
1059 if not ts_state
.is_attached
:
1060 raise UnexpectedInput(
1061 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1066 ts_state
.is_attached
= False
1067 status
= _LttngLiveViewerDetachFromTracingSessionReply
.Status
.OK
1068 return _LttngLiveViewerDetachFromTracingSessionReply(status
)
1070 def _handle_get_next_data_stream_index_entry_command(self
, cmd
):
1071 fmt
= 'Handling "get next data stream index entry" command: stream-id={}'
1072 logging
.info(fmt
.format(cmd
.stream_id
))
1073 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1075 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1076 raise UnexpectedInput(
1077 'Stream with ID {} is not a data stream'.format(cmd
.stream_id
)
1080 if stream_state
.cur_index_entry
is None:
1081 # The viewer is done reading this stream
1082 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.HUP
1084 # Dummy data stream index entry to use with the `HUP` status
1085 # (the reply needs one, but the viewer ignores it)
1086 index_entry
= _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1088 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1089 status
, index_entry
, False, False
1092 # The viewer only checks the `has_new_metadata` flag if the
1093 # reply's status is `OK`, so we need to provide an index here
1094 has_new_metadata
= stream_state
.tracing_session_state
.has_new_metadata
1095 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.OK
1096 reply
= _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1097 status
, stream_state
.cur_index_entry
, has_new_metadata
, False
1099 stream_state
.goto_next_index_entry()
def _handle_get_data_stream_packet_data_command(self, cmd):
    """Handle a "get data stream packet data" command.

    Returns an `ERROR` reply with the "has new metadata" flag set while
    the stream's tracing session has metadata which was not sent yet.
    Otherwise returns an `OK` reply carrying the data read from the data
    stream at `cmd.offset`.

    Raises `UnexpectedInput` if `cmd.stream_id` does not designate a
    data stream state.
    """
    logging.info(
        'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'.format(
            cmd.stream_id, cmd.offset, cmd.req_length
        )
    )
    ds_state = self._get_stream_state(cmd.stream_id)
    response_length = cmd.req_length

    if type(ds_state) is not _LttngLiveViewerSessionDataStreamState:
        msg = 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
        raise UnexpectedInput(msg)

    reply_cls = _LttngLiveViewerGetDataStreamPacketDataReply

    if ds_state.tracing_session_state.has_new_metadata:
        return reply_cls(reply_cls.Status.ERROR, bytes(), True, False)

    if self._max_query_data_response_size:
        # Server-side cap on the response length: answer with the
        # smaller of the requested length and the configured maximum.
        response_length = min(cmd.req_length, self._max_query_data_response_size)
        logging.info(
            'Limiting "get data stream packet data" command: req-length={} actual response size={}'.format(
                cmd.req_length, response_length
            )
        )

    packet_data = ds_state.data_stream.get_data(cmd.offset, response_length)
    return reply_cls(reply_cls.Status.OK, packet_data, False, False)
def _handle_get_metadata_stream_data_command(self, cmd):
    """Handle a "get metadata stream data" command.

    Returns an `OK` reply carrying the metadata stream's data when the
    stream state is not marked as sent yet, and marks it as sent.
    Otherwise returns an empty `NO_NEW` reply.

    Raises `UnexpectedInput` if `cmd.stream_id` does not designate a
    metadata stream state.
    """
    logging.info(
        'Handling "get metadata stream data" command: stream-id={}'.format(
            cmd.stream_id
        )
    )
    ms_state = self._get_stream_state(cmd.stream_id)

    if type(ms_state) is not _LttngLiveViewerSessionMetadataStreamState:
        msg = 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
        raise UnexpectedInput(msg)

    reply_cls = _LttngLiveViewerGetMetadataStreamDataContentReply

    if not ms_state.is_sent:
        # Mark the stream as sent so that later requests get `NO_NEW`.
        ms_state.is_sent = True
        return reply_cls(reply_cls.Status.OK, ms_state.metadata_stream.data)

    return reply_cls(reply_cls.Status.NO_NEW, bytes())
def _handle_get_new_stream_infos_command(self, cmd):
    """Handle a "get new stream infos" command.

    Always returns a `HUP` reply with an empty list of stream infos.
    """
    logging.info(
        'Handling "get new stream infos" command: ts-id={}'.format(
            cmd.tracing_session_id
        )
    )

    # Every stream info of the tracing session is already part of the
    # "attach to tracing session" reply, so there is never anything new
    # to report. `HUP` is the answer because a viewer sending this
    # command has consumed all the existing data streams.
    reply_cls = _LttngLiveViewerGetNewStreamInfosReply
    return reply_cls(reply_cls.Status.HUP, [])
def _handle_get_tracing_session_infos_command(self, cmd):
    """Handle a "get tracing session infos" command.

    Returns a reply holding the info object of every tracing session
    state of this viewer session, sorted by the info's `name`.
    """
    logging.info('Handling "get tracing session infos" command.')
    infos = sorted(
        (
            ts_state.tracing_session_descriptor.info
            for ts_state in self._ts_states.values()
        ),
        key=lambda ts_info: ts_info.name,
    )
    return _LttngLiveViewerGetTracingSessionInfosReply(infos)
def _handle_create_viewer_session_command(self, cmd):
    """Handle a "create viewer session" command by replying with `OK`."""
    logging.info('Handling "create viewer session" command.')

    # There is nothing to set up on this side; in the LTTng relay
    # daemon, this command allocates the viewer session's state.
    reply_cls = _LttngLiveViewerCreateViewerSessionReply
    return reply_cls(reply_cls.Status.OK)
1182 # An LTTng live TCP server.
1184 # On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1185 # the decimal TCP port number to a temporary port file. It renames the
1186 # temporary port file to `port_filename`.
1188 # `tracing_session_descriptors` is a list of tracing session descriptors
1189 # (`LttngTracingSessionDescriptor`) to serve.
1191 # This server accepts a single viewer (client).
1193 # When the viewer closes the connection, the server's constructor
1195 class LttngLiveServer
:
1197 self
, port_filename
, tracing_session_descriptors
, max_query_data_response_size
1199 logging
.info('Server configuration:')
1201 logging
.info(' Port file name: `{}`'.format(port_filename
))
1203 if max_query_data_response_size
is not None:
1205 ' Maximum response data query size: `{}`'.format(
1206 max_query_data_response_size
1210 for ts_descr
in tracing_session_descriptors
:
1211 info
= ts_descr
.info
1212 fmt
= ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1216 info
.tracing_session_id
,
1218 info
.live_timer_freq
,
1224 for trace
in ts_descr
.traces
:
1225 logging
.info(' Trace: path="{}"'.format(trace
.path
))
1227 self
._ts
_descriptors
= tracing_session_descriptors
1228 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1229 self
._sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
1230 self
._codec
= _LttngLiveViewerProtocolCodec()
1232 # Port 0: OS assigns an unused port
1233 serv_addr
= ('localhost', 0)
1234 self
._sock
.bind(serv_addr
)
1235 self
._write
_port
_to
_file
(port_filename
)
1241 logging
.info('Closed connection and socket.')
1244 def _server_port(self
):
1245 return self
._sock
.getsockname()[1]
1247 def _recv_command(self
):
1251 logging
.info('Waiting for viewer command.')
1252 buf
= self
._conn
.recv(128)
1255 logging
.info('Client closed connection.')
1258 raise UnexpectedInput(
1259 'Client closed connection after having sent {} command bytes.'.format(
1266 logging
.info('Received data from viewer: length={}'.format(len(buf
)))
1271 cmd
= self
._codec
.decode(data
)
1272 except struct
.error
as exc
:
1273 raise UnexpectedInput('Malformed command: {}'.format(exc
)) from exc
1277 'Received command from viewer: cmd-cls-name={}'.format(
1278 cmd
.__class
__.__name
__
1283 def _send_reply(self
, reply
):
1284 data
= self
._codec
.encode(reply
)
1286 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
1287 reply
.__class
__.__name
__, len(data
)
1290 self
._conn
.sendall(data
)
1292 def _handle_connection(self
):
1293 # First command must be "connect"
1294 cmd
= self
._recv
_command
()
1296 if type(cmd
) is not _LttngLiveViewerConnectCommand
:
1297 raise UnexpectedInput(
1298 'First command is not "connect": cmd-cls-name={}'.format(
1299 cmd
.__class
__.__name
__
1303 # Create viewer session (arbitrary ID 23)
1305 'LTTng live viewer connected: version={}.{}'.format(cmd
.major
, cmd
.minor
)
1307 viewer_session
= _LttngLiveViewerSession(
1308 23, self
._ts
_descriptors
, self
._max
_query
_data
_response
_size
1311 # Send "connect" reply
1313 _LttngLiveViewerConnectReply(viewer_session
.viewer_session_id
, 2, 10)
1316 # Make the viewer session handle the remaining commands
1318 cmd
= self
._recv
_command
()
1321 # Connection closed (at an expected location within the
1325 self
._send
_reply
(viewer_session
.handle_command(cmd
))
1328 logging
.info('Listening: port={}'.format(self
._server
_port
))
1329 # Backlog must be present for Python version < 3.5.
1330 # 128 is an arbitrary number since we expect only 1 connection anyway.
1331 self
._sock
.listen(128)
1332 self
._conn
, viewer_addr
= self
._sock
.accept()
1334 'Accepted viewer: addr={}:{}'.format(viewer_addr
[0], viewer_addr
[1])
1338 self
._handle
_connection
()
1342 def _write_port_to_file(self
, port_filename
):
1343 # Write the port number to a temporary file.
1344 with tempfile
.NamedTemporaryFile(mode
='w', delete
=False) as tmp_port_file
:
1345 print(self
._server
_port
, end
='', file=tmp_port_file
)
1347 # Rename temporary file to real file
1348 os
.replace(tmp_port_file
.name
, port_filename
)
1350 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1351 tmp_port_file
.name
, port_filename
1356 # A tracing session descriptor.
1358 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1360 class LttngTracingSessionDescriptor
:
1362 self
, name
, tracing_session_id
, hostname
, live_timer_freq
, client_count
, traces
1364 for trace
in traces
:
1365 if name
not in trace
.path
:
1366 fmt
= 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
1367 raise ValueError(fmt
.format(name
, trace
.path
))
1369 self
._traces
= traces
1370 stream_count
= sum([len(t
) + 1 for t
in traces
])
1371 self
._info
= _LttngLiveViewerTracingSessionInfo(
1389 def _tracing_session_descriptors_from_arg(string
):
1391 # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
1392 parts
= string
.split(',')
1394 tracing_session_id
= int(parts
[1])
1396 live_timer_freq
= int(parts
[3])
1397 client_count
= int(parts
[4])
1398 traces
= [LttngTrace(path
) for path
in parts
[5:]]
1399 return LttngTracingSessionDescriptor(
1400 name
, tracing_session_id
, hostname
, live_timer_freq
, client_count
, traces
1404 def _loglevel_parser(string
):
1405 loglevels
= {'info': logging
.INFO
, 'warning': logging
.WARNING
}
1406 if string
not in loglevels
:
1407 msg
= "{} is not a valid loglevel".format(string
)
1408 raise argparse
.ArgumentTypeError(msg
)
1409 return loglevels
[string
]
1412 if __name__
== '__main__':
1413 logging
.basicConfig(format
='# %(asctime)-25s%(message)s')
1414 parser
= argparse
.ArgumentParser(
1415 description
='LTTng-live protocol mocker', add_help
=False
1417 parser
.add_argument(
1420 choices
=['info', 'warning'],
1421 help='The loglevel to be used.',
1424 loglevel_namespace
, remaining_args
= parser
.parse_known_args()
1425 logging
.getLogger().setLevel(_loglevel_parser(loglevel_namespace
.log_level
))
1427 parser
.add_argument(
1429 help='The final port file. This file is present when the server is ready to receive connection.',
1432 parser
.add_argument(
1433 '--max-query-data-response-size',
1435 help='The maximum size of control data response in bytes',
1437 parser
.add_argument(
1441 type=_tracing_session_descriptors_from_arg
,
1442 help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
1444 parser
.add_argument(
1448 default
=argparse
.SUPPRESS
,
1449 help='Show this help message and exit.',
1452 args
= parser
.parse_args(args
=remaining_args
)
1455 args
.port_filename
, args
.sessions
, args
.max_query_data_response_size
1457 except UnexpectedInput
as exc
:
1458 logging
.error(str(exc
))
1459 print(exc
, file=sys
.stderr
)