1 # The MIT License (MIT)
3 # Copyright (c) 2019 Philippe Proulx <pproulx@efficios.com>
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 import collections
.abc
35 class UnexpectedInput(RuntimeError):
39 class _LttngLiveViewerCommand
:
40 def __init__(self
, version
):
41 self
._version
= version
48 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand
):
49 def __init__(self
, version
, viewer_session_id
, major
, minor
):
50 super().__init
__(version
)
51 self
._viewer
_session
_id
= viewer_session_id
56 def viewer_session_id(self
):
57 return self
._viewer
_session
_id
68 class _LttngLiveViewerConnectReply
:
69 def __init__(self
, viewer_session_id
, major
, minor
):
70 self
._viewer
_session
_id
= viewer_session_id
75 def viewer_session_id(self
):
76 return self
._viewer
_session
_id
87 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand
):
91 class _LttngLiveViewerTracingSessionInfo
:
101 self
._tracing
_session
_id
= tracing_session_id
102 self
._live
_timer
_freq
= live_timer_freq
103 self
._client
_count
= client_count
104 self
._stream
_count
= stream_count
105 self
._hostname
= hostname
109 def tracing_session_id(self
):
110 return self
._tracing
_session
_id
113 def live_timer_freq(self
):
114 return self
._live
_timer
_freq
117 def client_count(self
):
118 return self
._client
_count
121 def stream_count(self
):
122 return self
._stream
_count
126 return self
._hostname
133 class _LttngLiveViewerGetTracingSessionInfosReply
:
134 def __init__(self
, tracing_session_infos
):
135 self
._tracing
_session
_infos
= tracing_session_infos
138 def tracing_session_infos(self
):
139 return self
._tracing
_session
_infos
142 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand
):
147 def __init__(self
, version
, tracing_session_id
, offset
, seek_type
):
148 super().__init
__(version
)
149 self
._tracing
_session
_id
= tracing_session_id
150 self
._offset
= offset
151 self
._seek
_type
= seek_type
154 def tracing_session_id(self
):
155 return self
._tracing
_session
_id
163 return self
._seek
_type
166 class _LttngLiveViewerStreamInfo
:
167 def __init__(self
, id, trace_id
, is_metadata
, path
, channel_name
):
169 self
._trace
_id
= trace_id
170 self
._is
_metadata
= is_metadata
172 self
._channel
_name
= channel_name
180 return self
._trace
_id
183 def is_metadata(self
):
184 return self
._is
_metadata
191 def channel_name(self
):
192 return self
._channel
_name
195 class _LttngLiveViewerAttachToTracingSessionReply
:
204 def __init__(self
, status
, stream_infos
):
205 self
._status
= status
206 self
._stream
_infos
= stream_infos
213 def stream_infos(self
):
214 return self
._stream
_infos
217 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand
):
218 def __init__(self
, version
, stream_id
):
219 super().__init
__(version
)
220 self
._stream
_id
= stream_id
224 return self
._stream
_id
227 class _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
236 def __init__(self
, status
, index_entry
, has_new_metadata
, has_new_data_stream
):
237 self
._status
= status
238 self
._index
_entry
= index_entry
239 self
._has
_new
_metadata
= has_new_metadata
240 self
._has
_new
_data
_stream
= has_new_data_stream
247 def index_entry(self
):
248 return self
._index
_entry
251 def has_new_metadata(self
):
252 return self
._has
_new
_metadata
255 def has_new_data_stream(self
):
256 return self
._has
_new
_data
_stream
259 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand
):
260 def __init__(self
, version
, stream_id
, offset
, req_length
):
261 super().__init
__(version
)
262 self
._stream
_id
= stream_id
263 self
._offset
= offset
264 self
._req
_length
= req_length
268 return self
._stream
_id
275 def req_length(self
):
276 return self
._req
_length
279 class _LttngLiveViewerGetDataStreamPacketDataReply
:
286 def __init__(self
, status
, data
, has_new_metadata
, has_new_data_stream
):
287 self
._status
= status
289 self
._has
_new
_metadata
= has_new_metadata
290 self
._has
_new
_data
_stream
= has_new_data_stream
301 def has_new_metadata(self
):
302 return self
._has
_new
_metadata
305 def has_new_data_stream(self
):
306 return self
._has
_new
_data
_stream
309 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand
):
310 def __init__(self
, version
, stream_id
):
311 super().__init
__(version
)
312 self
._stream
_id
= stream_id
316 return self
._stream
_id
319 class _LttngLiveViewerGetMetadataStreamDataContentReply
:
325 def __init__(self
, status
, data
):
326 self
._status
= status
338 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand
):
339 def __init__(self
, version
, tracing_session_id
):
340 super().__init
__(version
)
341 self
._tracing
_session
_id
= tracing_session_id
344 def tracing_session_id(self
):
345 return self
._tracing
_session
_id
348 class _LttngLiveViewerGetNewStreamInfosReply
:
355 def __init__(self
, status
, stream_infos
):
356 self
._status
= status
357 self
._stream
_infos
= stream_infos
364 def stream_infos(self
):
365 return self
._stream
_infos
368 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand
):
372 class _LttngLiveViewerCreateViewerSessionReply
:
377 def __init__(self
, status
):
378 self
._status
= status
385 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand
):
386 def __init__(self
, version
, tracing_session_id
):
387 super().__init
__(version
)
388 self
._tracing
_session
_id
= tracing_session_id
391 def tracing_session_id(self
):
392 return self
._tracing
_session
_id
395 class _LttngLiveViewerDetachFromTracingSessionReply
:
401 def __init__(self
, status
):
402 self
._status
= status
409 # An LTTng live protocol codec can convert bytes to command objects and
410 # reply objects to bytes.
411 class _LttngLiveViewerProtocolCodec
:
412 _COMMAND_HEADER_STRUCT_FMT
= 'QII'
413 _COMMAND_HEADER_SIZE_BYTES
= struct
.calcsize(_COMMAND_HEADER_STRUCT_FMT
)
418 def _unpack(self
, fmt
, data
, offset
=0):
420 return struct
.unpack_from(fmt
, data
, offset
)
422 def _unpack_payload(self
, fmt
, data
):
424 fmt
, data
, _LttngLiveViewerProtocolCodec
._COMMAND
_HEADER
_SIZE
_BYTES
427 def decode(self
, data
):
428 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
:
429 # Not enough data to read the command header
432 payload_size
, cmd_type
, version
= self
._unpack
(
433 self
._COMMAND
_HEADER
_STRUCT
_FMT
, data
436 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
437 payload_size
, cmd_type
, version
441 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
+ payload_size
:
442 # Not enough data to read the whole command
446 viewer_session_id
, major
, minor
, conn_type
= self
._unpack
_payload
(
449 return _LttngLiveViewerConnectCommand(
450 version
, viewer_session_id
, major
, minor
453 return _LttngLiveViewerGetTracingSessionInfosCommand(version
)
455 tracing_session_id
, offset
, seek_type
= self
._unpack
_payload
('QQI', data
)
456 return _LttngLiveViewerAttachToTracingSessionCommand(
457 version
, tracing_session_id
, offset
, seek_type
460 stream_id
, = self
._unpack
_payload
('Q', data
)
461 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
465 stream_id
, offset
, req_length
= self
._unpack
_payload
('QQI', data
)
466 return _LttngLiveViewerGetDataStreamPacketDataCommand(
467 version
, stream_id
, offset
, req_length
470 stream_id
, = self
._unpack
_payload
('Q', data
)
471 return _LttngLiveViewerGetMetadataStreamDataCommand(version
, stream_id
)
473 tracing_session_id
, = self
._unpack
_payload
('Q', data
)
474 return _LttngLiveViewerGetNewStreamInfosCommand(version
, tracing_session_id
)
476 return _LttngLiveViewerCreateViewerSessionCommand(version
)
478 tracing_session_id
, = self
._unpack
_payload
('Q', data
)
479 return _LttngLiveViewerDetachFromTracingSessionCommand(
480 version
, tracing_session_id
483 raise UnexpectedInput('Unknown command type {}'.format(cmd_type
))
485 def _pack(self
, fmt
, *args
):
486 # Force network byte order
487 return struct
.pack('!' + fmt
, *args
)
489 def _encode_zero_padded_str(self
, string
, length
):
490 data
= string
.encode()
491 return data
.ljust(length
, b
'\x00')
493 def _encode_stream_info(self
, info
):
494 data
= self
._pack
('QQI', info
.id, info
.trace_id
, int(info
.is_metadata
))
495 data
+= self
._encode
_zero
_padded
_str
(info
.path
, 4096)
496 data
+= self
._encode
_zero
_padded
_str
(info
.channel_name
, 255)
499 def _get_has_new_stuff_flags(self
, has_new_metadata
, has_new_data_streams
):
505 if has_new_data_streams
:
510 def encode(self
, reply
):
511 if type(reply
) is _LttngLiveViewerConnectReply
:
513 'QIII', reply
.viewer_session_id
, reply
.major
, reply
.minor
, 2
515 elif type(reply
) is _LttngLiveViewerGetTracingSessionInfosReply
:
516 data
= self
._pack
('I', len(reply
.tracing_session_infos
))
518 for info
in reply
.tracing_session_infos
:
521 info
.tracing_session_id
,
522 info
.live_timer_freq
,
526 data
+= self
._encode
_zero
_padded
_str
(info
.hostname
, 64)
527 data
+= self
._encode
_zero
_padded
_str
(info
.name
, 255)
528 elif type(reply
) is _LttngLiveViewerAttachToTracingSessionReply
:
529 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
531 for info
in reply
.stream_infos
:
532 data
+= self
._encode
_stream
_info
(info
)
533 elif type(reply
) is _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
534 entry
= reply
.index_entry
535 flags
= self
._get
_has
_new
_stuff
_flags
(
536 reply
.has_new_metadata
, reply
.has_new_data_stream
542 entry
.total_size_bits
,
543 entry
.content_size_bits
,
544 entry
.timestamp_begin
,
546 entry
.events_discarded
,
547 entry
.stream_class_id
,
551 elif type(reply
) is _LttngLiveViewerGetDataStreamPacketDataReply
:
552 flags
= self
._get
_has
_new
_stuff
_flags
(
553 reply
.has_new_metadata
, reply
.has_new_data_stream
555 data
= self
._pack
('III', reply
.status
, len(reply
.data
), flags
)
557 elif type(reply
) is _LttngLiveViewerGetMetadataStreamDataContentReply
:
558 data
= self
._pack
('QI', len(reply
.data
), reply
.status
)
560 elif type(reply
) is _LttngLiveViewerGetNewStreamInfosReply
:
561 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
563 for info
in reply
.stream_infos
:
564 data
+= self
._encode
_stream
_info
(info
)
565 elif type(reply
) is _LttngLiveViewerCreateViewerSessionReply
:
566 data
= self
._pack
('I', reply
.status
)
567 elif type(reply
) is _LttngLiveViewerDetachFromTracingSessionReply
:
568 data
= self
._pack
('I', reply
.status
)
571 'Unknown reply object with class `{}`'.format(reply
.__class
__.__name
__)
577 # An entry within the index of an LTTng data stream.
578 class _LttngDataStreamIndexEntry
:
589 self
._offset
_bytes
= offset_bytes
590 self
._total
_size
_bits
= total_size_bits
591 self
._content
_size
_bits
= content_size_bits
592 self
._timestamp
_begin
= timestamp_begin
593 self
._timestamp
_end
= timestamp_end
594 self
._events
_discarded
= events_discarded
595 self
._stream
_class
_id
= stream_class_id
598 def offset_bytes(self
):
599 return self
._offset
_bytes
602 def total_size_bits(self
):
603 return self
._total
_size
_bits
606 def total_size_bytes(self
):
607 return self
._total
_size
_bits
// 8
610 def content_size_bits(self
):
611 return self
._content
_size
_bits
614 def content_size_bytes(self
):
615 return self
._content
_size
_bits
// 8
618 def timestamp_begin(self
):
619 return self
._timestamp
_begin
622 def timestamp_end(self
):
623 return self
._timestamp
_end
626 def events_discarded(self
):
627 return self
._events
_discarded
630 def stream_class_id(self
):
631 return self
._stream
_class
_id
634 # The index of an LTTng data stream, a sequence of index entries.
635 class _LttngDataStreamIndex(collections
.abc
.Sequence
):
636 def __init__(self
, path
):
640 'Built data stream index entries: path="{}", count={}'.format(
641 path
, len(self
._entries
)
647 assert os
.path
.isfile(self
._path
)
649 with
open(self
._path
, 'rb') as f
:
652 size
= struct
.calcsize(fmt
)
654 assert len(data
) == size
655 magic
, index_major
, index_minor
, index_entry_length
= struct
.unpack(
658 assert magic
== 0xC1F1DCC1
662 size
= struct
.calcsize(fmt
)
666 'Decoding data stream index entry: path="{}", offset={}'.format(
676 assert len(data
) == size
677 offset_bytes
, total_size_bits
, content_size_bits
, timestamp_begin
, timestamp_end
, events_discarded
, stream_class_id
= struct
.unpack(
681 self
._entries
.append(
682 _LttngDataStreamIndexEntry(
693 # Skip anything else before the next entry
694 f
.seek(index_entry_length
- size
, os
.SEEK_CUR
)
696 def __getitem__(self
, index
):
697 return self
._entries
[index
]
700 return len(self
._entries
)
707 # An LTTng data stream.
708 class _LttngDataStream
:
709 def __init__(self
, path
):
711 filename
= os
.path
.basename(path
)
712 match
= re
.match(r
'(.*)_\d+', filename
)
713 self
._channel
_name
= match
.group(1)
714 trace_dir
= os
.path
.dirname(path
)
715 index_path
= os
.path
.join(trace_dir
, 'index', filename
+ '.idx')
716 self
._index
= _LttngDataStreamIndex(index_path
)
717 assert os
.path
.isfile(path
)
718 self
._file
= open(path
, 'rb')
720 'Built data stream: path="{}", channel-name="{}"'.format(
721 path
, self
._channel
_name
730 def channel_name(self
):
731 return self
._channel
_name
def get_data(self, offset_bytes, len_bytes):
    """Read `len_bytes` bytes at `offset_bytes` from the data stream file.

    Seeks the file object held in `self._file` to the absolute offset
    `offset_bytes`, then returns whatever a single `read(len_bytes)`
    yields (fewer bytes when the file ends first).
    """
    stream_file = self._file
    stream_file.seek(offset_bytes)
    return stream_file.read(len_bytes)
742 # An LTTng metadata stream.
743 class _LttngMetadataStream
:
744 def __init__(self
, path
):
746 logging
.info('Built metadata stream: path="{}"'.format(path
))
754 assert os
.path
.isfile(self
._path
)
756 with
open(self
._path
, 'rb') as f
:
760 # An LTTng trace, a sequence of LTTng data streams.
761 class LttngTrace(collections
.abc
.Sequence
):
762 def __init__(self
, trace_dir
):
763 assert os
.path
.isdir(trace_dir
)
764 self
._path
= trace_dir
765 self
._metadata
_stream
= _LttngMetadataStream(
766 os
.path
.join(trace_dir
, 'metadata')
768 self
._create
_data
_streams
(trace_dir
)
769 logging
.info('Built trace: path="{}"'.format(trace_dir
))
771 def _create_data_streams(self
, trace_dir
):
772 data_stream_paths
= []
774 for filename
in os
.listdir(trace_dir
):
775 path
= os
.path
.join(trace_dir
, filename
)
777 if not os
.path
.isfile(path
):
780 if filename
.startswith('.'):
783 if filename
== 'metadata':
786 data_stream_paths
.append(path
)
788 data_stream_paths
.sort()
789 self
._data
_streams
= []
791 for data_stream_path
in data_stream_paths
:
792 self
._data
_streams
.append(_LttngDataStream(data_stream_path
))
799 def metadata_stream(self
):
800 return self
._metadata
_stream
802 def __getitem__(self
, index
):
803 return self
._data
_streams
[index
]
806 return len(self
._data
_streams
)
809 # The state of a single data stream.
810 class _LttngLiveViewerSessionDataStreamState
:
811 def __init__(self
, ts_state
, info
, data_stream
):
812 self
._ts
_state
= ts_state
814 self
._data
_stream
= data_stream
815 self
._cur
_index
_entry
_index
= 0
816 fmt
= 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
820 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
821 ts_state
.tracing_session_descriptor
.info
.name
,
827 def tracing_session_state(self
):
828 return self
._ts
_state
835 def data_stream(self
):
836 return self
._data
_stream
839 def cur_index_entry(self
):
840 if self
._cur
_index
_entry
_index
== len(self
._data
_stream
.index
):
843 return self
._data
_stream
.index
[self
._cur
_index
_entry
_index
]
845 def goto_next_index_entry(self
):
846 self
._cur
_index
_entry
_index
+= 1
849 # The state of a single metadata stream.
850 class _LttngLiveViewerSessionMetadataStreamState
:
851 def __init__(self
, ts_state
, info
, metadata_stream
):
852 self
._ts
_state
= ts_state
854 self
._metadata
_stream
= metadata_stream
855 self
._is
_sent
= False
856 fmt
= 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
860 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
861 ts_state
.tracing_session_descriptor
.info
.name
,
862 metadata_stream
.path
,
def trace_session_state(self):
    """Return the tracing session state which owns this metadata stream state."""
    # The constructor stores the owning tracing session state as
    # `_ts_state`; no `_trace_session_state` attribute is ever set, so
    # reading that name raised `AttributeError` on every access.
    return self._ts_state
875 def metadata_stream(self
):
876 return self
._metadata
_stream
883 def is_sent(self
, value
):
884 self
._is
_sent
= value
887 # The state of a tracing session.
888 class _LttngLiveViewerSessionTracingSessionState
:
889 def __init__(self
, tc_descr
, base_stream_id
):
890 self
._tc
_descr
= tc_descr
891 self
._stream
_infos
= []
894 stream_id
= base_stream_id
896 for trace
in tc_descr
.traces
:
897 trace_id
= stream_id
* 1000
899 # Data streams -> stream infos and data stream states
900 for data_stream
in trace
:
901 info
= _LttngLiveViewerStreamInfo(
906 data_stream
.channel_name
,
908 self
._stream
_infos
.append(info
)
909 self
._ds
_states
[stream_id
] = _LttngLiveViewerSessionDataStreamState(
910 self
, info
, data_stream
914 # Metadata stream -> stream info and metadata stream state
915 info
= _LttngLiveViewerStreamInfo(
916 stream_id
, trace_id
, True, trace
.metadata_stream
.path
, 'metadata'
918 self
._stream
_infos
.append(info
)
919 self
._ms
_states
[stream_id
] = _LttngLiveViewerSessionMetadataStreamState(
920 self
, info
, trace
.metadata_stream
924 self
._is
_attached
= False
925 fmt
= 'Built tracing session state: id={}, name="{}"'
926 logging
.info(fmt
.format(tc_descr
.info
.tracing_session_id
, tc_descr
.info
.name
))
929 def tracing_session_descriptor(self
):
930 return self
._tc
_descr
933 def data_stream_states(self
):
934 return self
._ds
_states
937 def metadata_stream_states(self
):
938 return self
._ms
_states
941 def stream_infos(self
):
942 return self
._stream
_infos
def has_new_metadata(self):
    """Tell whether at least one metadata stream state is not sent yet.

    Looks at the `is_sent` flag of every metadata stream state of this
    tracing session; returns `False` when there is none.
    """
    unsent = [ms for ms in self._ms_states.values() if not ms.is_sent]
    return bool(unsent)
949 def is_attached(self
):
950 return self
._is
_attached
953 def is_attached(self
, value
):
954 self
._is
_attached
= value
957 # An LTTng live viewer session manages a view on tracing sessions
958 # and replies to commands accordingly.
959 class _LttngLiveViewerSession
:
963 tracing_session_descriptors
,
964 max_query_data_response_size
,
966 self
._viewer
_session
_id
= viewer_session_id
968 self
._stream
_states
= {}
969 self
._max
_query
_data
_response
_size
= max_query_data_response_size
970 total_stream_infos
= 0
972 for ts_descr
in tracing_session_descriptors
:
973 ts_state
= _LttngLiveViewerSessionTracingSessionState(
974 ts_descr
, total_stream_infos
976 ts_id
= ts_state
.tracing_session_descriptor
.info
.tracing_session_id
977 self
._ts
_states
[ts_id
] = ts_state
978 total_stream_infos
+= len(ts_state
.stream_infos
)
980 # Update session's stream states to have the new states
981 self
._stream
_states
.update(ts_state
.data_stream_states
)
982 self
._stream
_states
.update(ts_state
.metadata_stream_states
)
984 self
._command
_handlers
= {
985 _LttngLiveViewerAttachToTracingSessionCommand
: self
._handle
_attach
_to
_tracing
_session
_command
,
986 _LttngLiveViewerCreateViewerSessionCommand
: self
._handle
_create
_viewer
_session
_command
,
987 _LttngLiveViewerDetachFromTracingSessionCommand
: self
._handle
_detach
_from
_tracing
_session
_command
,
988 _LttngLiveViewerGetDataStreamPacketDataCommand
: self
._handle
_get
_data
_stream
_packet
_data
_command
,
989 _LttngLiveViewerGetMetadataStreamDataCommand
: self
._handle
_get
_metadata
_stream
_data
_command
,
990 _LttngLiveViewerGetNewStreamInfosCommand
: self
._handle
_get
_new
_stream
_infos
_command
,
991 _LttngLiveViewerGetNextDataStreamIndexEntryCommand
: self
._handle
_get
_next
_data
_stream
_index
_entry
_command
,
992 _LttngLiveViewerGetTracingSessionInfosCommand
: self
._handle
_get
_tracing
_session
_infos
_command
,
996 def viewer_session_id(self
):
997 return self
._viewer
_session
_id
999 def _get_tracing_session_state(self
, tracing_session_id
):
1000 if tracing_session_id
not in self
._ts
_states
:
1001 raise UnexpectedInput(
1002 'Unknown tracing session ID {}'.format(tracing_session_id
)
1005 return self
._ts
_states
[tracing_session_id
]
1007 def _get_stream_state(self
, stream_id
):
1008 if stream_id
not in self
._stream
_states
:
1009 UnexpectedInput('Unknown stream ID {}'.format(stream_id
))
1011 return self
._stream
_states
[stream_id
]
1013 def handle_command(self
, cmd
):
1015 'Handling command in viewer session: cmd-cls-name={}'.format(
1016 cmd
.__class
__.__name
__
1019 cmd_type
= type(cmd
)
1021 if cmd_type
not in self
._command
_handlers
:
1022 raise UnexpectedInput(
1023 'Unexpected command: cmd-cls-name={}'.format(cmd
.__class
__.__name
__)
1026 return self
._command
_handlers
[cmd_type
](cmd
)
1028 def _handle_attach_to_tracing_session_command(self
, cmd
):
1029 fmt
= 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1030 logging
.info(fmt
.format(cmd
.tracing_session_id
, cmd
.offset
, cmd
.seek_type
))
1031 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1032 info
= ts_state
.tracing_session_descriptor
.info
1034 if ts_state
.is_attached
:
1035 raise UnexpectedInput(
1036 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1041 ts_state
.is_attached
= True
1042 status
= _LttngLiveViewerAttachToTracingSessionReply
.Status
.OK
1043 return _LttngLiveViewerAttachToTracingSessionReply(
1044 status
, ts_state
.stream_infos
1047 def _handle_detach_from_tracing_session_command(self
, cmd
):
1048 fmt
= 'Handling "detach from tracing session" command: ts-id={}'
1049 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1050 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1051 info
= ts_state
.tracing_session_descriptor
.info
1053 if not ts_state
.is_attached
:
1054 raise UnexpectedInput(
1055 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1060 ts_state
.is_attached
= False
1061 status
= _LttngLiveViewerDetachFromTracingSessionReply
.Status
.OK
1062 return _LttngLiveViewerDetachFromTracingSessionReply(status
)
1064 def _handle_get_next_data_stream_index_entry_command(self
, cmd
):
1065 fmt
= 'Handling "get next data stream index entry" command: stream-id={}'
1066 logging
.info(fmt
.format(cmd
.stream_id
))
1067 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1069 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1070 raise UnexpectedInput(
1071 'Stream with ID {} is not a data stream'.format(cmd
.stream_id
)
1074 if stream_state
.cur_index_entry
is None:
1075 # The viewer is done reading this stream
1076 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.HUP
1078 # Dummy data stream index entry to use with the `HUP` status
1079 # (the reply needs one, but the viewer ignores it)
1080 index_entry
= _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1082 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1083 status
, index_entry
, False, False
1086 # The viewer only checks the `has_new_metadata` flag if the
1087 # reply's status is `OK`, so we need to provide an index here
1088 has_new_metadata
= stream_state
.tracing_session_state
.has_new_metadata
1089 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.OK
1090 reply
= _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1091 status
, stream_state
.cur_index_entry
, has_new_metadata
, False
1093 stream_state
.goto_next_index_entry()
def _handle_get_data_stream_packet_data_command(self, cmd):
    """Handle a "get data stream packet data" command and return its reply.

    Raises `UnexpectedInput` when `cmd.stream_id` does not designate a
    data stream. Replies with the `ERROR` status, no data, and the "has
    new metadata" flag set while the stream's tracing session still has
    unsent metadata. Otherwise replies with the `OK` status and the
    bytes read from the data stream at `cmd.offset`.
    """
    logging.info(
        'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'.format(
            cmd.stream_id, cmd.offset, cmd.req_length
        )
    )
    stream_state = self._get_stream_state(cmd.stream_id)

    if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
        raise UnexpectedInput(
            'Stream with ID {} is not a data stream'.format(cmd.stream_id)
        )

    reply_cls = _LttngLiveViewerGetDataStreamPacketDataReply

    if stream_state.tracing_session_state.has_new_metadata:
        return reply_cls(reply_cls.Status.ERROR, bytes(), True, False)

    size_limit = self._max_query_data_response_size

    if size_limit:
        # Server-side cap on the response size: serve the smaller of
        # the requested length and the configured maximum.
        response_length = min(cmd.req_length, size_limit)
        logging.info(
            'Limiting "get data stream packet data" command: req-length={} actual response size={}'.format(
                cmd.req_length, response_length
            )
        )
    else:
        response_length = cmd.req_length

    payload = stream_state.data_stream.get_data(cmd.offset, response_length)
    return reply_cls(reply_cls.Status.OK, payload, False, False)
def _handle_get_metadata_stream_data_command(self, cmd):
    """Handle a "get metadata stream data" command and return its reply.

    Raises `UnexpectedInput` when `cmd.stream_id` does not designate a
    metadata stream. The first request for a given stream gets the `OK`
    status with the metadata stream's data; any later request gets the
    `NO_NEW` status with empty data.
    """
    logging.info(
        'Handling "get metadata stream data" command: stream-id={}'.format(
            cmd.stream_id
        )
    )
    stream_state = self._get_stream_state(cmd.stream_id)

    if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
        raise UnexpectedInput(
            'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
        )

    reply_cls = _LttngLiveViewerGetMetadataStreamDataContentReply

    if stream_state.is_sent:
        return reply_cls(reply_cls.Status.NO_NEW, bytes())

    # Mark the stream as sent before building the reply so that the
    # next request for it gets `NO_NEW`.
    stream_state.is_sent = True
    return reply_cls(reply_cls.Status.OK, stream_state.metadata_stream.data)
def _handle_get_new_stream_infos_command(self, cmd):
    """Handle a "get new stream infos" command: always reply `HUP`, no infos."""
    logging.info(
        'Handling "get new stream infos" command: ts-id={}'.format(
            cmd.tracing_session_id
        )
    )

    # The "attach to tracing session" reply already hands every stream
    # info of the tracing session to the viewer, so there is never
    # anything new to report here. Receiving this command means the
    # viewer consumed all the existing data streams, hence `HUP`.
    reply_cls = _LttngLiveViewerGetNewStreamInfosReply
    return reply_cls(reply_cls.Status.HUP, [])
1159 def _handle_get_tracing_session_infos_command(self
, cmd
):
1160 logging
.info('Handling "get tracing session infos" command.')
1162 tss
.tracing_session_descriptor
.info
for tss
in self
._ts
_states
.values()
1164 infos
.sort(key
=lambda info
: info
.name
)
1165 return _LttngLiveViewerGetTracingSessionInfosReply(infos
)
def _handle_create_viewer_session_command(self, cmd):
    """Handle a "create viewer session" command: always reply `OK`."""
    logging.info('Handling "create viewer session" command.')

    # Nothing to set up in this mock server; the real LTTng relay
    # daemon allocates the viewer session's state at this point.
    reply_cls = _LttngLiveViewerCreateViewerSessionReply
    return reply_cls(reply_cls.Status.OK)
1176 # An LTTng live TCP server.
1178 # On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1179 # the decimal TCP port number to a temporary port file. It renames the
1180 # temporary port file to `port_filename`.
1182 # `tracing_session_descriptors` is a list of tracing session descriptors
1183 # (`LttngTracingSessionDescriptor`) to serve.
1185 # This server accepts a single viewer (client).
1187 # When the viewer closes the connection, the server's constructor
1189 class LttngLiveServer
:
1191 self
, port_filename
, tracing_session_descriptors
, max_query_data_response_size
1193 logging
.info('Server configuration:')
1195 logging
.info(' Port file name: `{}`'.format(port_filename
))
1197 if max_query_data_response_size
is not None:
1199 ' Maximum response data query size: `{}`'.format(
1200 max_query_data_response_size
1204 for ts_descr
in tracing_session_descriptors
:
1205 info
= ts_descr
.info
1206 fmt
= ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1210 info
.tracing_session_id
,
1212 info
.live_timer_freq
,
1218 for trace
in ts_descr
.traces
:
1219 logging
.info(' Trace: path="{}"'.format(trace
.path
))
1221 self
._ts
_descriptors
= tracing_session_descriptors
1222 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1223 self
._sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
1224 self
._codec
= _LttngLiveViewerProtocolCodec()
1226 # Port 0: OS assigns an unused port
1227 serv_addr
= ('localhost', 0)
1228 self
._sock
.bind(serv_addr
)
1229 self
._write
_port
_to
_file
(port_filename
)
1235 logging
.info('Closed connection and socket.')
1238 def _server_port(self
):
1239 return self
._sock
.getsockname()[1]
1241 def _recv_command(self
):
1245 logging
.info('Waiting for viewer command.')
1246 buf
= self
._conn
.recv(128)
1249 logging
.info('Client closed connection.')
1252 raise UnexpectedInput(
1253 'Client closed connection after having sent {} command bytes.'.format(
1260 logging
.info('Received data from viewer: length={}'.format(len(buf
)))
1265 cmd
= self
._codec
.decode(data
)
1266 except struct
.error
as exc
:
1267 raise UnexpectedInput('Malformed command: {}'.format(exc
)) from exc
1271 'Received command from viewer: cmd-cls-name={}'.format(
1272 cmd
.__class
__.__name
__
1277 def _send_reply(self
, reply
):
1278 data
= self
._codec
.encode(reply
)
1280 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
1281 reply
.__class
__.__name
__, len(data
)
1284 self
._conn
.sendall(data
)
1286 def _handle_connection(self
):
1287 # First command must be "connect"
1288 cmd
= self
._recv
_command
()
1290 if type(cmd
) is not _LttngLiveViewerConnectCommand
:
1291 raise UnexpectedInput(
1292 'First command is not "connect": cmd-cls-name={}'.format(
1293 cmd
.__class
__.__name
__
1297 # Create viewer session (arbitrary ID 23)
1299 'LTTng live viewer connected: version={}.{}'.format(cmd
.major
, cmd
.minor
)
1301 viewer_session
= _LttngLiveViewerSession(
1302 23, self
._ts
_descriptors
, self
._max
_query
_data
_response
_size
1305 # Send "connect" reply
1307 _LttngLiveViewerConnectReply(viewer_session
.viewer_session_id
, 2, 10)
1310 # Make the viewer session handle the remaining commands
1312 cmd
= self
._recv
_command
()
1315 # Connection closed (at an expected location within the
1319 self
._send
_reply
(viewer_session
.handle_command(cmd
))
1322 logging
.info('Listening: port={}'.format(self
._server
_port
))
1323 # Backlog must be present for Python version < 3.5.
1324 # 128 is an arbitrary number since we expect only 1 connection anyway.
1325 self
._sock
.listen(128)
1326 self
._conn
, viewer_addr
= self
._sock
.accept()
1328 'Accepted viewer: addr={}:{}'.format(viewer_addr
[0], viewer_addr
[1])
1332 self
._handle
_connection
()
1336 def _write_port_to_file(self
, port_filename
):
1337 # Write the port number to a temporary file.
1338 with tempfile
.NamedTemporaryFile(mode
='w', delete
=False) as tmp_port_file
:
1339 print(self
._server
_port
, end
='', file=tmp_port_file
)
1341 # Rename temporary file to real file
1342 os
.rename(tmp_port_file
.name
, port_filename
)
1344 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1345 tmp_port_file
.name
, port_filename
1350 # A tracing session descriptor.
1352 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1354 class LttngTracingSessionDescriptor
:
1356 self
, name
, tracing_session_id
, hostname
, live_timer_freq
, client_count
, traces
1358 for trace
in traces
:
1359 if name
not in trace
.path
:
1360 fmt
= 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
1361 raise ValueError(fmt
.format(name
, trace
.path
))
1363 self
._traces
= traces
1364 stream_count
= sum([len(t
) + 1 for t
in traces
])
1365 self
._info
= _LttngLiveViewerTracingSessionInfo(
1383 def _tracing_session_descriptors_from_arg(string
):
1385 # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
1386 parts
= string
.split(',')
1388 tracing_session_id
= int(parts
[1])
1390 live_timer_freq
= int(parts
[3])
1391 client_count
= int(parts
[4])
1392 traces
= [LttngTrace(path
) for path
in parts
[5:]]
1393 return LttngTracingSessionDescriptor(
1394 name
, tracing_session_id
, hostname
, live_timer_freq
, client_count
, traces
1398 def _loglevel_parser(string
):
1399 loglevels
= {'info': logging
.INFO
, 'warning': logging
.WARNING
}
1400 if string
not in loglevels
:
1401 msg
= "{} is not a valid loglevel".format(string
)
1402 raise argparse
.ArgumentTypeError(msg
)
1403 return loglevels
[string
]
1406 if __name__
== '__main__':
1407 logging
.basicConfig(format
='# %(asctime)-25s%(message)s')
1408 parser
= argparse
.ArgumentParser(
1409 description
='LTTng-live protocol mocker', add_help
=False
1411 parser
.add_argument(
1414 choices
=['info', 'warning'],
1415 help='The loglevel to be used.',
1418 loglevel_namespace
, remaining_args
= parser
.parse_known_args()
1419 logging
.getLogger().setLevel(_loglevel_parser(loglevel_namespace
.log_level
))
1421 parser
.add_argument(
1423 help='The final port file. This file is present when the server is ready to receive connection.',
1426 parser
.add_argument(
1427 '--max-query-data-response-size',
1429 help='The maximum size of control data response in bytes',
1431 parser
.add_argument(
1435 type=_tracing_session_descriptors_from_arg
,
1436 help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
1438 parser
.add_argument(
1442 default
=argparse
.SUPPRESS
,
1443 help='Show this help message and exit.',
1446 args
= parser
.parse_args(args
=remaining_args
)
1449 args
.port_filename
, args
.sessions
, args
.max_query_data_response_size
1451 except UnexpectedInput
as exc
:
1452 logging
.error(str(exc
))
1453 print(exc
, file=sys
.stderr
)