1 # SPDX-License-Identifier: MIT
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
19 class UnexpectedInput(RuntimeError):
23 class _LttngLiveViewerCommand
:
24 def __init__(self
, version
):
25 self
._version
= version
32 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand
):
33 def __init__(self
, version
, viewer_session_id
, major
, minor
):
34 super().__init
__(version
)
35 self
._viewer
_session
_id
= viewer_session_id
40 def viewer_session_id(self
):
41 return self
._viewer
_session
_id
52 class _LttngLiveViewerConnectReply
:
53 def __init__(self
, viewer_session_id
, major
, minor
):
54 self
._viewer
_session
_id
= viewer_session_id
59 def viewer_session_id(self
):
60 return self
._viewer
_session
_id
71 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand
):
75 class _LttngLiveViewerTracingSessionInfo
:
85 self
._tracing
_session
_id
= tracing_session_id
86 self
._live
_timer
_freq
= live_timer_freq
87 self
._client
_count
= client_count
88 self
._stream
_count
= stream_count
89 self
._hostname
= hostname
93 def tracing_session_id(self
):
94 return self
._tracing
_session
_id
97 def live_timer_freq(self
):
98 return self
._live
_timer
_freq
101 def client_count(self
):
102 return self
._client
_count
105 def stream_count(self
):
106 return self
._stream
_count
110 return self
._hostname
117 class _LttngLiveViewerGetTracingSessionInfosReply
:
118 def __init__(self
, tracing_session_infos
):
119 self
._tracing
_session
_infos
= tracing_session_infos
122 def tracing_session_infos(self
):
123 return self
._tracing
_session
_infos
126 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand
):
131 def __init__(self
, version
, tracing_session_id
, offset
, seek_type
):
132 super().__init
__(version
)
133 self
._tracing
_session
_id
= tracing_session_id
134 self
._offset
= offset
135 self
._seek
_type
= seek_type
138 def tracing_session_id(self
):
139 return self
._tracing
_session
_id
147 return self
._seek
_type
150 class _LttngLiveViewerStreamInfo
:
151 def __init__(self
, id, trace_id
, is_metadata
, path
, channel_name
):
153 self
._trace
_id
= trace_id
154 self
._is
_metadata
= is_metadata
156 self
._channel
_name
= channel_name
164 return self
._trace
_id
167 def is_metadata(self
):
168 return self
._is
_metadata
175 def channel_name(self
):
176 return self
._channel
_name
179 class _LttngLiveViewerAttachToTracingSessionReply
:
188 def __init__(self
, status
, stream_infos
):
189 self
._status
= status
190 self
._stream
_infos
= stream_infos
197 def stream_infos(self
):
198 return self
._stream
_infos
201 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand
):
202 def __init__(self
, version
, stream_id
):
203 super().__init
__(version
)
204 self
._stream
_id
= stream_id
208 return self
._stream
_id
211 class _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
220 def __init__(self
, status
, index_entry
, has_new_metadata
, has_new_data_stream
):
221 self
._status
= status
222 self
._index
_entry
= index_entry
223 self
._has
_new
_metadata
= has_new_metadata
224 self
._has
_new
_data
_stream
= has_new_data_stream
231 def index_entry(self
):
232 return self
._index
_entry
235 def has_new_metadata(self
):
236 return self
._has
_new
_metadata
239 def has_new_data_stream(self
):
240 return self
._has
_new
_data
_stream
243 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand
):
244 def __init__(self
, version
, stream_id
, offset
, req_length
):
245 super().__init
__(version
)
246 self
._stream
_id
= stream_id
247 self
._offset
= offset
248 self
._req
_length
= req_length
252 return self
._stream
_id
259 def req_length(self
):
260 return self
._req
_length
263 class _LttngLiveViewerGetDataStreamPacketDataReply
:
270 def __init__(self
, status
, data
, has_new_metadata
, has_new_data_stream
):
271 self
._status
= status
273 self
._has
_new
_metadata
= has_new_metadata
274 self
._has
_new
_data
_stream
= has_new_data_stream
285 def has_new_metadata(self
):
286 return self
._has
_new
_metadata
289 def has_new_data_stream(self
):
290 return self
._has
_new
_data
_stream
293 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand
):
294 def __init__(self
, version
, stream_id
):
295 super().__init
__(version
)
296 self
._stream
_id
= stream_id
300 return self
._stream
_id
303 class _LttngLiveViewerGetMetadataStreamDataContentReply
:
309 def __init__(self
, status
, data
):
310 self
._status
= status
322 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand
):
323 def __init__(self
, version
, tracing_session_id
):
324 super().__init
__(version
)
325 self
._tracing
_session
_id
= tracing_session_id
328 def tracing_session_id(self
):
329 return self
._tracing
_session
_id
332 class _LttngLiveViewerGetNewStreamInfosReply
:
339 def __init__(self
, status
, stream_infos
):
340 self
._status
= status
341 self
._stream
_infos
= stream_infos
348 def stream_infos(self
):
349 return self
._stream
_infos
352 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand
):
356 class _LttngLiveViewerCreateViewerSessionReply
:
361 def __init__(self
, status
):
362 self
._status
= status
369 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand
):
370 def __init__(self
, version
, tracing_session_id
):
371 super().__init
__(version
)
372 self
._tracing
_session
_id
= tracing_session_id
375 def tracing_session_id(self
):
376 return self
._tracing
_session
_id
379 class _LttngLiveViewerDetachFromTracingSessionReply
:
385 def __init__(self
, status
):
386 self
._status
= status
393 # An LTTng live protocol codec can convert bytes to command objects and
394 # reply objects to bytes.
395 class _LttngLiveViewerProtocolCodec
:
396 _COMMAND_HEADER_STRUCT_FMT
= 'QII'
397 _COMMAND_HEADER_SIZE_BYTES
= struct
.calcsize(_COMMAND_HEADER_STRUCT_FMT
)
402 def _unpack(self
, fmt
, data
, offset
=0):
404 return struct
.unpack_from(fmt
, data
, offset
)
406 def _unpack_payload(self
, fmt
, data
):
408 fmt
, data
, _LttngLiveViewerProtocolCodec
._COMMAND
_HEADER
_SIZE
_BYTES
411 def decode(self
, data
):
412 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
:
413 # Not enough data to read the command header
416 payload_size
, cmd_type
, version
= self
._unpack
(
417 self
._COMMAND
_HEADER
_STRUCT
_FMT
, data
420 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
421 payload_size
, cmd_type
, version
425 if len(data
) < self
._COMMAND
_HEADER
_SIZE
_BYTES
+ payload_size
:
426 # Not enough data to read the whole command
430 viewer_session_id
, major
, minor
, conn_type
= self
._unpack
_payload
(
433 return _LttngLiveViewerConnectCommand(
434 version
, viewer_session_id
, major
, minor
437 return _LttngLiveViewerGetTracingSessionInfosCommand(version
)
439 tracing_session_id
, offset
, seek_type
= self
._unpack
_payload
('QQI', data
)
440 return _LttngLiveViewerAttachToTracingSessionCommand(
441 version
, tracing_session_id
, offset
, seek_type
444 (stream_id
,) = self
._unpack
_payload
('Q', data
)
445 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
449 stream_id
, offset
, req_length
= self
._unpack
_payload
('QQI', data
)
450 return _LttngLiveViewerGetDataStreamPacketDataCommand(
451 version
, stream_id
, offset
, req_length
454 (stream_id
,) = self
._unpack
_payload
('Q', data
)
455 return _LttngLiveViewerGetMetadataStreamDataCommand(version
, stream_id
)
457 (tracing_session_id
,) = self
._unpack
_payload
('Q', data
)
458 return _LttngLiveViewerGetNewStreamInfosCommand(version
, tracing_session_id
)
460 return _LttngLiveViewerCreateViewerSessionCommand(version
)
462 (tracing_session_id
,) = self
._unpack
_payload
('Q', data
)
463 return _LttngLiveViewerDetachFromTracingSessionCommand(
464 version
, tracing_session_id
467 raise UnexpectedInput('Unknown command type {}'.format(cmd_type
))
469 def _pack(self
, fmt
, *args
):
470 # Force network byte order
471 return struct
.pack('!' + fmt
, *args
)
473 def _encode_zero_padded_str(self
, string
, length
):
474 data
= string
.encode()
475 return data
.ljust(length
, b
'\x00')
477 def _encode_stream_info(self
, info
):
478 data
= self
._pack
('QQI', info
.id, info
.trace_id
, int(info
.is_metadata
))
479 data
+= self
._encode
_zero
_padded
_str
(info
.path
, 4096)
480 data
+= self
._encode
_zero
_padded
_str
(info
.channel_name
, 255)
483 def _get_has_new_stuff_flags(self
, has_new_metadata
, has_new_data_streams
):
489 if has_new_data_streams
:
494 def encode(self
, reply
):
495 if type(reply
) is _LttngLiveViewerConnectReply
:
497 'QIII', reply
.viewer_session_id
, reply
.major
, reply
.minor
, 2
499 elif type(reply
) is _LttngLiveViewerGetTracingSessionInfosReply
:
500 data
= self
._pack
('I', len(reply
.tracing_session_infos
))
502 for info
in reply
.tracing_session_infos
:
505 info
.tracing_session_id
,
506 info
.live_timer_freq
,
510 data
+= self
._encode
_zero
_padded
_str
(info
.hostname
, 64)
511 data
+= self
._encode
_zero
_padded
_str
(info
.name
, 255)
512 elif type(reply
) is _LttngLiveViewerAttachToTracingSessionReply
:
513 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
515 for info
in reply
.stream_infos
:
516 data
+= self
._encode
_stream
_info
(info
)
517 elif type(reply
) is _LttngLiveViewerGetNextDataStreamIndexEntryReply
:
518 entry
= reply
.index_entry
519 flags
= self
._get
_has
_new
_stuff
_flags
(
520 reply
.has_new_metadata
, reply
.has_new_data_stream
526 entry
.total_size_bits
,
527 entry
.content_size_bits
,
528 entry
.timestamp_begin
,
530 entry
.events_discarded
,
531 entry
.stream_class_id
,
535 elif type(reply
) is _LttngLiveViewerGetDataStreamPacketDataReply
:
536 flags
= self
._get
_has
_new
_stuff
_flags
(
537 reply
.has_new_metadata
, reply
.has_new_data_stream
539 data
= self
._pack
('III', reply
.status
, len(reply
.data
), flags
)
541 elif type(reply
) is _LttngLiveViewerGetMetadataStreamDataContentReply
:
542 data
= self
._pack
('QI', len(reply
.data
), reply
.status
)
544 elif type(reply
) is _LttngLiveViewerGetNewStreamInfosReply
:
545 data
= self
._pack
('II', reply
.status
, len(reply
.stream_infos
))
547 for info
in reply
.stream_infos
:
548 data
+= self
._encode
_stream
_info
(info
)
549 elif type(reply
) is _LttngLiveViewerCreateViewerSessionReply
:
550 data
= self
._pack
('I', reply
.status
)
551 elif type(reply
) is _LttngLiveViewerDetachFromTracingSessionReply
:
552 data
= self
._pack
('I', reply
.status
)
555 'Unknown reply object with class `{}`'.format(reply
.__class
__.__name
__)
561 # An entry within the index of an LTTng data stream.
562 class _LttngDataStreamIndexEntry
:
573 self
._offset
_bytes
= offset_bytes
574 self
._total
_size
_bits
= total_size_bits
575 self
._content
_size
_bits
= content_size_bits
576 self
._timestamp
_begin
= timestamp_begin
577 self
._timestamp
_end
= timestamp_end
578 self
._events
_discarded
= events_discarded
579 self
._stream
_class
_id
= stream_class_id
582 def offset_bytes(self
):
583 return self
._offset
_bytes
586 def total_size_bits(self
):
587 return self
._total
_size
_bits
590 def total_size_bytes(self
):
591 return self
._total
_size
_bits
// 8
594 def content_size_bits(self
):
595 return self
._content
_size
_bits
598 def content_size_bytes(self
):
599 return self
._content
_size
_bits
// 8
602 def timestamp_begin(self
):
603 return self
._timestamp
_begin
606 def timestamp_end(self
):
607 return self
._timestamp
_end
610 def events_discarded(self
):
611 return self
._events
_discarded
614 def stream_class_id(self
):
615 return self
._stream
_class
_id
618 # The index of an LTTng data stream, a sequence of index entries.
619 class _LttngDataStreamIndex(collections
.abc
.Sequence
):
620 def __init__(self
, path
):
624 'Built data stream index entries: path="{}", count={}'.format(
625 path
, len(self
._entries
)
631 assert os
.path
.isfile(self
._path
)
633 with
open(self
._path
, 'rb') as f
:
636 size
= struct
.calcsize(fmt
)
638 assert len(data
) == size
639 magic
, index_major
, index_minor
, index_entry_length
= struct
.unpack(
642 assert magic
== 0xC1F1DCC1
646 size
= struct
.calcsize(fmt
)
650 'Decoding data stream index entry: path="{}", offset={}'.format(
660 assert len(data
) == size
669 ) = struct
.unpack(fmt
, data
)
671 self
._entries
.append(
672 _LttngDataStreamIndexEntry(
683 # Skip anything else before the next entry
684 f
.seek(index_entry_length
- size
, os
.SEEK_CUR
)
686 def __getitem__(self
, index
):
687 return self
._entries
[index
]
690 return len(self
._entries
)
697 # An LTTng data stream.
698 class _LttngDataStream
:
699 def __init__(self
, path
):
701 filename
= os
.path
.basename(path
)
702 match
= re
.match(r
'(.*)_\d+', filename
)
703 self
._channel
_name
= match
.group(1)
704 trace_dir
= os
.path
.dirname(path
)
705 index_path
= os
.path
.join(trace_dir
, 'index', filename
+ '.idx')
706 self
._index
= _LttngDataStreamIndex(index_path
)
707 assert os
.path
.isfile(path
)
708 self
._file
= open(path
, 'rb')
710 'Built data stream: path="{}", channel-name="{}"'.format(
711 path
, self
._channel
_name
720 def channel_name(self
):
721 return self
._channel
_name
727 def get_data(self
, offset_bytes
, len_bytes
):
728 self
._file
.seek(offset_bytes
)
729 return self
._file
.read(len_bytes
)
732 # An LTTng metadata stream.
733 class _LttngMetadataStream
:
734 def __init__(self
, path
):
736 logging
.info('Built metadata stream: path="{}"'.format(path
))
744 assert os
.path
.isfile(self
._path
)
746 with
open(self
._path
, 'rb') as f
:
750 # An LTTng trace, a sequence of LTTng data streams.
751 class LttngTrace(collections
.abc
.Sequence
):
752 def __init__(self
, trace_dir
):
753 assert os
.path
.isdir(trace_dir
)
754 self
._path
= trace_dir
755 self
._metadata
_stream
= _LttngMetadataStream(
756 os
.path
.join(trace_dir
, 'metadata')
758 self
._create
_data
_streams
(trace_dir
)
759 logging
.info('Built trace: path="{}"'.format(trace_dir
))
761 def _create_data_streams(self
, trace_dir
):
762 data_stream_paths
= []
764 for filename
in os
.listdir(trace_dir
):
765 path
= os
.path
.join(trace_dir
, filename
)
767 if not os
.path
.isfile(path
):
770 if filename
.startswith('.'):
773 if filename
== 'metadata':
776 data_stream_paths
.append(path
)
778 data_stream_paths
.sort()
779 self
._data
_streams
= []
781 for data_stream_path
in data_stream_paths
:
782 self
._data
_streams
.append(_LttngDataStream(data_stream_path
))
789 def metadata_stream(self
):
790 return self
._metadata
_stream
792 def __getitem__(self
, index
):
793 return self
._data
_streams
[index
]
796 return len(self
._data
_streams
)
799 # The state of a single data stream.
800 class _LttngLiveViewerSessionDataStreamState
:
801 def __init__(self
, ts_state
, info
, data_stream
):
802 self
._ts
_state
= ts_state
804 self
._data
_stream
= data_stream
805 self
._cur
_index
_entry
_index
= 0
806 fmt
= 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
810 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
811 ts_state
.tracing_session_descriptor
.info
.name
,
817 def tracing_session_state(self
):
818 return self
._ts
_state
825 def data_stream(self
):
826 return self
._data
_stream
829 def cur_index_entry(self
):
830 if self
._cur
_index
_entry
_index
== len(self
._data
_stream
.index
):
833 return self
._data
_stream
.index
[self
._cur
_index
_entry
_index
]
835 def goto_next_index_entry(self
):
836 self
._cur
_index
_entry
_index
+= 1
839 # The state of a single metadata stream.
840 class _LttngLiveViewerSessionMetadataStreamState
:
841 def __init__(self
, ts_state
, info
, metadata_stream
):
842 self
._ts
_state
= ts_state
844 self
._metadata
_stream
= metadata_stream
845 self
._is
_sent
= False
846 fmt
= 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
850 ts_state
.tracing_session_descriptor
.info
.tracing_session_id
,
851 ts_state
.tracing_session_descriptor
.info
.name
,
852 metadata_stream
.path
,
857 def trace_session_state(self
):
858 return self
._trace
_session
_state
865 def metadata_stream(self
):
866 return self
._metadata
_stream
873 def is_sent(self
, value
):
874 self
._is
_sent
= value
877 # The state of a tracing session.
878 class _LttngLiveViewerSessionTracingSessionState
:
879 def __init__(self
, tc_descr
, base_stream_id
):
880 self
._tc
_descr
= tc_descr
881 self
._stream
_infos
= []
884 stream_id
= base_stream_id
886 for trace
in tc_descr
.traces
:
887 trace_id
= stream_id
* 1000
889 # Data streams -> stream infos and data stream states
890 for data_stream
in trace
:
891 info
= _LttngLiveViewerStreamInfo(
896 data_stream
.channel_name
,
898 self
._stream
_infos
.append(info
)
899 self
._ds
_states
[stream_id
] = _LttngLiveViewerSessionDataStreamState(
900 self
, info
, data_stream
904 # Metadata stream -> stream info and metadata stream state
905 info
= _LttngLiveViewerStreamInfo(
906 stream_id
, trace_id
, True, trace
.metadata_stream
.path
, 'metadata'
908 self
._stream
_infos
.append(info
)
909 self
._ms
_states
[stream_id
] = _LttngLiveViewerSessionMetadataStreamState(
910 self
, info
, trace
.metadata_stream
914 self
._is
_attached
= False
915 fmt
= 'Built tracing session state: id={}, name="{}"'
916 logging
.info(fmt
.format(tc_descr
.info
.tracing_session_id
, tc_descr
.info
.name
))
919 def tracing_session_descriptor(self
):
920 return self
._tc
_descr
923 def data_stream_states(self
):
924 return self
._ds
_states
927 def metadata_stream_states(self
):
928 return self
._ms
_states
931 def stream_infos(self
):
932 return self
._stream
_infos
935 def has_new_metadata(self
):
936 return any([not ms
.is_sent
for ms
in self
._ms
_states
.values()])
939 def is_attached(self
):
940 return self
._is
_attached
943 def is_attached(self
, value
):
944 self
._is
_attached
= value
947 # An LTTng live viewer session manages a view on tracing sessions
948 # and replies to commands accordingly.
949 class _LttngLiveViewerSession
:
953 tracing_session_descriptors
,
954 max_query_data_response_size
,
956 self
._viewer
_session
_id
= viewer_session_id
958 self
._stream
_states
= {}
959 self
._max
_query
_data
_response
_size
= max_query_data_response_size
960 total_stream_infos
= 0
962 for ts_descr
in tracing_session_descriptors
:
963 ts_state
= _LttngLiveViewerSessionTracingSessionState(
964 ts_descr
, total_stream_infos
966 ts_id
= ts_state
.tracing_session_descriptor
.info
.tracing_session_id
967 self
._ts
_states
[ts_id
] = ts_state
968 total_stream_infos
+= len(ts_state
.stream_infos
)
970 # Update session's stream states to have the new states
971 self
._stream
_states
.update(ts_state
.data_stream_states
)
972 self
._stream
_states
.update(ts_state
.metadata_stream_states
)
974 self
._command
_handlers
= {
975 _LttngLiveViewerAttachToTracingSessionCommand
: self
._handle
_attach
_to
_tracing
_session
_command
,
976 _LttngLiveViewerCreateViewerSessionCommand
: self
._handle
_create
_viewer
_session
_command
,
977 _LttngLiveViewerDetachFromTracingSessionCommand
: self
._handle
_detach
_from
_tracing
_session
_command
,
978 _LttngLiveViewerGetDataStreamPacketDataCommand
: self
._handle
_get
_data
_stream
_packet
_data
_command
,
979 _LttngLiveViewerGetMetadataStreamDataCommand
: self
._handle
_get
_metadata
_stream
_data
_command
,
980 _LttngLiveViewerGetNewStreamInfosCommand
: self
._handle
_get
_new
_stream
_infos
_command
,
981 _LttngLiveViewerGetNextDataStreamIndexEntryCommand
: self
._handle
_get
_next
_data
_stream
_index
_entry
_command
,
982 _LttngLiveViewerGetTracingSessionInfosCommand
: self
._handle
_get
_tracing
_session
_infos
_command
,
986 def viewer_session_id(self
):
987 return self
._viewer
_session
_id
989 def _get_tracing_session_state(self
, tracing_session_id
):
990 if tracing_session_id
not in self
._ts
_states
:
991 raise UnexpectedInput(
992 'Unknown tracing session ID {}'.format(tracing_session_id
)
995 return self
._ts
_states
[tracing_session_id
]
997 def _get_stream_state(self
, stream_id
):
998 if stream_id
not in self
._stream
_states
:
999 UnexpectedInput('Unknown stream ID {}'.format(stream_id
))
1001 return self
._stream
_states
[stream_id
]
1003 def handle_command(self
, cmd
):
1005 'Handling command in viewer session: cmd-cls-name={}'.format(
1006 cmd
.__class
__.__name
__
1009 cmd_type
= type(cmd
)
1011 if cmd_type
not in self
._command
_handlers
:
1012 raise UnexpectedInput(
1013 'Unexpected command: cmd-cls-name={}'.format(cmd
.__class
__.__name
__)
1016 return self
._command
_handlers
[cmd_type
](cmd
)
1018 def _handle_attach_to_tracing_session_command(self
, cmd
):
1019 fmt
= 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1020 logging
.info(fmt
.format(cmd
.tracing_session_id
, cmd
.offset
, cmd
.seek_type
))
1021 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1022 info
= ts_state
.tracing_session_descriptor
.info
1024 if ts_state
.is_attached
:
1025 raise UnexpectedInput(
1026 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1031 ts_state
.is_attached
= True
1032 status
= _LttngLiveViewerAttachToTracingSessionReply
.Status
.OK
1033 return _LttngLiveViewerAttachToTracingSessionReply(
1034 status
, ts_state
.stream_infos
1037 def _handle_detach_from_tracing_session_command(self
, cmd
):
1038 fmt
= 'Handling "detach from tracing session" command: ts-id={}'
1039 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1040 ts_state
= self
._get
_tracing
_session
_state
(cmd
.tracing_session_id
)
1041 info
= ts_state
.tracing_session_descriptor
.info
1043 if not ts_state
.is_attached
:
1044 raise UnexpectedInput(
1045 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1050 ts_state
.is_attached
= False
1051 status
= _LttngLiveViewerDetachFromTracingSessionReply
.Status
.OK
1052 return _LttngLiveViewerDetachFromTracingSessionReply(status
)
1054 def _handle_get_next_data_stream_index_entry_command(self
, cmd
):
1055 fmt
= 'Handling "get next data stream index entry" command: stream-id={}'
1056 logging
.info(fmt
.format(cmd
.stream_id
))
1057 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1059 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1060 raise UnexpectedInput(
1061 'Stream with ID {} is not a data stream'.format(cmd
.stream_id
)
1064 if stream_state
.cur_index_entry
is None:
1065 # The viewer is done reading this stream
1066 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.HUP
1068 # Dummy data stream index entry to use with the `HUP` status
1069 # (the reply needs one, but the viewer ignores it)
1070 index_entry
= _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1072 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1073 status
, index_entry
, False, False
1076 # The viewer only checks the `has_new_metadata` flag if the
1077 # reply's status is `OK`, so we need to provide an index here
1078 has_new_metadata
= stream_state
.tracing_session_state
.has_new_metadata
1079 status
= _LttngLiveViewerGetNextDataStreamIndexEntryReply
.Status
.OK
1080 reply
= _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1081 status
, stream_state
.cur_index_entry
, has_new_metadata
, False
1083 stream_state
.goto_next_index_entry()
1086 def _handle_get_data_stream_packet_data_command(self
, cmd
):
1087 fmt
= 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1088 logging
.info(fmt
.format(cmd
.stream_id
, cmd
.offset
, cmd
.req_length
))
1089 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1090 data_response_length
= cmd
.req_length
1092 if type(stream_state
) is not _LttngLiveViewerSessionDataStreamState
:
1093 raise UnexpectedInput(
1094 'Stream with ID {} is not a data stream'.format(cmd
.stream_id
)
1097 if stream_state
.tracing_session_state
.has_new_metadata
:
1098 status
= _LttngLiveViewerGetDataStreamPacketDataReply
.Status
.ERROR
1099 return _LttngLiveViewerGetDataStreamPacketDataReply(
1100 status
, bytes(), True, False
1103 if self
._max
_query
_data
_response
_size
:
1104 # Enforce a server side limit on the query requested length.
1105 # To ensure that the transaction terminate take the minimum of both
1107 data_response_length
= min(
1108 cmd
.req_length
, self
._max
_query
_data
_response
_size
1110 fmt
= 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1111 logging
.info(fmt
.format(cmd
.req_length
, data_response_length
))
1113 data
= stream_state
.data_stream
.get_data(cmd
.offset
, data_response_length
)
1114 status
= _LttngLiveViewerGetDataStreamPacketDataReply
.Status
.OK
1115 return _LttngLiveViewerGetDataStreamPacketDataReply(status
, data
, False, False)
1117 def _handle_get_metadata_stream_data_command(self
, cmd
):
1118 fmt
= 'Handling "get metadata stream data" command: stream-id={}'
1119 logging
.info(fmt
.format(cmd
.stream_id
))
1120 stream_state
= self
._get
_stream
_state
(cmd
.stream_id
)
1122 if type(stream_state
) is not _LttngLiveViewerSessionMetadataStreamState
:
1123 raise UnexpectedInput(
1124 'Stream with ID {} is not a metadata stream'.format(cmd
.stream_id
)
1127 if stream_state
.is_sent
:
1128 status
= _LttngLiveViewerGetMetadataStreamDataContentReply
.Status
.NO_NEW
1129 return _LttngLiveViewerGetMetadataStreamDataContentReply(status
, bytes())
1131 stream_state
.is_sent
= True
1132 status
= _LttngLiveViewerGetMetadataStreamDataContentReply
.Status
.OK
1133 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1134 status
, stream_state
.metadata_stream
.data
1137 def _handle_get_new_stream_infos_command(self
, cmd
):
1138 fmt
= 'Handling "get new stream infos" command: ts-id={}'
1139 logging
.info(fmt
.format(cmd
.tracing_session_id
))
1141 # As of this version, all the tracing session's stream infos are
1142 # always given to the viewer when sending the "attach to tracing
1143 # session" reply, so there's nothing new here. Return the `HUP`
1144 # status as, if we're handling this command, the viewer consumed
1145 # all the existing data streams.
1146 status
= _LttngLiveViewerGetNewStreamInfosReply
.Status
.HUP
1147 return _LttngLiveViewerGetNewStreamInfosReply(status
, [])
1149 def _handle_get_tracing_session_infos_command(self
, cmd
):
1150 logging
.info('Handling "get tracing session infos" command.')
1152 tss
.tracing_session_descriptor
.info
for tss
in self
._ts
_states
.values()
1154 infos
.sort(key
=lambda info
: info
.name
)
1155 return _LttngLiveViewerGetTracingSessionInfosReply(infos
)
1157 def _handle_create_viewer_session_command(self
, cmd
):
1158 logging
.info('Handling "create viewer session" command.')
1159 status
= _LttngLiveViewerCreateViewerSessionReply
.Status
.OK
1161 # This does nothing here. In the LTTng relay daemon, it
1162 # allocates the viewer session's state.
1163 return _LttngLiveViewerCreateViewerSessionReply(status
)
1166 # An LTTng live TCP server.
1168 # On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1169 # the decimal TCP port number to a temporary port file. It renames the
1170 # temporary port file to `port_filename`.
1172 # `tracing_session_descriptors` is a list of tracing session descriptors
1173 # (`LttngTracingSessionDescriptor`) to serve.
1175 # This server accepts a single viewer (client).
1177 # When the viewer closes the connection, the server's constructor
1179 class LttngLiveServer
:
1181 self
, port_filename
, tracing_session_descriptors
, max_query_data_response_size
1183 logging
.info('Server configuration:')
1185 logging
.info(' Port file name: `{}`'.format(port_filename
))
1187 if max_query_data_response_size
is not None:
1189 ' Maximum response data query size: `{}`'.format(
1190 max_query_data_response_size
1194 for ts_descr
in tracing_session_descriptors
:
1195 info
= ts_descr
.info
1196 fmt
= ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1200 info
.tracing_session_id
,
1202 info
.live_timer_freq
,
1208 for trace
in ts_descr
.traces
:
1209 logging
.info(' Trace: path="{}"'.format(trace
.path
))
1211 self
._ts
_descriptors
= tracing_session_descriptors
1212 self
._max
_query
_data
_response
_size
= max_query_data_response_size
1213 self
._sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
1214 self
._codec
= _LttngLiveViewerProtocolCodec()
1216 # Port 0: OS assigns an unused port
1217 serv_addr
= ('localhost', 0)
1218 self
._sock
.bind(serv_addr
)
1219 self
._write
_port
_to
_file
(port_filename
)
1225 logging
.info('Closed connection and socket.')
1228 def _server_port(self
):
1229 return self
._sock
.getsockname()[1]
1231 def _recv_command(self
):
1235 logging
.info('Waiting for viewer command.')
1236 buf
= self
._conn
.recv(128)
1239 logging
.info('Client closed connection.')
1242 raise UnexpectedInput(
1243 'Client closed connection after having sent {} command bytes.'.format(
1250 logging
.info('Received data from viewer: length={}'.format(len(buf
)))
1255 cmd
= self
._codec
.decode(data
)
1256 except struct
.error
as exc
:
1257 raise UnexpectedInput('Malformed command: {}'.format(exc
)) from exc
1261 'Received command from viewer: cmd-cls-name={}'.format(
1262 cmd
.__class
__.__name
__
1267 def _send_reply(self
, reply
):
1268 data
= self
._codec
.encode(reply
)
1270 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
1271 reply
.__class
__.__name
__, len(data
)
1274 self
._conn
.sendall(data
)
1276 def _handle_connection(self
):
1277 # First command must be "connect"
1278 cmd
= self
._recv
_command
()
1280 if type(cmd
) is not _LttngLiveViewerConnectCommand
:
1281 raise UnexpectedInput(
1282 'First command is not "connect": cmd-cls-name={}'.format(
1283 cmd
.__class
__.__name
__
1287 # Create viewer session (arbitrary ID 23)
1289 'LTTng live viewer connected: version={}.{}'.format(cmd
.major
, cmd
.minor
)
1291 viewer_session
= _LttngLiveViewerSession(
1292 23, self
._ts
_descriptors
, self
._max
_query
_data
_response
_size
1295 # Send "connect" reply
1297 _LttngLiveViewerConnectReply(viewer_session
.viewer_session_id
, 2, 10)
1300 # Make the viewer session handle the remaining commands
1302 cmd
= self
._recv
_command
()
1305 # Connection closed (at an expected location within the
1309 self
._send
_reply
(viewer_session
.handle_command(cmd
))
1312 logging
.info('Listening: port={}'.format(self
._server
_port
))
1313 # Backlog must be present for Python version < 3.5.
1314 # 128 is an arbitrary number since we expect only 1 connection anyway.
1315 self
._sock
.listen(128)
1316 self
._conn
, viewer_addr
= self
._sock
.accept()
1318 'Accepted viewer: addr={}:{}'.format(viewer_addr
[0], viewer_addr
[1])
1322 self
._handle
_connection
()
1326 def _write_port_to_file(self
, port_filename
):
1327 # Write the port number to a temporary file.
1328 with tempfile
.NamedTemporaryFile(mode
='w', delete
=False) as tmp_port_file
:
1329 print(self
._server
_port
, end
='', file=tmp_port_file
)
1331 # Rename temporary file to real file
1332 os
.replace(tmp_port_file
.name
, port_filename
)
1334 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1335 tmp_port_file
.name
, port_filename
1340 # A tracing session descriptor.
1342 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1344 class LttngTracingSessionDescriptor
:
1346 self
, name
, tracing_session_id
, hostname
, live_timer_freq
, client_count
, traces
1348 for trace
in traces
:
1349 if name
not in trace
.path
:
1350 fmt
= 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
1351 raise ValueError(fmt
.format(name
, trace
.path
))
1353 self
._traces
= traces
1354 stream_count
= sum([len(t
) + 1 for t
in traces
])
1355 self
._info
= _LttngLiveViewerTracingSessionInfo(
1373 def _session_descriptors_from_path(sessions_filename
, trace_path_prefix
):
1378 # "name": "my-session",
1380 # "hostname": "myhost",
1381 # "live-timer-freq": 1000000,
1382 # "client-count": 23,
1388 # "path": "meow/mix"
1393 with
open(sessions_filename
, 'r') as sessions_file
:
1394 params
= json
.load(sessions_file
)
1398 for session
in params
:
1399 name
= session
['name']
1400 tracing_session_id
= session
['id']
1401 hostname
= session
['hostname']
1402 live_timer_freq
= session
['live-timer-freq']
1403 client_count
= session
['client-count']
1406 for trace
in session
['traces']:
1407 path
= trace
['path']
1409 if not os
.path
.isabs(path
):
1410 path
= os
.path
.join(trace_path_prefix
, path
)
1412 traces
.append(LttngTrace(path
))
1415 LttngTracingSessionDescriptor(
1428 def _loglevel_parser(string
):
1429 loglevels
= {'info': logging
.INFO
, 'warning': logging
.WARNING
}
1430 if string
not in loglevels
:
1431 msg
= "{} is not a valid loglevel".format(string
)
1432 raise argparse
.ArgumentTypeError(msg
)
1433 return loglevels
[string
]
1436 if __name__
== '__main__':
1437 logging
.basicConfig(format
='# %(asctime)-25s%(message)s')
1438 parser
= argparse
.ArgumentParser(
1439 description
='LTTng-live protocol mocker', add_help
=False
1441 parser
.add_argument(
1444 choices
=['info', 'warning'],
1445 help='The loglevel to be used.',
1448 loglevel_namespace
, remaining_args
= parser
.parse_known_args()
1449 logging
.getLogger().setLevel(_loglevel_parser(loglevel_namespace
.log_level
))
1451 parser
.add_argument(
1453 help='The final port file. This file is present when the server is ready to receive connection.',
1456 parser
.add_argument(
1457 '--max-query-data-response-size',
1459 help='The maximum size of control data response in bytes',
1461 parser
.add_argument(
1462 '--trace-path-prefix',
1464 help='Prefix to prepend to the trace paths of session configurations',
1466 parser
.add_argument(
1467 '--sessions-filename', type=str, help='Path to a session configuration file',
1469 parser
.add_argument(
1473 default
=argparse
.SUPPRESS
,
1474 help='Show this help message and exit.',
1477 args
= parser
.parse_args(args
=remaining_args
)
1479 sessions
= _session_descriptors_from_path(
1480 args
.sessions_filename
, args
.trace_path_prefix
1482 LttngLiveServer(args
.port_filename
, sessions
, args
.max_query_data_response_size
)
1483 except UnexpectedInput
as exc
:
1484 logging
.error(str(exc
))
1485 print(exc
, file=sys
.stderr
)