Fix: source.ctf.lttng-live: muxing failure on clear
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
584af91e 2#
0235b0db 3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
584af91e 4#
584af91e
PP
5
6import argparse
7import collections.abc
8import logging
9import os
10import os.path
11import re
12import socket
13import struct
14import sys
15import tempfile
2b763e29 16import json
584af91e
PP
17
18
19class UnexpectedInput(RuntimeError):
20 pass
21
22
23class _LttngLiveViewerCommand:
24 def __init__(self, version):
25 self._version = version
26
27 @property
28 def version(self):
29 return self._version
30
31
32class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
33 def __init__(self, version, viewer_session_id, major, minor):
34 super().__init__(version)
35 self._viewer_session_id = viewer_session_id
36 self._major = major
37 self._minor = minor
38
39 @property
40 def viewer_session_id(self):
41 return self._viewer_session_id
42
43 @property
44 def major(self):
45 return self._major
46
47 @property
48 def minor(self):
49 return self._minor
50
51
52class _LttngLiveViewerConnectReply:
53 def __init__(self, viewer_session_id, major, minor):
54 self._viewer_session_id = viewer_session_id
55 self._major = major
56 self._minor = minor
57
58 @property
59 def viewer_session_id(self):
60 return self._viewer_session_id
61
62 @property
63 def major(self):
64 return self._major
65
66 @property
67 def minor(self):
68 return self._minor
69
70
71class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
72 pass
73
74
75class _LttngLiveViewerTracingSessionInfo:
76 def __init__(
77 self,
78 tracing_session_id,
79 live_timer_freq,
80 client_count,
81 stream_count,
82 hostname,
83 name,
84 ):
85 self._tracing_session_id = tracing_session_id
86 self._live_timer_freq = live_timer_freq
87 self._client_count = client_count
88 self._stream_count = stream_count
89 self._hostname = hostname
90 self._name = name
91
92 @property
93 def tracing_session_id(self):
94 return self._tracing_session_id
95
96 @property
97 def live_timer_freq(self):
98 return self._live_timer_freq
99
100 @property
101 def client_count(self):
102 return self._client_count
103
104 @property
105 def stream_count(self):
106 return self._stream_count
107
108 @property
109 def hostname(self):
110 return self._hostname
111
112 @property
113 def name(self):
114 return self._name
115
116
117class _LttngLiveViewerGetTracingSessionInfosReply:
118 def __init__(self, tracing_session_infos):
119 self._tracing_session_infos = tracing_session_infos
120
121 @property
122 def tracing_session_infos(self):
123 return self._tracing_session_infos
124
125
126class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
127 class SeekType:
128 BEGINNING = 1
129 LAST = 2
130
131 def __init__(self, version, tracing_session_id, offset, seek_type):
132 super().__init__(version)
133 self._tracing_session_id = tracing_session_id
134 self._offset = offset
135 self._seek_type = seek_type
136
137 @property
138 def tracing_session_id(self):
139 return self._tracing_session_id
140
141 @property
142 def offset(self):
143 return self._offset
144
145 @property
146 def seek_type(self):
147 return self._seek_type
148
149
150class _LttngLiveViewerStreamInfo:
151 def __init__(self, id, trace_id, is_metadata, path, channel_name):
152 self._id = id
153 self._trace_id = trace_id
154 self._is_metadata = is_metadata
155 self._path = path
156 self._channel_name = channel_name
157
158 @property
159 def id(self):
160 return self._id
161
162 @property
163 def trace_id(self):
164 return self._trace_id
165
166 @property
167 def is_metadata(self):
168 return self._is_metadata
169
170 @property
171 def path(self):
172 return self._path
173
174 @property
175 def channel_name(self):
176 return self._channel_name
177
178
179class _LttngLiveViewerAttachToTracingSessionReply:
180 class Status:
181 OK = 1
182 ALREADY = 2
183 UNKNOWN = 3
184 NOT_LIVE = 4
185 SEEK_ERROR = 5
186 NO_SESSION = 6
187
188 def __init__(self, status, stream_infos):
189 self._status = status
190 self._stream_infos = stream_infos
191
192 @property
193 def status(self):
194 return self._status
195
196 @property
197 def stream_infos(self):
198 return self._stream_infos
199
200
201class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
202 def __init__(self, version, stream_id):
203 super().__init__(version)
204 self._stream_id = stream_id
205
206 @property
207 def stream_id(self):
208 return self._stream_id
209
210
211class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
212 class Status:
213 OK = 1
214 RETRY = 2
215 HUP = 3
216 ERROR = 4
217 INACTIVE = 5
218 EOF = 6
219
220 def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
221 self._status = status
222 self._index_entry = index_entry
223 self._has_new_metadata = has_new_metadata
224 self._has_new_data_stream = has_new_data_stream
225
226 @property
227 def status(self):
228 return self._status
229
230 @property
231 def index_entry(self):
232 return self._index_entry
233
234 @property
235 def has_new_metadata(self):
236 return self._has_new_metadata
237
238 @property
239 def has_new_data_stream(self):
240 return self._has_new_data_stream
241
242
243class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
244 def __init__(self, version, stream_id, offset, req_length):
245 super().__init__(version)
246 self._stream_id = stream_id
247 self._offset = offset
248 self._req_length = req_length
249
250 @property
251 def stream_id(self):
252 return self._stream_id
253
254 @property
255 def offset(self):
256 return self._offset
257
258 @property
259 def req_length(self):
260 return self._req_length
261
262
263class _LttngLiveViewerGetDataStreamPacketDataReply:
264 class Status:
265 OK = 1
266 RETRY = 2
267 ERROR = 3
268 EOF = 4
269
270 def __init__(self, status, data, has_new_metadata, has_new_data_stream):
271 self._status = status
272 self._data = data
273 self._has_new_metadata = has_new_metadata
274 self._has_new_data_stream = has_new_data_stream
275
276 @property
277 def status(self):
278 return self._status
279
280 @property
281 def data(self):
282 return self._data
283
284 @property
285 def has_new_metadata(self):
286 return self._has_new_metadata
287
288 @property
289 def has_new_data_stream(self):
290 return self._has_new_data_stream
291
292
293class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
294 def __init__(self, version, stream_id):
295 super().__init__(version)
296 self._stream_id = stream_id
297
298 @property
299 def stream_id(self):
300 return self._stream_id
301
302
303class _LttngLiveViewerGetMetadataStreamDataContentReply:
304 class Status:
305 OK = 1
306 NO_NEW = 2
307 ERROR = 3
308
309 def __init__(self, status, data):
310 self._status = status
311 self._data = data
312
313 @property
314 def status(self):
315 return self._status
316
317 @property
318 def data(self):
319 return self._data
320
321
322class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
323 def __init__(self, version, tracing_session_id):
324 super().__init__(version)
325 self._tracing_session_id = tracing_session_id
326
327 @property
328 def tracing_session_id(self):
329 return self._tracing_session_id
330
331
332class _LttngLiveViewerGetNewStreamInfosReply:
333 class Status:
334 OK = 1
335 NO_NEW = 2
336 ERROR = 3
337 HUP = 4
338
339 def __init__(self, status, stream_infos):
340 self._status = status
341 self._stream_infos = stream_infos
342
343 @property
344 def status(self):
345 return self._status
346
347 @property
348 def stream_infos(self):
349 return self._stream_infos
350
351
352class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
353 pass
354
355
356class _LttngLiveViewerCreateViewerSessionReply:
357 class Status:
358 OK = 1
359 ERROR = 2
360
361 def __init__(self, status):
362 self._status = status
363
364 @property
365 def status(self):
366 return self._status
367
368
369class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
370 def __init__(self, version, tracing_session_id):
371 super().__init__(version)
372 self._tracing_session_id = tracing_session_id
373
374 @property
375 def tracing_session_id(self):
376 return self._tracing_session_id
377
378
379class _LttngLiveViewerDetachFromTracingSessionReply:
380 class Status:
381 OK = 1
382 UNKNOWN = 2
383 ERROR = 3
384
385 def __init__(self, status):
386 self._status = status
387
388 @property
389 def status(self):
390 return self._status
391
392
393# An LTTng live protocol codec can convert bytes to command objects and
394# reply objects to bytes.
395class _LttngLiveViewerProtocolCodec:
396 _COMMAND_HEADER_STRUCT_FMT = 'QII'
397 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
398
399 def __init__(self):
400 pass
401
402 def _unpack(self, fmt, data, offset=0):
403 fmt = '!' + fmt
404 return struct.unpack_from(fmt, data, offset)
405
406 def _unpack_payload(self, fmt, data):
407 return self._unpack(
408 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
409 )
410
411 def decode(self, data):
412 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
413 # Not enough data to read the command header
414 return
415
416 payload_size, cmd_type, version = self._unpack(
417 self._COMMAND_HEADER_STRUCT_FMT, data
418 )
419 logging.info(
420 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
421 payload_size, cmd_type, version
422 )
423 )
424
425 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
426 # Not enough data to read the whole command
427 return
428
429 if cmd_type == 1:
430 viewer_session_id, major, minor, conn_type = self._unpack_payload(
431 'QIII', data
432 )
433 return _LttngLiveViewerConnectCommand(
434 version, viewer_session_id, major, minor
435 )
436 elif cmd_type == 2:
437 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
438 elif cmd_type == 3:
439 tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
440 return _LttngLiveViewerAttachToTracingSessionCommand(
441 version, tracing_session_id, offset, seek_type
442 )
443 elif cmd_type == 4:
75882e97 444 (stream_id,) = self._unpack_payload('Q', data)
584af91e
PP
445 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
446 version, stream_id
447 )
448 elif cmd_type == 5:
449 stream_id, offset, req_length = self._unpack_payload('QQI', data)
450 return _LttngLiveViewerGetDataStreamPacketDataCommand(
451 version, stream_id, offset, req_length
452 )
453 elif cmd_type == 6:
75882e97 454 (stream_id,) = self._unpack_payload('Q', data)
584af91e
PP
455 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
456 elif cmd_type == 7:
75882e97 457 (tracing_session_id,) = self._unpack_payload('Q', data)
584af91e
PP
458 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
459 elif cmd_type == 8:
460 return _LttngLiveViewerCreateViewerSessionCommand(version)
461 elif cmd_type == 9:
75882e97 462 (tracing_session_id,) = self._unpack_payload('Q', data)
584af91e
PP
463 return _LttngLiveViewerDetachFromTracingSessionCommand(
464 version, tracing_session_id
465 )
466 else:
467 raise UnexpectedInput('Unknown command type {}'.format(cmd_type))
468
469 def _pack(self, fmt, *args):
470 # Force network byte order
471 return struct.pack('!' + fmt, *args)
472
473 def _encode_zero_padded_str(self, string, length):
474 data = string.encode()
475 return data.ljust(length, b'\x00')
476
477 def _encode_stream_info(self, info):
478 data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
479 data += self._encode_zero_padded_str(info.path, 4096)
480 data += self._encode_zero_padded_str(info.channel_name, 255)
481 return data
482
483 def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
484 flags = 0
485
486 if has_new_metadata:
487 flags |= 1
488
489 if has_new_data_streams:
490 flags |= 2
491
492 return flags
493
494 def encode(self, reply):
495 if type(reply) is _LttngLiveViewerConnectReply:
496 data = self._pack(
497 'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
498 )
499 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
500 data = self._pack('I', len(reply.tracing_session_infos))
501
502 for info in reply.tracing_session_infos:
503 data += self._pack(
504 'QIII',
505 info.tracing_session_id,
506 info.live_timer_freq,
507 info.client_count,
508 info.stream_count,
509 )
510 data += self._encode_zero_padded_str(info.hostname, 64)
511 data += self._encode_zero_padded_str(info.name, 255)
512 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
513 data = self._pack('II', reply.status, len(reply.stream_infos))
514
515 for info in reply.stream_infos:
516 data += self._encode_stream_info(info)
517 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
518 entry = reply.index_entry
519 flags = self._get_has_new_stuff_flags(
520 reply.has_new_metadata, reply.has_new_data_stream
521 )
522
523 data = self._pack(
524 'QQQQQQQII',
525 entry.offset_bytes,
526 entry.total_size_bits,
527 entry.content_size_bits,
528 entry.timestamp_begin,
529 entry.timestamp_end,
530 entry.events_discarded,
531 entry.stream_class_id,
532 reply.status,
533 flags,
534 )
535 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
536 flags = self._get_has_new_stuff_flags(
537 reply.has_new_metadata, reply.has_new_data_stream
538 )
539 data = self._pack('III', reply.status, len(reply.data), flags)
540 data += reply.data
541 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
542 data = self._pack('QI', len(reply.data), reply.status)
543 data += reply.data
544 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
545 data = self._pack('II', reply.status, len(reply.stream_infos))
546
547 for info in reply.stream_infos:
548 data += self._encode_stream_info(info)
549 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
550 data = self._pack('I', reply.status)
551 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
552 data = self._pack('I', reply.status)
553 else:
554 raise ValueError(
555 'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
556 )
557
558 return data
559
560
561# An entry within the index of an LTTng data stream.
562class _LttngDataStreamIndexEntry:
563 def __init__(
564 self,
565 offset_bytes,
566 total_size_bits,
567 content_size_bits,
568 timestamp_begin,
569 timestamp_end,
570 events_discarded,
571 stream_class_id,
572 ):
573 self._offset_bytes = offset_bytes
574 self._total_size_bits = total_size_bits
575 self._content_size_bits = content_size_bits
576 self._timestamp_begin = timestamp_begin
577 self._timestamp_end = timestamp_end
578 self._events_discarded = events_discarded
579 self._stream_class_id = stream_class_id
580
581 @property
582 def offset_bytes(self):
583 return self._offset_bytes
584
585 @property
586 def total_size_bits(self):
587 return self._total_size_bits
588
589 @property
590 def total_size_bytes(self):
591 return self._total_size_bits // 8
592
593 @property
594 def content_size_bits(self):
595 return self._content_size_bits
596
597 @property
598 def content_size_bytes(self):
599 return self._content_size_bits // 8
600
601 @property
602 def timestamp_begin(self):
603 return self._timestamp_begin
604
605 @property
606 def timestamp_end(self):
607 return self._timestamp_end
608
609 @property
610 def events_discarded(self):
611 return self._events_discarded
612
613 @property
614 def stream_class_id(self):
615 return self._stream_class_id
616
617
618# The index of an LTTng data stream, a sequence of index entries.
619class _LttngDataStreamIndex(collections.abc.Sequence):
620 def __init__(self, path):
621 self._path = path
622 self._build()
623 logging.info(
624 'Built data stream index entries: path="{}", count={}'.format(
625 path, len(self._entries)
626 )
627 )
628
629 def _build(self):
630 self._entries = []
631 assert os.path.isfile(self._path)
632
633 with open(self._path, 'rb') as f:
634 # Read header first
635 fmt = '>IIII'
636 size = struct.calcsize(fmt)
637 data = f.read(size)
638 assert len(data) == size
639 magic, index_major, index_minor, index_entry_length = struct.unpack(
640 fmt, data
641 )
642 assert magic == 0xC1F1DCC1
643
644 # Read index entries
645 fmt = '>QQQQQQQ'
646 size = struct.calcsize(fmt)
647
648 while True:
649 logging.debug(
650 'Decoding data stream index entry: path="{}", offset={}'.format(
651 self._path, f.tell()
652 )
653 )
654 data = f.read(size)
655
656 if not data:
657 # Done
658 break
659
660 assert len(data) == size
75882e97
FD
661 (
662 offset_bytes,
663 total_size_bits,
664 content_size_bits,
665 timestamp_begin,
666 timestamp_end,
667 events_discarded,
668 stream_class_id,
669 ) = struct.unpack(fmt, data)
584af91e
PP
670
671 self._entries.append(
672 _LttngDataStreamIndexEntry(
673 offset_bytes,
674 total_size_bits,
675 content_size_bits,
676 timestamp_begin,
677 timestamp_end,
678 events_discarded,
679 stream_class_id,
680 )
681 )
682
683 # Skip anything else before the next entry
684 f.seek(index_entry_length - size, os.SEEK_CUR)
685
686 def __getitem__(self, index):
687 return self._entries[index]
688
689 def __len__(self):
690 return len(self._entries)
691
692 @property
693 def path(self):
694 return self._path
695
696
697# An LTTng data stream.
698class _LttngDataStream:
699 def __init__(self, path):
700 self._path = path
701 filename = os.path.basename(path)
702 match = re.match(r'(.*)_\d+', filename)
703 self._channel_name = match.group(1)
704 trace_dir = os.path.dirname(path)
705 index_path = os.path.join(trace_dir, 'index', filename + '.idx')
706 self._index = _LttngDataStreamIndex(index_path)
707 assert os.path.isfile(path)
708 self._file = open(path, 'rb')
709 logging.info(
710 'Built data stream: path="{}", channel-name="{}"'.format(
711 path, self._channel_name
712 )
713 )
714
715 @property
716 def path(self):
717 return self._path
718
719 @property
720 def channel_name(self):
721 return self._channel_name
722
723 @property
724 def index(self):
725 return self._index
726
727 def get_data(self, offset_bytes, len_bytes):
728 self._file.seek(offset_bytes)
729 return self._file.read(len_bytes)
730
731
732# An LTTng metadata stream.
733class _LttngMetadataStream:
734 def __init__(self, path):
735 self._path = path
736 logging.info('Built metadata stream: path="{}"'.format(path))
737
738 @property
739 def path(self):
740 return self._path
741
742 @property
743 def data(self):
744 assert os.path.isfile(self._path)
745
746 with open(self._path, 'rb') as f:
747 return f.read()
748
749
750# An LTTng trace, a sequence of LTTng data streams.
751class LttngTrace(collections.abc.Sequence):
752 def __init__(self, trace_dir):
753 assert os.path.isdir(trace_dir)
754 self._path = trace_dir
755 self._metadata_stream = _LttngMetadataStream(
756 os.path.join(trace_dir, 'metadata')
757 )
758 self._create_data_streams(trace_dir)
759 logging.info('Built trace: path="{}"'.format(trace_dir))
760
761 def _create_data_streams(self, trace_dir):
762 data_stream_paths = []
763
764 for filename in os.listdir(trace_dir):
765 path = os.path.join(trace_dir, filename)
766
767 if not os.path.isfile(path):
768 continue
769
770 if filename.startswith('.'):
771 continue
772
773 if filename == 'metadata':
774 continue
775
776 data_stream_paths.append(path)
777
778 data_stream_paths.sort()
779 self._data_streams = []
780
781 for data_stream_path in data_stream_paths:
782 self._data_streams.append(_LttngDataStream(data_stream_path))
783
784 @property
785 def path(self):
786 return self._path
787
788 @property
789 def metadata_stream(self):
790 return self._metadata_stream
791
792 def __getitem__(self, index):
793 return self._data_streams[index]
794
795 def __len__(self):
796 return len(self._data_streams)
797
798
799# The state of a single data stream.
800class _LttngLiveViewerSessionDataStreamState:
801 def __init__(self, ts_state, info, data_stream):
802 self._ts_state = ts_state
803 self._info = info
804 self._data_stream = data_stream
805 self._cur_index_entry_index = 0
806 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
807 logging.info(
808 fmt.format(
809 info.id,
810 ts_state.tracing_session_descriptor.info.tracing_session_id,
811 ts_state.tracing_session_descriptor.info.name,
812 data_stream.path,
813 )
814 )
815
816 @property
817 def tracing_session_state(self):
818 return self._ts_state
819
820 @property
821 def info(self):
822 return self._info
823
824 @property
825 def data_stream(self):
826 return self._data_stream
827
828 @property
829 def cur_index_entry(self):
830 if self._cur_index_entry_index == len(self._data_stream.index):
831 return
832
833 return self._data_stream.index[self._cur_index_entry_index]
834
835 def goto_next_index_entry(self):
836 self._cur_index_entry_index += 1
837
838
839# The state of a single metadata stream.
840class _LttngLiveViewerSessionMetadataStreamState:
841 def __init__(self, ts_state, info, metadata_stream):
842 self._ts_state = ts_state
843 self._info = info
844 self._metadata_stream = metadata_stream
845 self._is_sent = False
846 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
847 logging.info(
848 fmt.format(
849 info.id,
850 ts_state.tracing_session_descriptor.info.tracing_session_id,
851 ts_state.tracing_session_descriptor.info.name,
852 metadata_stream.path,
853 )
854 )
855
856 @property
857 def trace_session_state(self):
858 return self._trace_session_state
859
860 @property
861 def info(self):
862 return self._info
863
864 @property
865 def metadata_stream(self):
866 return self._metadata_stream
867
868 @property
869 def is_sent(self):
870 return self._is_sent
871
872 @is_sent.setter
873 def is_sent(self, value):
874 self._is_sent = value
875
876
877# The state of a tracing session.
878class _LttngLiveViewerSessionTracingSessionState:
879 def __init__(self, tc_descr, base_stream_id):
880 self._tc_descr = tc_descr
881 self._stream_infos = []
882 self._ds_states = {}
883 self._ms_states = {}
884 stream_id = base_stream_id
885
886 for trace in tc_descr.traces:
887 trace_id = stream_id * 1000
888
889 # Data streams -> stream infos and data stream states
890 for data_stream in trace:
891 info = _LttngLiveViewerStreamInfo(
892 stream_id,
893 trace_id,
894 False,
895 data_stream.path,
896 data_stream.channel_name,
897 )
898 self._stream_infos.append(info)
899 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
900 self, info, data_stream
901 )
902 stream_id += 1
903
904 # Metadata stream -> stream info and metadata stream state
905 info = _LttngLiveViewerStreamInfo(
906 stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
907 )
908 self._stream_infos.append(info)
909 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
910 self, info, trace.metadata_stream
911 )
912 stream_id += 1
913
914 self._is_attached = False
915 fmt = 'Built tracing session state: id={}, name="{}"'
916 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
917
918 @property
919 def tracing_session_descriptor(self):
920 return self._tc_descr
921
922 @property
923 def data_stream_states(self):
924 return self._ds_states
925
926 @property
927 def metadata_stream_states(self):
928 return self._ms_states
929
930 @property
931 def stream_infos(self):
932 return self._stream_infos
933
934 @property
935 def has_new_metadata(self):
936 return any([not ms.is_sent for ms in self._ms_states.values()])
937
938 @property
939 def is_attached(self):
940 return self._is_attached
941
942 @is_attached.setter
943 def is_attached(self, value):
944 self._is_attached = value
945
946
947# An LTTng live viewer session manages a view on tracing sessions
948# and replies to commands accordingly.
949class _LttngLiveViewerSession:
950 def __init__(
951 self,
952 viewer_session_id,
953 tracing_session_descriptors,
954 max_query_data_response_size,
955 ):
956 self._viewer_session_id = viewer_session_id
957 self._ts_states = {}
958 self._stream_states = {}
959 self._max_query_data_response_size = max_query_data_response_size
960 total_stream_infos = 0
961
962 for ts_descr in tracing_session_descriptors:
963 ts_state = _LttngLiveViewerSessionTracingSessionState(
964 ts_descr, total_stream_infos
965 )
966 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
967 self._ts_states[ts_id] = ts_state
968 total_stream_infos += len(ts_state.stream_infos)
969
970 # Update session's stream states to have the new states
971 self._stream_states.update(ts_state.data_stream_states)
972 self._stream_states.update(ts_state.metadata_stream_states)
973
974 self._command_handlers = {
975 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
976 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
977 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
978 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
979 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
980 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
981 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
982 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
983 }
984
985 @property
986 def viewer_session_id(self):
987 return self._viewer_session_id
988
989 def _get_tracing_session_state(self, tracing_session_id):
990 if tracing_session_id not in self._ts_states:
991 raise UnexpectedInput(
992 'Unknown tracing session ID {}'.format(tracing_session_id)
993 )
994
995 return self._ts_states[tracing_session_id]
996
997 def _get_stream_state(self, stream_id):
998 if stream_id not in self._stream_states:
999 UnexpectedInput('Unknown stream ID {}'.format(stream_id))
1000
1001 return self._stream_states[stream_id]
1002
1003 def handle_command(self, cmd):
1004 logging.info(
1005 'Handling command in viewer session: cmd-cls-name={}'.format(
1006 cmd.__class__.__name__
1007 )
1008 )
1009 cmd_type = type(cmd)
1010
1011 if cmd_type not in self._command_handlers:
1012 raise UnexpectedInput(
1013 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
1014 )
1015
1016 return self._command_handlers[cmd_type](cmd)
1017
1018 def _handle_attach_to_tracing_session_command(self, cmd):
1019 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1020 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1021 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1022 info = ts_state.tracing_session_descriptor.info
1023
1024 if ts_state.is_attached:
1025 raise UnexpectedInput(
1026 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1027 info.name
1028 )
1029 )
1030
1031 ts_state.is_attached = True
1032 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1033 return _LttngLiveViewerAttachToTracingSessionReply(
1034 status, ts_state.stream_infos
1035 )
1036
1037 def _handle_detach_from_tracing_session_command(self, cmd):
1038 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1039 logging.info(fmt.format(cmd.tracing_session_id))
1040 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1041 info = ts_state.tracing_session_descriptor.info
1042
1043 if not ts_state.is_attached:
1044 raise UnexpectedInput(
1045 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1046 info.name
1047 )
1048 )
1049
1050 ts_state.is_attached = False
1051 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1052 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1053
1054 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1055 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1056 logging.info(fmt.format(cmd.stream_id))
1057 stream_state = self._get_stream_state(cmd.stream_id)
1058
1059 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1060 raise UnexpectedInput(
1061 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1062 )
1063
1064 if stream_state.cur_index_entry is None:
1065 # The viewer is done reading this stream
1066 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1067
1068 # Dummy data stream index entry to use with the `HUP` status
1069 # (the reply needs one, but the viewer ignores it)
1070 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1071
1072 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1073 status, index_entry, False, False
1074 )
1075
1076 # The viewer only checks the `has_new_metadata` flag if the
1077 # reply's status is `OK`, so we need to provide an index here
1078 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1079 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1080 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1081 status, stream_state.cur_index_entry, has_new_metadata, False
1082 )
1083 stream_state.goto_next_index_entry()
1084 return reply
1085
1086 def _handle_get_data_stream_packet_data_command(self, cmd):
1087 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1088 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1089 stream_state = self._get_stream_state(cmd.stream_id)
1090 data_response_length = cmd.req_length
1091
1092 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1093 raise UnexpectedInput(
1094 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1095 )
1096
1097 if stream_state.tracing_session_state.has_new_metadata:
1098 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1099 return _LttngLiveViewerGetDataStreamPacketDataReply(
1100 status, bytes(), True, False
1101 )
1102
1103 if self._max_query_data_response_size:
1104 # Enforce a server side limit on the query requested length.
1105 # To ensure that the transaction terminate take the minimum of both
1106 # value.
1107 data_response_length = min(
1108 cmd.req_length, self._max_query_data_response_size
1109 )
1110 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1111 logging.info(fmt.format(cmd.req_length, data_response_length))
1112
1113 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1114 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1115 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1116
1117 def _handle_get_metadata_stream_data_command(self, cmd):
1118 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1119 logging.info(fmt.format(cmd.stream_id))
1120 stream_state = self._get_stream_state(cmd.stream_id)
1121
1122 if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
1123 raise UnexpectedInput(
1124 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
1125 )
1126
1127 if stream_state.is_sent:
1128 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1129 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1130
1131 stream_state.is_sent = True
1132 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
1133 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1134 status, stream_state.metadata_stream.data
1135 )
1136
1137 def _handle_get_new_stream_infos_command(self, cmd):
1138 fmt = 'Handling "get new stream infos" command: ts-id={}'
1139 logging.info(fmt.format(cmd.tracing_session_id))
1140
1141 # As of this version, all the tracing session's stream infos are
1142 # always given to the viewer when sending the "attach to tracing
1143 # session" reply, so there's nothing new here. Return the `HUP`
1144 # status as, if we're handling this command, the viewer consumed
1145 # all the existing data streams.
1146 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1147 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1148
1149 def _handle_get_tracing_session_infos_command(self, cmd):
1150 logging.info('Handling "get tracing session infos" command.')
1151 infos = [
1152 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1153 ]
1154 infos.sort(key=lambda info: info.name)
1155 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1156
1157 def _handle_create_viewer_session_command(self, cmd):
1158 logging.info('Handling "create viewer session" command.')
1159 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1160
1161 # This does nothing here. In the LTTng relay daemon, it
1162 # allocates the viewer session's state.
1163 return _LttngLiveViewerCreateViewerSessionReply(status)
1164
1165
1166# An LTTng live TCP server.
1167#
1168# On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1169# the decimal TCP port number to a temporary port file. It renames the
1170# temporary port file to `port_filename`.
1171#
1172# `tracing_session_descriptors` is a list of tracing session descriptors
1173# (`LttngTracingSessionDescriptor`) to serve.
1174#
1175# This server accepts a single viewer (client).
1176#
1177# When the viewer closes the connection, the server's constructor
1178# returns.
1179class LttngLiveServer:
1180 def __init__(
1181 self, port_filename, tracing_session_descriptors, max_query_data_response_size
1182 ):
1183 logging.info('Server configuration:')
1184
1185 logging.info(' Port file name: `{}`'.format(port_filename))
1186
1187 if max_query_data_response_size is not None:
1188 logging.info(
1189 ' Maximum response data query size: `{}`'.format(
1190 max_query_data_response_size
1191 )
1192 )
1193
1194 for ts_descr in tracing_session_descriptors:
1195 info = ts_descr.info
1196 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1197 logging.info(
1198 fmt.format(
1199 info.name,
1200 info.tracing_session_id,
1201 info.hostname,
1202 info.live_timer_freq,
1203 info.client_count,
1204 info.stream_count,
1205 )
1206 )
1207
1208 for trace in ts_descr.traces:
1209 logging.info(' Trace: path="{}"'.format(trace.path))
1210
1211 self._ts_descriptors = tracing_session_descriptors
1212 self._max_query_data_response_size = max_query_data_response_size
1213 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1214 self._codec = _LttngLiveViewerProtocolCodec()
1215
1216 # Port 0: OS assigns an unused port
1217 serv_addr = ('localhost', 0)
1218 self._sock.bind(serv_addr)
1219 self._write_port_to_file(port_filename)
1220
1221 try:
1222 self._listen()
1223 finally:
1224 self._sock.close()
1225 logging.info('Closed connection and socket.')
1226
1227 @property
1228 def _server_port(self):
1229 return self._sock.getsockname()[1]
1230
1231 def _recv_command(self):
1232 data = bytes()
1233
1234 while True:
1235 logging.info('Waiting for viewer command.')
1236 buf = self._conn.recv(128)
1237
1238 if not buf:
1239 logging.info('Client closed connection.')
1240
1241 if data:
1242 raise UnexpectedInput(
1243 'Client closed connection after having sent {} command bytes.'.format(
1244 len(data)
1245 )
1246 )
1247
1248 return
1249
1250 logging.info('Received data from viewer: length={}'.format(len(buf)))
1251
1252 data += buf
1253
1254 try:
1255 cmd = self._codec.decode(data)
1256 except struct.error as exc:
1257 raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc
1258
1259 if cmd is not None:
1260 logging.info(
1261 'Received command from viewer: cmd-cls-name={}'.format(
1262 cmd.__class__.__name__
1263 )
1264 )
1265 return cmd
1266
1267 def _send_reply(self, reply):
1268 data = self._codec.encode(reply)
1269 logging.info(
1270 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
1271 reply.__class__.__name__, len(data)
1272 )
1273 )
1274 self._conn.sendall(data)
1275
1276 def _handle_connection(self):
1277 # First command must be "connect"
1278 cmd = self._recv_command()
1279
1280 if type(cmd) is not _LttngLiveViewerConnectCommand:
1281 raise UnexpectedInput(
1282 'First command is not "connect": cmd-cls-name={}'.format(
1283 cmd.__class__.__name__
1284 )
1285 )
1286
1287 # Create viewer session (arbitrary ID 23)
1288 logging.info(
1289 'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
1290 )
1291 viewer_session = _LttngLiveViewerSession(
1292 23, self._ts_descriptors, self._max_query_data_response_size
1293 )
1294
1295 # Send "connect" reply
1296 self._send_reply(
1297 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1298 )
1299
1300 # Make the viewer session handle the remaining commands
1301 while True:
1302 cmd = self._recv_command()
1303
1304 if cmd is None:
1305 # Connection closed (at an expected location within the
1306 # conversation)
1307 return
1308
1309 self._send_reply(viewer_session.handle_command(cmd))
1310
1311 def _listen(self):
1312 logging.info('Listening: port={}'.format(self._server_port))
1726ac08
JR
1313 # Backlog must be present for Python version < 3.5.
1314 # 128 is an arbitrary number since we expect only 1 connection anyway.
1315 self._sock.listen(128)
584af91e
PP
1316 self._conn, viewer_addr = self._sock.accept()
1317 logging.info(
1318 'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
1319 )
1320
1321 try:
1322 self._handle_connection()
1323 finally:
1324 self._conn.close()
1325
1326 def _write_port_to_file(self, port_filename):
1327 # Write the port number to a temporary file.
1328 with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_port_file:
1329 print(self._server_port, end='', file=tmp_port_file)
1330
1331 # Rename temporary file to real file
9c878ece 1332 os.replace(tmp_port_file.name, port_filename)
584af91e
PP
1333 logging.info(
1334 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1335 tmp_port_file.name, port_filename
1336 )
1337 )
1338
1339
1340# A tracing session descriptor.
1341#
1342# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1343# objects).
1344class LttngTracingSessionDescriptor:
1345 def __init__(
1346 self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1347 ):
1348 for trace in traces:
1349 if name not in trace.path:
1350 fmt = 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
1351 raise ValueError(fmt.format(name, trace.path))
1352
1353 self._traces = traces
1354 stream_count = sum([len(t) + 1 for t in traces])
1355 self._info = _LttngLiveViewerTracingSessionInfo(
1356 tracing_session_id,
1357 live_timer_freq,
1358 client_count,
1359 stream_count,
1360 hostname,
1361 name,
1362 )
1363
1364 @property
1365 def traces(self):
1366 return self._traces
1367
1368 @property
1369 def info(self):
1370 return self._info
1371
1372
2b763e29
JG
1373def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
1374 # File format is:
1375 #
1376 # [
1377 # {
1378 # "name": "my-session",
1379 # "id": 17,
1380 # "hostname": "myhost",
1381 # "live-timer-freq": 1000000,
1382 # "client-count": 23,
1383 # "traces": [
1384 # {
1385 # "path": "lol"
1386 # },
1387 # {
1388 # "path": "meow/mix"
1389 # }
1390 # ]
1391 # }
1392 # ]
1393 with open(sessions_filename, 'r') as sessions_file:
1394 params = json.load(sessions_file)
1395
1396 sessions = []
1397
1398 for session in params:
1399 name = session['name']
1400 tracing_session_id = session['id']
1401 hostname = session['hostname']
1402 live_timer_freq = session['live-timer-freq']
1403 client_count = session['client-count']
1404 traces = []
1405
1406 for trace in session['traces']:
1407 path = trace['path']
1408
1409 if not os.path.isabs(path):
1410 path = os.path.join(trace_path_prefix, path)
1411
1412 traces.append(LttngTrace(path))
1413
1414 sessions.append(
1415 LttngTracingSessionDescriptor(
1416 name,
1417 tracing_session_id,
1418 hostname,
1419 live_timer_freq,
1420 client_count,
1421 traces,
1422 )
1423 )
1424
1425 return sessions
584af91e
PP
1426
1427
1428def _loglevel_parser(string):
1429 loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
1430 if string not in loglevels:
1431 msg = "{} is not a valid loglevel".format(string)
1432 raise argparse.ArgumentTypeError(msg)
1433 return loglevels[string]
1434
1435
1436if __name__ == '__main__':
1437 logging.basicConfig(format='# %(asctime)-25s%(message)s')
1438 parser = argparse.ArgumentParser(
1439 description='LTTng-live protocol mocker', add_help=False
1440 )
1441 parser.add_argument(
1442 '--log-level',
1443 default='warning',
1444 choices=['info', 'warning'],
1445 help='The loglevel to be used.',
1446 )
1447
1448 loglevel_namespace, remaining_args = parser.parse_known_args()
1449 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1450
1451 parser.add_argument(
1452 '--port-filename',
1453 help='The final port file. This file is present when the server is ready to receive connection.',
1454 required=True,
1455 )
1456 parser.add_argument(
1457 '--max-query-data-response-size',
1458 type=int,
1459 help='The maximum size of control data response in bytes',
1460 )
1461 parser.add_argument(
2b763e29
JG
1462 '--trace-path-prefix',
1463 type=str,
1464 help='Prefix to prepend to the trace paths of session configurations',
1465 )
1466 parser.add_argument(
1467 '--sessions-filename', type=str, help='Path to a session configuration file',
584af91e
PP
1468 )
1469 parser.add_argument(
1470 '-h',
1471 '--help',
1472 action='help',
1473 default=argparse.SUPPRESS,
1474 help='Show this help message and exit.',
1475 )
1476
1477 args = parser.parse_args(args=remaining_args)
1478 try:
2b763e29
JG
1479 sessions = _session_descriptors_from_path(
1480 args.sessions_filename, args.trace_path_prefix
584af91e 1481 )
2b763e29 1482 LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size)
584af91e
PP
1483 except UnexpectedInput as exc:
1484 logging.error(str(exc))
1485 print(exc, file=sys.stderr)
1486 sys.exit(1)
This page took 0.091911 seconds and 4 git commands to generate.