1 # SPDX-License-Identifier: MIT
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
18 class UnexpectedInput(RuntimeError):
22 class _LttngLiveViewerCommand
:
23 def __init__(self
, version
):
24 self
._version
= version
31 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand
):
32 def __init__(self
, version
, viewer_session_id
, major
, minor
):
33 super().__init
__(version
)
34 self
._viewer
_session
_id
= viewer_session_id
39 def viewer_session_id(self
):
40 return self
._viewer
_session
_id
51 class _LttngLiveViewerConnectReply
:
52 def __init__(self
, viewer_session_id
, major
, minor
):
53 self
._viewer
_session
_id
= viewer_session_id
58 def viewer_session_id(self
):
59 return self
._viewer
_session
_id
70 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand
):
74 class _LttngLiveViewerTracingSessionInfo
:
84 self
._tracing
_session
_id
= tracing_session_id
85 self
._live
_timer
_freq
= live_timer_freq
86 self
._client
_count
= client_count
87 self
._stream
_count
= stream_count
88 self
._hostname
= hostname
92 def tracing_session_id(self
):
93 return self
._tracing
_session
_id
96 def live_timer_freq(self
):
97 return self
._live
_timer
_freq
100 def client_count(self
):
101 return self
._client
_count
104 def stream_count(self
):
105 return self
._stream
_count
109 return self
._hostname
116 class _LttngLiveViewerGetTracingSessionInfosReply
:
117 def __init__(self
, tracing_session_infos
):
118 self
._tracing
_session
_infos
= tracing_session_infos
121 def tracing_session_infos(self
):
122 return self
._tracing
_session
_infos
125 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand
):
130 def __init__(self
, version
, tracing_session_id
, offset
, seek_type
):
131 super().__init
__(version
)
132 self
._tracing
_session
_id
= tracing_session_id
133 self
._offset
= offset
134 self
._seek
_type
= seek_type
137 def tracing_session_id(self
):
138 return self
._tracing
_session
_id
146 return self
._seek
_type
149 class _LttngLiveViewerStreamInfo
:
150 def __init__(self
, id, trace_id
, is_metadata
, path
, channel_name
):
152 self
._trace
_id
= trace_id
153 self
._is
_metadata
= is_metadata
155 self
._channel
_name
= channel_name
163 return self
._trace
_id
166 def is_metadata(self
):
167 return self
._is
_metadata
174 def channel_name(self
):
175 return self
._channel
_name
178 class _LttngLiveViewerAttachToTracingSessionReply
:
187 def __init__(self
, status
, stream_infos
):
188 self
._status
= status
189 self
._stream
_infos
= stream_infos
196 def stream_infos(self
):
197 return self
._stream
_infos
200 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand
):
201 def __init__(self
, version
, stream_id
):
202 super().__init
__(version
)
203 self
._stream
_id
= stream_id
207 return self
._stream
_id
210 class _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
219 def __init__(self
, status
, index_entry
, has_new_metadata
, has_new_data_stream
):
220 self
._status
= status
221 self
._index
_entry
= index_entry
222 self
._has
_new
_metadata
= has_new_metadata
223 self
._has
_new
_data
_stream
= has_new_data_stream
230 def index_entry(self
):
231 return self
._index
_entry
234 def has_new_metadata(self
):
235 return self
._has
_new
_metadata
238 def has_new_data_stream(self
):
239 return self
._has
_new
_data
_stream
242 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand
):
243 def __init__(self
, version
, stream_id
, offset
, req_length
):
244 super().__init
__(version
)
245 self
._stream
_id
= stream_id
246 self
._offset
= offset
247 self
._req
_length
= req_length
251 return self
._stream
_id
258 def req_length(self
):
259 return self
._req
_length
262 class _LttngLiveViewerGetDataStreamPacketDataReply
:
269 def __init__(self
, status
, data
, has_new_metadata
, has_new_data_stream
):
270 self
._status
= status
272 self
._has
_new
_metadata
= has_new_metadata
273 self
._has
_new
_data
_stream
= has_new_data_stream
284 def has_new_metadata(self
):
285 return self
._has
_new
_metadata
288 def has_new_data_stream(self
):
289 return self
._has
_new
_data
_stream
292 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand
):
293 def __init__(self
, version
, stream_id
):
294 super().__init
__(version
)
295 self
._stream
_id
= stream_id
299 return self
._stream
_id
302 class _LttngLiveViewerGetMetadataStreamDataContentReply
:
308 def __init__(self
, status
, data
):
309 self
._status
= status
321 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand
):
322 def __init__(self
, version
, tracing_session_id
):
323 super().__init
__(version
)
324 self
._tracing
_session
_id
= tracing_session_id
327 def tracing_session_id(self
):
328 return self
._tracing
_session
_id
331 class _LttngLiveViewerGetNewStreamInfosReply
:
338 def __init__(self
, status
, stream_infos
):
339 self
._status
= status
340 self
._stream
_infos
= stream_infos
347 def stream_infos(self
):
348 return self
._stream
_infos
351 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand
):
355 class _LttngLiveViewerCreateViewerSessionReply
:
360 def __init__(self
, status
):
361 self
._status
= status
368 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand
):
369 def __init__(self
, version
, tracing_session_id
):
370 super().__init
__(version
)
371 self
._tracing
_session
_id
= tracing_session_id
374 def tracing_session_id(self
):
375 return self
._tracing
_session
_id
378 class _LttngLiveViewerDetachFromTracingSessionReply
:
384 def __init__(self
, status
):
385 self
._status
= status
392 # An LTTng live protocol codec can convert bytes to command objects and
393 # reply objects to bytes.
394 class _LttngLiveViewerProtocolCodec
:
395 _COMMAND_HEADER_STRUCT_FMT
= 'QII'
396 _COMMAND_HEADER_SIZE_BYTES
= struct
.calcsize(_COMMAND_HEADER_STRUCT_FMT
)
401 def _unpack(self
, fmt
, data
, offset
=0):
403 return struct
.unpack_from(fmt
, data
, offset
)
405 def _unpack_payload(self
, fmt
, data
):
407 fmt
, data
, _LttngLiveViewerProtocolCodec
._COMMAND
_HEADER
_SIZE
_BYTES
410 def decode(self
, data
):
411 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
:
412 # Not enough data to read the command header
415 payload_size
, cmd_type
, version
= self
._unpack
(
416 self
._COMMAND
_HEADER
_STRUCT
_FMT
, data
419 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
420 payload_size
, cmd_type
, version
424 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
+ payload_size
:
425 # Not enough data to read the whole command
429 viewer_session_id
, major
, minor
, conn_type
= self
._unpack
_payload
(
432 return _LttngLiveViewerConnectCommand(
433 version
, viewer_session_id
, major
, minor
436 return _LttngLiveViewerGetTracingSessionInfosCommand(version
)
438 tracing_session_id
, offset
, seek_type
= self
._unpack
_payload
('QQI', data
)
439 return _LttngLiveViewerAttachToTracingSessionCommand(
440 version
, tracing_session_id
, offset
, seek_type
443 (stream_id
,) = self
._unpack
_payload
('Q', data
)
444 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
448 stream_id
, offset
, req_length
= self
._unpack
_payload
('QQI', data
)
449 return _LttngLiveViewerGetDataStreamPacketDataCommand(
450 version
, stream_id
, offset
, req_length
453 (stream_id
,) = self
._unpack
_payload
('Q', data
)
454 return _LttngLiveViewerGetMetadataStreamDataCommand(version
, stream_id
)
456 (tracing_session_id
,) = self
._unpack
_payload
('Q', data
)
457 return _LttngLiveViewerGetNewStreamInfosCommand(version
, tracing_session_id
)
459 return _LttngLiveViewerCreateViewerSessionCommand(version
)
461 (tracing_session_id
,) = self
._unpack
_payload
('Q', data
)
462 return _LttngLiveViewerDetachFromTracingSessionCommand(
463 version
, tracing_session_id
466 raise UnexpectedInput('Unknown command type {}'.format(cmd_type
))
468 def _pack(self
, fmt
, *args
):
469 # Force network byte order
470 return struct
.pack('!' + fmt
, *args
)
472 def _encode_zero_padded_str(self
, string
, length
):
473 data
= string
.encode()
474 return data
.ljust(length
, b
'\x00')
476 def _encode_stream_info(self
, info
):
477 data
= self
._pack
('QQI', info
.id, info
.trace_id
, int(info
.is_metadata
))
478 data
+= self
._encode
_zero
_padded
_str
(info
.path
, 4096)
479 data
+= self
._encode
_zero
_padded
_str
(info
.channel_name
, 255)
482 def _get_has_new_stuff_flags(self
, has_new_metadata
, has_new_data_streams
):
488 if has_new_data_streams
:
493 def encode(self
, reply
):
494 if type(reply
) is _LttngLiveViewerConnectReply
:
496 'QIII', reply
.viewer_session_id
, reply
.major
, reply
.minor
, 2
498 elif type(reply
) is _LttngLiveViewerGetTracingSessionInfosReply
:
499 data
= self
._pack
('I', len(reply
.tracing_session_infos
))
501 for info
in reply
.tracing_session_infos
:
504 info
.tracing_session_id
,
505 info
.live_timer_freq
,
509 data
+= self
._encode
_zero
_padded
_str
(info
.hostname
, 64)
510 data
+= self
._encode
_zero
_padded
_str
(info
.name
, 255)
511 elif type(reply
) is _LttngLiveViewerAttachToTracingSessionReply
:
512 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
514 for info
in reply
.stream_infos
:
515 data
+= self
._encode
_stream
_info
(info
)
516 elif type(reply
) is _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
517 entry
= reply
.index_entry
518 flags
= self
._get
_has
_new
_stuff
_flags
(
519 reply
.has_new_metadata
, reply
.has_new_data_stream
525 entry
.total_size_bits
,
526 entry
.content_size_bits
,
527 entry
.timestamp_begin
,
529 entry
.events_discarded
,
530 entry
.stream_class_id
,
534 elif type(reply
) is _LttngLiveViewerGetDataStreamPacketDataReply
:
535 flags
= self
._get
_has
_new
_stuff
_flags
(
536 reply
.has_new_metadata
, reply
.has_new_data_stream
538 data
= self
._pack
('III', reply
.status
, len(reply
.data
), flags
)
540 elif type(reply
) is _LttngLiveViewerGetMetadataStreamDataContentReply
:
541 data
= self
._pack
('QI', len(reply
.data
), reply
.status
)
543 elif type(reply
) is _LttngLiveViewerGetNewStreamInfosReply
:
544 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
546 for info
in reply
.stream_infos
:
547 data
+= self
._encode
_stream
_info
(info
)
548 elif type(reply
) is _LttngLiveViewerCreateViewerSessionReply
:
549 data
= self
._pack
('I', reply
.status
)
550 elif type(reply
) is _LttngLiveViewerDetachFromTracingSessionReply
:
551 data
= self
._pack
('I', reply
.status
)
554 'Unknown reply object with class `{}`'.format(reply
.__class
__.__name
__)
560 # An entry within the index of an LTTng data stream.
561 class _LttngDataStreamIndexEntry
:
572 self
._offset
_bytes
= offset_bytes
573 self
._total
_size
_bits
= total_size_bits
574 self
._content
_size
_bits
= content_size_bits
575 self
._timestamp
_begin
= timestamp_begin
576 self
._timestamp
_end
= timestamp_end
577 self
._events
_discarded
= events_discarded
578 self
._stream
_class
_id
= stream_class_id
581 def offset_bytes(self
):
582 return self
._offset
_bytes
585 def total_size_bits(self
):
586 return self
._total
_size
_bits
589 def total_size_bytes(self
):
590 return self
._total
_size
_bits
// 8
593 def content_size_bits(self
):
594 return self
._content
_size
_bits
597 def content_size_bytes(self
):
598 return self
._content
_size
_bits
// 8
601 def timestamp_begin(self
):
602 return self
._timestamp
_begin
605 def timestamp_end(self
):
606 return self
._timestamp
_end
609 def events_discarded(self
):
610 return self
._events
_discarded
613 def stream_class_id(self
):
614 return self
._stream
_class
_id
617 # The index of an LTTng data stream, a sequence of index entries.
618 class _LttngDataStreamIndex(collections
.abc
.Sequence
):
619 def __init__(self
, path
):
623 'Built data stream index entries: path="{}", count={}'.format(
624 path
, len(self
._entries
)
630 assert os
.path
.isfile(self
._path
)
632 with
open(self
._path
, 'rb') as f
:
635 size
= struct
.calcsize(fmt
)
637 assert len(data
) == size
638 magic
, index_major
, index_minor
, index_entry_length
= struct
.unpack(
641 assert magic
== 0xC1F1DCC1
645 size
= struct
.calcsize(fmt
)
649 'Decoding data stream index entry: path="{}", offset={}'.format(
659 assert len(data
) == size
668 ) = struct
.unpack(fmt
, data
)
670 self
._entries
.append(
671 _LttngDataStreamIndexEntry(
682 # Skip anything else before the next entry
683 f
.seek(index_entry_length
- size
, os
.SEEK_CUR
)
685 def __getitem__(self
, index
):
686 return self
._entries
[index
]
689 return len(self
._entries
)
696 # An LTTng data stream.
697 class _LttngDataStream
:
698 def __init__(self
, path
):
700 filename
= os
.path
.basename(path
)
701 match
= re
.match(r
'(.*)_\d+', filename
)
702 self
._channel
_name
= match
.group(1)
703 trace_dir
= os
.path
.dirname(path
)
704 index_path
= os
.path
.join(trace_dir
, 'index', filename
+ '.idx')
705 self
._index
= _LttngDataStreamIndex(index_path
)
706 assert os
.path
.isfile(path
)
707 self
._file
= open(path
, 'rb')
709 'Built data stream: path="{}", channel-name="{}"'.format(
710 path
, self
._channel
_name
719 def channel_name(self
):
720 return self
._channel
_name
def get_data(self, offset_bytes, len_bytes):
    # Read at most `len_bytes` bytes of this data stream's file,
    # starting at the absolute position `offset_bytes`.
    stream_file = self._file
    stream_file.seek(offset_bytes)
    chunk = stream_file.read(len_bytes)
    return chunk
731 # An LTTng metadata stream.
732 class _LttngMetadataStream
:
733 def __init__(self
, path
):
735 logging
.info('Built metadata stream: path="{}"'.format(path
))
743 assert os
.path
.isfile(self
._path
)
745 with
open(self
._path
, 'rb') as f
:
749 # An LTTng trace, a sequence of LTTng data streams.
750 class LttngTrace(collections
.abc
.Sequence
):
751 def __init__(self
, trace_dir
):
752 assert os
.path
.isdir(trace_dir
)
753 self
._path
= trace_dir
754 self
._metadata
_stream
= _LttngMetadataStream(
755 os
.path
.join(trace_dir
, 'metadata')
757 self
._create
_data
_streams
(trace_dir
)
758 logging
.info('Built trace: path="{}"'.format(trace_dir
))
760 def _create_data_streams(self
, trace_dir
):
761 data_stream_paths
= []
763 for filename
in os
.listdir(trace_dir
):
764 path
= os
.path
.join(trace_dir
, filename
)
766 if not os
.path
.isfile(path
):
769 if filename
.startswith('.'):
772 if filename
== 'metadata':
775 data_stream_paths
.append(path
)
777 data_stream_paths
.sort()
778 self
._data
_streams
= []
780 for data_stream_path
in data_stream_paths
:
781 self
._data
_streams
.append(_LttngDataStream(data_stream_path
))
788 def metadata_stream(self
):
789 return self
._metadata
_stream
791 def __getitem__(self
, index
):
792 return self
._data
_streams
[index
]
795 return len(self
._data
_streams
)
798 # The state of a single data stream.
799 class _LttngLiveViewerSessionDataStreamState
:
800 def __init__(self
, ts_state
, info
, data_stream
):
801 self
._ts
_state
= ts_state
803 self
._data
_stream
= data_stream
804 self
._cur
_index
_entry
_index
= 0
805 fmt
= 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
809 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
810 ts_state
.tracing_session_descriptor
.info
.name
,
816 def tracing_session_state(self
):
817 return self
._ts
_state
824 def data_stream(self
):
825 return self
._data
_stream
828 def cur_index_entry(self
):
829 if self
._cur
_index
_entry
_index
== len(self
._data
_stream
.index
):
832 return self
._data
_stream
.index
[self
._cur
_index
_entry
_index
]
def goto_next_index_entry(self):
    # Move the cursor to the following entry of the data stream index.
    # No bound check is done here.
    next_position = self._cur_index_entry_index + 1
    self._cur_index_entry_index = next_position
838 # The state of a single metadata stream.
839 class _LttngLiveViewerSessionMetadataStreamState
:
840 def __init__(self
, ts_state
, info
, metadata_stream
):
841 self
._ts
_state
= ts_state
843 self
._metadata
_stream
= metadata_stream
844 self
._is
_sent
= False
845 fmt
= 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
849 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
850 ts_state
.tracing_session_descriptor
.info
.name
,
851 metadata_stream
.path
,
def trace_session_state(self):
    # Tracing session state which owns this metadata stream state.
    #
    # The constructor stores it as `_ts_state`; the previous
    # `_trace_session_state` attribute is never assigned in the
    # visible code, so reading it raised `AttributeError`.
    return self._ts_state
864 def metadata_stream(self
):
865 return self
._metadata
_stream
872 def is_sent(self
, value
):
873 self
._is
_sent
= value
876 # The state of a tracing session.
877 class _LttngLiveViewerSessionTracingSessionState
:
878 def __init__(self
, tc_descr
, base_stream_id
):
879 self
._tc
_descr
= tc_descr
880 self
._stream
_infos
= []
883 stream_id
= base_stream_id
885 for trace
in tc_descr
.traces
:
886 trace_id
= stream_id
* 1000
888 # Data streams -> stream infos and data stream states
889 for data_stream
in trace
:
890 info
= _LttngLiveViewerStreamInfo(
895 data_stream
.channel_name
,
897 self
._stream
_infos
.append(info
)
898 self
._ds
_states
[stream_id
] = _LttngLiveViewerSessionDataStreamState(
899 self
, info
, data_stream
903 # Metadata stream -> stream info and metadata stream state
904 info
= _LttngLiveViewerStreamInfo(
905 stream_id
, trace_id
, True, trace
.metadata_stream
.path
, 'metadata'
907 self
._stream
_infos
.append(info
)
908 self
._ms
_states
[stream_id
] = _LttngLiveViewerSessionMetadataStreamState(
909 self
, info
, trace
.metadata_stream
913 self
._is
_attached
= False
914 fmt
= 'Built tracing session state: id={}, name="{}"'
915 logging
.info(fmt
.format(tc_descr
.info
.tracing_session_id
, tc_descr
.info
.name
))
918 def tracing_session_descriptor(self
):
919 return self
._tc
_descr
922 def data_stream_states(self
):
923 return self
._ds
_states
926 def metadata_stream_states(self
):
927 return self
._ms
_states
930 def stream_infos(self
):
931 return self
._stream
_infos
def has_new_metadata(self):
    # There is new metadata as long as at least one metadata stream
    # state of this tracing session is not marked as sent.
    for ms_state in self._ms_states.values():
        if not ms_state.is_sent:
            return True

    return False
938 def is_attached(self
):
939 return self
._is
_attached
942 def is_attached(self
, value
):
943 self
._is
_attached
= value
946 # An LTTng live viewer session manages a view on tracing sessions
947 # and replies to commands accordingly.
948 class _LttngLiveViewerSession
:
952 tracing_session_descriptors
,
953 max_query_data_response_size
,
955 self
._viewer
_session
_id
= viewer_session_id
957 self
._stream
_states
= {}
958 self
._max
_query
_data
_response
_size
= max_query_data_response_size
959 total_stream_infos
= 0
961 for ts_descr
in tracing_session_descriptors
:
962 ts_state
= _LttngLiveViewerSessionTracingSessionState(
963 ts_descr
, total_stream_infos
965 ts_id
= ts_state
.tracing_session_descriptor
.info
.tracing_session_id
966 self
._ts
_states
[ts_id
] = ts_state
967 total_stream_infos
+= len(ts_state
.stream_infos
)
969 # Update session's stream states to have the new states
970 self
._stream
_states
.update(ts_state
.data_stream_states
)
971 self
._stream
_states
.update(ts_state
.metadata_stream_states
)
973 self
._command
_handlers
= {
974 _LttngLiveViewerAttachToTracingSessionCommand
: self
._handle
_attach
_to
_tracing
_session
_command
,
975 _LttngLiveViewerCreateViewerSessionCommand
: self
._handle
_create
_viewer
_session
_command
,
976 _LttngLiveViewerDetachFromTracingSessionCommand
: self
._handle
_detach
_from
_tracing
_session
_command
,
977 _LttngLiveViewerGetDataStreamPacketDataCommand
: self
._handle
_get
_data
_stream
_packet
_data
_command
,
978 _LttngLiveViewerGetMetadataStreamDataCommand
: self
._handle
_get
_metadata
_stream
_data
_command
,
979 _LttngLiveViewerGetNewStreamInfosCommand
: self
._handle
_get
_new
_stream
_infos
_command
,
980 _LttngLiveViewerGetNextDataStreamIndexEntryCommand
: self
._handle
_get
_next
_data
_stream
_index
_entry
_command
,
981 _LttngLiveViewerGetTracingSessionInfosCommand
: self
._handle
_get
_tracing
_session
_infos
_command
,
985 def viewer_session_id(self
):
986 return self
._viewer
_session
_id
988 def _get_tracing_session_state(self
, tracing_session_id
):
989 if tracing_session_id
not in self
._ts
_states
:
990 raise UnexpectedInput(
991 'Unknown tracing session ID {}'.format(tracing_session_id
)
994 return self
._ts
_states
[tracing_session_id
]
996 def _get_stream_state(self
, stream_id
):
997 if stream_id
not in self
._stream
_states
:
998 UnexpectedInput('Unknown stream ID {}'.format(stream_id
))
1000 return self
._stream
_states
[stream_id
]
1002 def handle_command(self
, cmd
):
1004 'Handling command in viewer session: cmd-cls-name={}'.format(
1005 cmd
.__class
__.__name
__
1008 cmd_type
= type(cmd
)
1010 if cmd_type
not in self
._command
_handlers
:
1011 raise UnexpectedInput(
1012 'Unexpected command: cmd-cls-name={}'.format(cmd
.__class
__.__name
__)
1015 return self
._command
_handlers
[cmd_type
](cmd
)
1017 def _handle_attach_to_tracing_session_command(self
, cmd
):
1018 fmt
= 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1019 logging
.info(fmt
.format(cmd
.tracing_session_id
, cmd
.offset
, cmd
.seek_type
))
1020 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1021 info
= ts_state
.tracing_session_descriptor
.info
1023 if ts_state
.is_attached
:
1024 raise UnexpectedInput(
1025 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1030 ts_state
.is_attached
= True
1031 status
= _LttngLiveViewerAttachToTracingSessionReply
.Status
.OK
1032 return _LttngLiveViewerAttachToTracingSessionReply(
1033 status
, ts_state
.stream_infos
1036 def _handle_detach_from_tracing_session_command(self
, cmd
):
1037 fmt
= 'Handling "detach from tracing session" command: ts-id={}'
1038 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1039 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1040 info
= ts_state
.tracing_session_descriptor
.info
1042 if not ts_state
.is_attached
:
1043 raise UnexpectedInput(
1044 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1049 ts_state
.is_attached
= False
1050 status
= _LttngLiveViewerDetachFromTracingSessionReply
.Status
.OK
1051 return _LttngLiveViewerDetachFromTracingSessionReply(status
)
1053 def _handle_get_next_data_stream_index_entry_command(self
, cmd
):
1054 fmt
= 'Handling "get next data stream index entry" command: stream-id={}'
1055 logging
.info(fmt
.format(cmd
.stream_id
))
1056 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1058 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1059 raise UnexpectedInput(
1060 'Stream with ID {} is not a data stream'.format(cmd
.stream_id
)
1063 if stream_state
.cur_index_entry
is None:
1064 # The viewer is done reading this stream
1065 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.HUP
1067 # Dummy data stream index entry to use with the `HUP` status
1068 # (the reply needs one, but the viewer ignores it)
1069 index_entry
= _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1071 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1072 status
, index_entry
, False, False
1075 # The viewer only checks the `has_new_metadata` flag if the
1076 # reply's status is `OK`, so we need to provide an index here
1077 has_new_metadata
= stream_state
.tracing_session_state
.has_new_metadata
1078 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.OK
1079 reply
= _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1080 status
, stream_state
.cur_index_entry
, has_new_metadata
, False
1082 stream_state
.goto_next_index_entry()
1085 def _handle_get_data_stream_packet_data_command(self
, cmd
):
1086 fmt
= 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1087 logging
.info(fmt
.format(cmd
.stream_id
, cmd
.offset
, cmd
.req_length
))
1088 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1089 data_response_length
= cmd
.req_length
1091 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1092 raise UnexpectedInput(
1093 'Stream with ID {} is not a data stream'.format(cmd
.stream_id
)
1096 if stream_state
.tracing_session_state
.has_new_metadata
:
1097 status
= _LttngLiveViewerGetDataStreamPacketDataReply
.Status
.ERROR
1098 return _LttngLiveViewerGetDataStreamPacketDataReply(
1099 status
, bytes(), True, False
1102 if self
._max
_query
_data
_response
_size
:
1103 # Enforce a server side limit on the query requested length.
1104 # To ensure that the transaction terminate take the minimum of both
1106 data_response_length
= min(
1107 cmd
.req_length
, self
._max
_query
_data
_response
_size
1109 fmt
= 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1110 logging
.info(fmt
.format(cmd
.req_length
, data_response_length
))
1112 data
= stream_state
.data_stream
.get_data(cmd
.offset
, data_response_length
)
1113 status
= _LttngLiveViewerGetDataStreamPacketDataReply
.Status
.OK
1114 return _LttngLiveViewerGetDataStreamPacketDataReply(status
, data
, False, False)
def _handle_get_metadata_stream_data_command(self, cmd):
    # Reply to a "get metadata stream data" command: the first request
    # for a given metadata stream gets its data with the `OK` status;
    # any later request gets an empty `NO_NEW` reply.
    logging.info(
        'Handling "get metadata stream data" command: stream-id={}'.format(
            cmd.stream_id
        )
    )
    reply_cls = _LttngLiveViewerGetMetadataStreamDataContentReply
    ms_state = self._get_stream_state(cmd.stream_id)

    # Exact type check: a data stream state is not acceptable here.
    if type(ms_state) is not _LttngLiveViewerSessionMetadataStreamState:
        msg = 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
        raise UnexpectedInput(msg)

    if ms_state.is_sent:
        return reply_cls(reply_cls.Status.NO_NEW, bytes())

    # Mark as sent before building the reply so that the next request
    # for this stream reports `NO_NEW`.
    ms_state.is_sent = True
    ok_status = reply_cls.Status.OK
    return reply_cls(ok_status, ms_state.metadata_stream.data)
def _handle_get_new_stream_infos_command(self, cmd):
    logging.info(
        'Handling "get new stream infos" command: ts-id={}'.format(
            cmd.tracing_session_id
        )
    )

    # The "attach to tracing session" reply already carries every
    # stream info of the tracing session, so there is never anything
    # new to announce at this point. If the viewer sends this command,
    # it consumed all the existing data streams: answer with the `HUP`
    # status and no stream infos.
    reply_cls = _LttngLiveViewerGetNewStreamInfosReply
    return reply_cls(reply_cls.Status.HUP, [])
1148 def _handle_get_tracing_session_infos_command(self
, cmd
):
1149 logging
.info('Handling "get tracing session infos" command.')
1151 tss
.tracing_session_descriptor
.info
for tss
in self
._ts
_states
.values()
1153 infos
.sort(key
=lambda info
: info
.name
)
1154 return _LttngLiveViewerGetTracingSessionInfosReply(infos
)
def _handle_create_viewer_session_command(self, cmd):
    logging.info('Handling "create viewer session" command.')

    # Nothing to set up on this side: the LTTng relay daemon is where
    # the viewer session's state gets allocated. Always acknowledge
    # with the `OK` status.
    reply_cls = _LttngLiveViewerCreateViewerSessionReply
    ok_status = reply_cls.Status.OK
    return reply_cls(ok_status)
1165 # An LTTng live TCP server.
1167 # On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1168 # the decimal TCP port number to a temporary port file. It renames the
1169 # temporary port file to `port_filename`.
1171 # `tracing_session_descriptors` is a list of tracing session descriptors
1172 # (`LttngTracingSessionDescriptor`) to serve.
1174 # This server accepts a single viewer (client).
1176 # When the viewer closes the connection, the server's constructor
1178 class LttngLiveServer
:
1180 self
, port_filename
, tracing_session_descriptors
, max_query_data_response_size
1182 logging
.info('Server configuration:')
1184 logging
.info(' Port file name: `{}`'.format(port_filename
))
1186 if max_query_data_response_size
is not None:
1188 ' Maximum response data query size: `{}`'.format(
1189 max_query_data_response_size
1193 for ts_descr
in tracing_session_descriptors
:
1194 info
= ts_descr
.info
1195 fmt
= ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1199 info
.tracing_session_id
,
1201 info
.live_timer_freq
,
1207 for trace
in ts_descr
.traces
:
1208 logging
.info(' Trace: path="{}"'.format(trace
.path
))
1210 self
._ts
_descriptors
= tracing_session_descriptors
1211 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1212 self
._sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
1213 self
._codec
= _LttngLiveViewerProtocolCodec()
1215 # Port 0: OS assigns an unused port
1216 serv_addr
= ('localhost', 0)
1217 self
._sock
.bind(serv_addr
)
1218 self
._write
_port
_to
_file
(port_filename
)
1224 logging
.info('Closed connection and socket.')
1227 def _server_port(self
):
1228 return self
._sock
.getsockname()[1]
1230 def _recv_command(self
):
1234 logging
.info('Waiting for viewer command.')
1235 buf
= self
._conn
.recv(128)
1238 logging
.info('Client closed connection.')
1241 raise UnexpectedInput(
1242 'Client closed connection after having sent {} command bytes.'.format(
1249 logging
.info('Received data from viewer: length={}'.format(len(buf
)))
1254 cmd
= self
._codec
.decode(data
)
1255 except struct
.error
as exc
:
1256 raise UnexpectedInput('Malformed command: {}'.format(exc
)) from exc
1260 'Received command from viewer: cmd-cls-name={}'.format(
1261 cmd
.__class
__.__name
__
1266 def _send_reply(self
, reply
):
1267 data
= self
._codec
.encode(reply
)
1269 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
1270 reply
.__class
__.__name
__, len(data
)
1273 self
._conn
.sendall(data
)
1275 def _handle_connection(self
):
1276 # First command must be "connect"
1277 cmd
= self
._recv
_command
()
1279 if type(cmd
) is not _LttngLiveViewerConnectCommand
:
1280 raise UnexpectedInput(
1281 'First command is not "connect": cmd-cls-name={}'.format(
1282 cmd
.__class
__.__name
__
1286 # Create viewer session (arbitrary ID 23)
1288 'LTTng live viewer connected: version={}.{}'.format(cmd
.major
, cmd
.minor
)
1290 viewer_session
= _LttngLiveViewerSession(
1291 23, self
._ts
_descriptors
, self
._max
_query
_data
_response
_size
1294 # Send "connect" reply
1296 _LttngLiveViewerConnectReply(viewer_session
.viewer_session_id
, 2, 10)
1299 # Make the viewer session handle the remaining commands
1301 cmd
= self
._recv
_command
()
1304 # Connection closed (at an expected location within the
1308 self
._send
_reply
(viewer_session
.handle_command(cmd
))
1311 logging
.info('Listening: port={}'.format(self
._server
_port
))
1312 # Backlog must be present for Python version < 3.5.
1313 # 128 is an arbitrary number since we expect only 1 connection anyway.
1314 self
._sock
.listen(128)
1315 self
._conn
, viewer_addr
= self
._sock
.accept()
1317 'Accepted viewer: addr={}:{}'.format(viewer_addr
[0], viewer_addr
[1])
1321 self
._handle
_connection
()
1325 def _write_port_to_file(self
, port_filename
):
1326 # Write the port number to a temporary file.
1327 with tempfile
.NamedTemporaryFile(mode
='w', delete
=False) as tmp_port_file
:
1328 print(self
._server
_port
, end
='', file=tmp_port_file
)
1330 # Rename temporary file to real file
1331 os
.replace(tmp_port_file
.name
, port_filename
)
1333 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1334 tmp_port_file
.name
, port_filename
1339 # A tracing session descriptor.
1341 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1343 class LttngTracingSessionDescriptor
:
1345 self
, name
, tracing_session_id
, hostname
, live_timer_freq
, client_count
, traces
1347 for trace
in traces
:
1348 if name
not in trace
.path
:
1349 fmt
= 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
1350 raise ValueError(fmt
.format(name
, trace
.path
))
1352 self
._traces
= traces
1353 stream_count
= sum([len(t
) + 1 for t
in traces
])
1354 self
._info
= _LttngLiveViewerTracingSessionInfo(
1372 def _tracing_session_descriptors_from_arg(string
):
1374 # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
1375 parts
= string
.split(',')
1377 tracing_session_id
= int(parts
[1])
1379 live_timer_freq
= int(parts
[3])
1380 client_count
= int(parts
[4])
1381 traces
= [LttngTrace(path
) for path
in parts
[5:]]
1382 return LttngTracingSessionDescriptor(
1383 name
, tracing_session_id
, hostname
, live_timer_freq
, client_count
, traces
1387 def _loglevel_parser(string
):
1388 loglevels
= {'info': logging
.INFO
, 'warning': logging
.WARNING
}
1389 if string
not in loglevels
:
1390 msg
= "{} is not a valid loglevel".format(string
)
1391 raise argparse
.ArgumentTypeError(msg
)
1392 return loglevels
[string
]
1395 if __name__
== '__main__':
1396 logging
.basicConfig(format
='# %(asctime)-25s%(message)s')
1397 parser
= argparse
.ArgumentParser(
1398 description
='LTTng-live protocol mocker', add_help
=False
1400 parser
.add_argument(
1403 choices
=['info', 'warning'],
1404 help='The loglevel to be used.',
1407 loglevel_namespace
, remaining_args
= parser
.parse_known_args()
1408 logging
.getLogger().setLevel(_loglevel_parser(loglevel_namespace
.log_level
))
1410 parser
.add_argument(
1412 help='The final port file. This file is present when the server is ready to receive connection.',
1415 parser
.add_argument(
1416 '--max-query-data-response-size',
1418 help='The maximum size of control data response in bytes',
1420 parser
.add_argument(
1424 type=_tracing_session_descriptors_from_arg
,
1425 help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
1427 parser
.add_argument(
1431 default
=argparse
.SUPPRESS
,
1432 help='Show this help message and exit.',
1435 args
= parser
.parse_args(args
=remaining_args
)
1438 args
.port_filename
, args
.sessions
, args
.max_query_data_response_size
1440 except UnexpectedInput
as exc
:
1441 logging
.error(str(exc
))
1442 print(exc
, file=sys
.stderr
)