Fix: tests: live: listen on python < 3.5 needs backlog parameter
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
1 # The MIT License (MIT)
2 #
3 # Copyright (c) 2019 Philippe Proulx <pproulx@efficios.com>
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 # THE SOFTWARE.
22
23 import argparse
24 import collections.abc
25 import logging
26 import os
27 import os.path
28 import re
29 import socket
30 import struct
31 import sys
32 import tempfile
33
34
class UnexpectedInput(RuntimeError):
    """Error raised when the received input is unknown or not acceptable."""
37
38
class _LttngLiveViewerCommand:
    """Base of all viewer command objects; holds the command's version."""

    def __init__(self, version):
        # Stored as is; exposed read-only through the `version` property
        self._version = version

    @property
    def version(self):
        """Version value given at construction time."""
        return self._version
46
47
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    """"Connect" viewer command: viewer session ID plus major/minor numbers."""

    def __init__(self, version, viewer_session_id, major, minor):
        super().__init__(version)
        self._viewer_session_id, self._major, self._minor = (
            viewer_session_id,
            major,
            minor,
        )

    @property
    def viewer_session_id(self):
        """Viewer session ID carried by the command."""
        return self._viewer_session_id

    @property
    def major(self):
        """Major number carried by the command."""
        return self._major

    @property
    def minor(self):
        """Minor number carried by the command."""
        return self._minor
66
67
class _LttngLiveViewerConnectReply:
    """Reply to a "connect" command: viewer session ID and major/minor numbers."""

    def __init__(self, viewer_session_id, major, minor):
        self._viewer_session_id, self._major, self._minor = (
            viewer_session_id,
            major,
            minor,
        )

    @property
    def viewer_session_id(self):
        """Viewer session ID to send back."""
        return self._viewer_session_id

    @property
    def major(self):
        """Major number to send back."""
        return self._major

    @property
    def minor(self):
        """Minor number to send back."""
        return self._minor
85
86
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    """"Get tracing session infos" viewer command (no payload fields)."""
89
90
class _LttngLiveViewerTracingSessionInfo:
    """Read-only description of one tracing session, as reported to a viewer."""

    def __init__(
        self,
        tracing_session_id,
        live_timer_freq,
        client_count,
        stream_count,
        hostname,
        name,
    ):
        # Numeric fields
        self._tracing_session_id = tracing_session_id
        self._live_timer_freq = live_timer_freq
        self._client_count = client_count
        self._stream_count = stream_count

        # Textual fields
        self._hostname = hostname
        self._name = name

    @property
    def tracing_session_id(self):
        """ID of the tracing session."""
        return self._tracing_session_id

    @property
    def live_timer_freq(self):
        """Live timer frequency value given at construction."""
        return self._live_timer_freq

    @property
    def client_count(self):
        """Client count value given at construction."""
        return self._client_count

    @property
    def stream_count(self):
        """Stream count value given at construction."""
        return self._stream_count

    @property
    def hostname(self):
        """Hostname string of the tracing session."""
        return self._hostname

    @property
    def name(self):
        """Name string of the tracing session."""
        return self._name
131
132
class _LttngLiveViewerGetTracingSessionInfosReply:
    """Reply to a "get tracing session infos" command."""

    def __init__(self, tracing_session_infos):
        # Kept by reference (not copied)
        self._tracing_session_infos = tracing_session_infos

    @property
    def tracing_session_infos(self):
        """Tracing session info objects given at construction."""
        return self._tracing_session_infos
140
141
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    """"Attach to tracing session" viewer command."""

    class SeekType:
        """Numeric seek type constants."""

        BEGINNING = 1
        LAST = 2

    def __init__(self, version, tracing_session_id, offset, seek_type):
        super().__init__(version)
        self._tracing_session_id, self._offset, self._seek_type = (
            tracing_session_id,
            offset,
            seek_type,
        )

    @property
    def tracing_session_id(self):
        """ID of the tracing session to attach to."""
        return self._tracing_session_id

    @property
    def offset(self):
        """Offset value carried by the command."""
        return self._offset

    @property
    def seek_type(self):
        """Seek type value carried by the command."""
        return self._seek_type
164
165
class _LttngLiveViewerStreamInfo:
    """Read-only description of one (data or metadata) stream."""

    def __init__(self, id, trace_id, is_metadata, path, channel_name):
        # `id` shadows the built-in, but it is part of the constructor's
        # signature so it must keep this name.
        self._id, self._trace_id = id, trace_id
        self._is_metadata = is_metadata
        self._path, self._channel_name = path, channel_name

    @property
    def id(self):
        """Stream ID."""
        return self._id

    @property
    def trace_id(self):
        """ID of the trace the stream belongs to."""
        return self._trace_id

    @property
    def is_metadata(self):
        """Whether or not this describes a metadata stream."""
        return self._is_metadata

    @property
    def path(self):
        """Path string of the stream."""
        return self._path

    @property
    def channel_name(self):
        """Channel name string of the stream."""
        return self._channel_name
193
194
class _LttngLiveViewerAttachToTracingSessionReply:
    """Reply to an "attach to tracing session" command."""

    class Status:
        """Numeric values for the reply's `status`."""

        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(self, status, stream_infos):
        self._status, self._stream_infos = status, stream_infos

    @property
    def status(self):
        """Status value of the reply."""
        return self._status

    @property
    def stream_infos(self):
        """Stream info objects given at construction."""
        return self._stream_infos
215
216
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    """"Get next data stream index entry" viewer command."""

    def __init__(self, version, stream_id):
        super().__init__(version)
        # ID of the stream for which the next index entry is requested
        self._stream_id = stream_id

    @property
    def stream_id(self):
        """ID of the targeted stream."""
        return self._stream_id
225
226
class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
    """Reply to a "get next data stream index entry" command."""

    class Status:
        """Numeric values for the reply's `status`."""

        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
        self._status, self._index_entry = status, index_entry
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self):
        """Status value of the reply."""
        return self._status

    @property
    def index_entry(self):
        """Index entry object given at construction."""
        return self._index_entry

    @property
    def has_new_metadata(self):
        """"Has new metadata" flag given at construction."""
        return self._has_new_metadata

    @property
    def has_new_data_stream(self):
        """"Has new data stream" flag given at construction."""
        return self._has_new_data_stream
257
258
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    """"Get data stream packet data" viewer command."""

    def __init__(self, version, stream_id, offset, req_length):
        super().__init__(version)
        self._stream_id, self._offset, self._req_length = (
            stream_id,
            offset,
            req_length,
        )

    @property
    def stream_id(self):
        """ID of the targeted stream."""
        return self._stream_id

    @property
    def offset(self):
        """Offset value carried by the command."""
        return self._offset

    @property
    def req_length(self):
        """Requested length value carried by the command."""
        return self._req_length
277
278
class _LttngLiveViewerGetDataStreamPacketDataReply:
    """Reply to a "get data stream packet data" command."""

    class Status:
        """Numeric values for the reply's `status`."""

        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(self, status, data, has_new_metadata, has_new_data_stream):
        self._status, self._data = status, data
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self):
        """Status value of the reply."""
        return self._status

    @property
    def data(self):
        """Data object given at construction."""
        return self._data

    @property
    def has_new_metadata(self):
        """"Has new metadata" flag given at construction."""
        return self._has_new_metadata

    @property
    def has_new_data_stream(self):
        """"Has new data stream" flag given at construction."""
        return self._has_new_data_stream
307
308
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    """"Get metadata stream data" viewer command."""

    def __init__(self, version, stream_id):
        super().__init__(version)
        # ID of the stream of which the data is requested
        self._stream_id = stream_id

    @property
    def stream_id(self):
        """ID of the targeted stream."""
        return self._stream_id
317
318
class _LttngLiveViewerGetMetadataStreamDataContentReply:
    """Reply to a "get metadata stream data" command."""

    class Status:
        """Numeric values for the reply's `status`."""

        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status, data):
        self._status, self._data = status, data

    @property
    def status(self):
        """Status value of the reply."""
        return self._status

    @property
    def data(self):
        """Data object given at construction."""
        return self._data
336
337
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    """"Get new stream infos" viewer command."""

    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        # ID of the tracing session the viewer asks about
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        """ID of the targeted tracing session."""
        return self._tracing_session_id
346
347
class _LttngLiveViewerGetNewStreamInfosReply:
    """Reply to a "get new stream infos" command."""

    class Status:
        """Numeric values for the reply's `status`."""

        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(self, status, stream_infos):
        self._status, self._stream_infos = status, stream_infos

    @property
    def status(self):
        """Status value of the reply."""
        return self._status

    @property
    def stream_infos(self):
        """Stream info objects given at construction."""
        return self._stream_infos
366
367
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    """"Create viewer session" viewer command (no payload fields)."""
370
371
class _LttngLiveViewerCreateViewerSessionReply:
    """Reply to a "create viewer session" command."""

    class Status:
        """Numeric values for the reply's `status`."""

        OK = 1
        ERROR = 2

    def __init__(self, status):
        # Exposed read-only through the `status` property
        self._status = status

    @property
    def status(self):
        """Status value of the reply."""
        return self._status
383
384
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    """"Detach from tracing session" viewer command."""

    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        # ID of the tracing session to detach from
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        """ID of the targeted tracing session."""
        return self._tracing_session_id
393
394
class _LttngLiveViewerDetachFromTracingSessionReply:
    """Reply to a "detach from tracing session" command."""

    class Status:
        """Numeric values for the reply's `status`."""

        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status):
        # Exposed read-only through the `status` property
        self._status = status

    @property
    def status(self):
        """Status value of the reply."""
        return self._status
407
408
409 # An LTTng live protocol codec can convert bytes to command objects and
410 # reply objects to bytes.
class _LttngLiveViewerProtocolCodec:
    """Converts received bytes to command objects and reply objects to bytes.

    Every field is packed and unpacked with the `!` (network byte order)
    `struct` prefix.
    """

    # Command header fields: payload size, command type, and version
    _COMMAND_HEADER_STRUCT_FMT = 'QII'
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)

    def _unpack(self, fmt, data, offset=0):
        """Unpack `fmt` (network byte order) from `data` starting at `offset`."""
        return struct.unpack_from('!' + fmt, data, offset)

    def _unpack_payload(self, fmt, data):
        """Unpack `fmt` from the bytes of `data` which follow the command header."""
        header_size = _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
        return self._unpack(fmt, data, header_size)

    def decode(self, data):
        """Decode the command at the beginning of `data`.

        Returns the decoded command object, or `None` if `data` does not
        contain a whole command (header and payload) yet.

        Raises `UnexpectedInput` if the command type is unknown.
        """
        header_size = self._COMMAND_HEADER_SIZE_BYTES

        if len(data) < header_size:
            # Not enough data to read the command header
            return None

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
                payload_size, cmd_type, version
            )
        )

        if len(data) < header_size + payload_size:
            # Not enough data to read the whole command
            return None

        if cmd_type == 1:
            # The fourth payload field is decoded, but not used
            viewer_session_id, major, minor, _ = self._unpack_payload('QIII', data)
            return _LttngLiveViewerConnectCommand(
                version, viewer_session_id, major, minor
            )

        if cmd_type == 2:
            return _LttngLiveViewerGetTracingSessionInfosCommand(version)

        if cmd_type == 3:
            tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
            return _LttngLiveViewerAttachToTracingSessionCommand(
                version, tracing_session_id, offset, seek_type
            )

        if cmd_type == 4:
            stream_id = self._unpack_payload('Q', data)[0]
            return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                version, stream_id
            )

        if cmd_type == 5:
            stream_id, offset, req_length = self._unpack_payload('QQI', data)
            return _LttngLiveViewerGetDataStreamPacketDataCommand(
                version, stream_id, offset, req_length
            )

        if cmd_type == 6:
            stream_id = self._unpack_payload('Q', data)[0]
            return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)

        if cmd_type == 7:
            tracing_session_id = self._unpack_payload('Q', data)[0]
            return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)

        if cmd_type == 8:
            return _LttngLiveViewerCreateViewerSessionCommand(version)

        if cmd_type == 9:
            tracing_session_id = self._unpack_payload('Q', data)[0]
            return _LttngLiveViewerDetachFromTracingSessionCommand(
                version, tracing_session_id
            )

        raise UnexpectedInput('Unknown command type {}'.format(cmd_type))

    def _pack(self, fmt, *args):
        """Pack `args` as `fmt`, forcing network byte order."""
        return struct.pack('!' + fmt, *args)

    def _encode_zero_padded_str(self, string, length):
        """Encode `string` and right-pad it with null bytes up to `length` bytes.

        An encoded string longer than `length` bytes is returned as is
        (not truncated).
        """
        return string.encode().ljust(length, b'\x00')

    def _encode_stream_info(self, info):
        """Encode a single stream info object."""
        return (
            self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
            + self._encode_zero_padded_str(info.path, 4096)
            + self._encode_zero_padded_str(info.channel_name, 255)
        )

    def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
        """Return the flags integer: bit 0 is "new metadata", bit 1 is "new data streams"."""
        return (1 if has_new_metadata else 0) | (2 if has_new_data_streams else 0)

    def encode(self, reply):
        """Encode the reply object `reply` and return the resulting bytes.

        The exact type of `reply` selects the layout (subclasses of the
        known reply classes are not recognized).

        Raises `ValueError` if the type of `reply` is unknown.
        """
        reply_cls = type(reply)

        if reply_cls is _LttngLiveViewerConnectReply:
            return self._pack(
                'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
            )

        if reply_cls is _LttngLiveViewerGetTracingSessionInfosReply:
            parts = [self._pack('I', len(reply.tracing_session_infos))]

            for info in reply.tracing_session_infos:
                parts.append(
                    self._pack(
                        'QIII',
                        info.tracing_session_id,
                        info.live_timer_freq,
                        info.client_count,
                        info.stream_count,
                    )
                )
                parts.append(self._encode_zero_padded_str(info.hostname, 64))
                parts.append(self._encode_zero_padded_str(info.name, 255))

            return b''.join(parts)

        if reply_cls in (
            _LttngLiveViewerAttachToTracingSessionReply,
            _LttngLiveViewerGetNewStreamInfosReply,
        ):
            # Both replies share one layout: status, stream info count,
            # and then the stream infos themselves.
            parts = [self._pack('II', reply.status, len(reply.stream_infos))]
            parts.extend(self._encode_stream_info(info) for info in reply.stream_infos)
            return b''.join(parts)

        if reply_cls is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            return self._pack(
                'QQQQQQQII',
                entry.offset_bytes,
                entry.total_size_bits,
                entry.content_size_bits,
                entry.timestamp_begin,
                entry.timestamp_end,
                entry.events_discarded,
                entry.stream_class_id,
                reply.status,
                flags,
            )

        if reply_cls is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            header = self._pack('III', reply.status, len(reply.data), flags)
            return header + reply.data

        if reply_cls is _LttngLiveViewerGetMetadataStreamDataContentReply:
            header = self._pack('QI', len(reply.data), reply.status)
            return header + reply.data

        if reply_cls in (
            _LttngLiveViewerCreateViewerSessionReply,
            _LttngLiveViewerDetachFromTracingSessionReply,
        ):
            # Status-only replies
            return self._pack('I', reply.status)

        raise ValueError(
            'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
        )
575
576
577 # An entry within the index of an LTTng data stream.
class _LttngDataStreamIndexEntry:
    """One entry of a data stream index (read-only)."""

    def __init__(
        self,
        offset_bytes,
        total_size_bits,
        content_size_bits,
        timestamp_begin,
        timestamp_end,
        events_discarded,
        stream_class_id,
    ):
        # Offset and sizes
        self._offset_bytes = offset_bytes
        self._total_size_bits = total_size_bits
        self._content_size_bits = content_size_bits

        # Timestamps
        self._timestamp_begin = timestamp_begin
        self._timestamp_end = timestamp_end

        # Other fields
        self._events_discarded = events_discarded
        self._stream_class_id = stream_class_id

    @property
    def offset_bytes(self):
        """Offset, in bytes."""
        return self._offset_bytes

    @property
    def total_size_bits(self):
        """Total size, in bits."""
        return self._total_size_bits

    @property
    def total_size_bytes(self):
        """Total size, in whole bytes (floor division of the bit size by 8)."""
        return self._total_size_bits // 8

    @property
    def content_size_bits(self):
        """Content size, in bits."""
        return self._content_size_bits

    @property
    def content_size_bytes(self):
        """Content size, in whole bytes (floor division of the bit size by 8)."""
        return self._content_size_bits // 8

    @property
    def timestamp_begin(self):
        """Beginning timestamp value."""
        return self._timestamp_begin

    @property
    def timestamp_end(self):
        """End timestamp value."""
        return self._timestamp_end

    @property
    def events_discarded(self):
        """Discarded events value."""
        return self._events_discarded

    @property
    def stream_class_id(self):
        """Stream class ID value."""
        return self._stream_class_id
632
633
634 # The index of an LTTng data stream, a sequence of index entries.
class _LttngDataStreamIndex(collections.abc.Sequence):
    """Sequence of the index entries decoded from the index file at `path`."""

    def __init__(self, path):
        self._path = path
        self._build()
        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    def _build(self):
        """Read the whole index file and fill `self._entries`."""
        self._entries = []
        assert os.path.isfile(self._path)

        # Both the file header and the entries are big-endian
        header_fmt = '>IIII'
        header_size = struct.calcsize(header_fmt)
        entry_fmt = '>QQQQQQQ'
        entry_size = struct.calcsize(entry_fmt)

        with open(self._path, 'rb') as index_file:
            # Header: magic number, major, minor, length of one entry
            header = index_file.read(header_size)
            assert len(header) == header_size
            magic, _, _, index_entry_length = struct.unpack(header_fmt, header)
            assert magic == 0xC1F1DCC1

            # An entry of the file can be longer than the fields decoded
            # here: this is what to skip after each one.
            skip_size = index_entry_length - entry_size

            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, index_file.tell()
                    )
                )
                raw_entry = index_file.read(entry_size)

                if not raw_entry:
                    # End of file: done
                    break

                assert len(raw_entry) == entry_size

                # The unpacked fields are in the order of the entry
                # constructor's parameters.
                fields = struct.unpack(entry_fmt, raw_entry)
                self._entries.append(_LttngDataStreamIndexEntry(*fields))
                index_file.seek(skip_size, os.SEEK_CUR)

    def __getitem__(self, index):
        return self._entries[index]

    def __len__(self):
        return len(self._entries)

    @property
    def path(self):
        """Path of the index file."""
        return self._path
705
706
707 # An LTTng data stream.
class _LttngDataStream:
    """Data stream file at `path`, with its index and channel name."""

    def __init__(self, path):
        self._path = path
        trace_dir, filename = os.path.split(path)

        # The channel name is what precedes the last `_<digits>` part
        # of the file name.
        self._channel_name = re.match(r'(.*)_\d+', filename).group(1)

        # The index file is `index/<file name>.idx` within the
        # directory which contains the data stream file.
        self._index = _LttngDataStreamIndex(
            os.path.join(trace_dir, 'index', filename + '.idx')
        )
        assert os.path.isfile(path)

        # Remains open so that get_data() can read from it
        self._file = open(path, 'rb')
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    @property
    def path(self):
        """Path of the data stream file."""
        return self._path

    @property
    def channel_name(self):
        """Channel name extracted from the file name."""
        return self._channel_name

    @property
    def index(self):
        """Index object of this data stream."""
        return self._index

    def get_data(self, offset_bytes, len_bytes):
        """Return up to `len_bytes` bytes read at offset `offset_bytes` of the file."""
        stream_file = self._file
        stream_file.seek(offset_bytes)
        return stream_file.read(len_bytes)
740
741
742 # An LTTng metadata stream.
class _LttngMetadataStream:
    """Metadata stream file at `path`; the file is read on demand."""

    def __init__(self, path):
        self._path = path
        logging.info('Built metadata stream: path="{}"'.format(path))

    @property
    def path(self):
        """Path of the metadata stream file."""
        return self._path

    @property
    def data(self):
        """Whole content of the file (bytes), read again on each access."""
        assert os.path.isfile(self._path)

        with open(self._path, 'rb') as metadata_file:
            content = metadata_file.read()

        return content
758
759
760 # An LTTng trace, a sequence of LTTng data streams.
class LttngTrace(collections.abc.Sequence):
    """Trace directory seen as a sequence of data streams, plus a metadata stream."""

    def __init__(self, trace_dir):
        assert os.path.isdir(trace_dir)
        self._path = trace_dir
        metadata_path = os.path.join(trace_dir, 'metadata')
        self._metadata_stream = _LttngMetadataStream(metadata_path)
        self._create_data_streams(trace_dir)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    def _create_data_streams(self, trace_dir):
        """Create one data stream per eligible file found directly in `trace_dir`.

        Eligible files are regular files which are not hidden (no leading
        `.`) and not named `metadata`. The resulting data streams are in
        the order of their sorted paths.
        """
        named_paths = (
            (filename, os.path.join(trace_dir, filename))
            for filename in os.listdir(trace_dir)
        )
        data_stream_paths = sorted(
            path
            for filename, path in named_paths
            if os.path.isfile(path)
            and not filename.startswith('.')
            and filename != 'metadata'
        )
        self._data_streams = [_LttngDataStream(path) for path in data_stream_paths]

    @property
    def path(self):
        """Path of the trace directory."""
        return self._path

    @property
    def metadata_stream(self):
        """Metadata stream object of this trace."""
        return self._metadata_stream

    def __getitem__(self, index):
        return self._data_streams[index]

    def __len__(self):
        return len(self._data_streams)
807
808
809 # The state of a single data stream.
class _LttngLiveViewerSessionDataStreamState:
    """State of one data stream: its info, its stream, and a cursor in its index."""

    def __init__(self, ts_state, info, data_stream):
        self._ts_state = ts_state
        self._info = info
        self._data_stream = data_stream

        # Position of the current entry within the data stream's index
        self._cur_index_entry_index = 0

        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id, ts_info.tracing_session_id, ts_info.name, data_stream.path
            )
        )

    @property
    def tracing_session_state(self):
        """Tracing session state object given at construction."""
        return self._ts_state

    @property
    def info(self):
        """Stream info object given at construction."""
        return self._info

    @property
    def data_stream(self):
        """Data stream object given at construction."""
        return self._data_stream

    @property
    def cur_index_entry(self):
        """Current index entry, or `None` when the cursor is at the end of the index."""
        index = self._data_stream.index

        if self._cur_index_entry_index == len(index):
            return None

        return index[self._cur_index_entry_index]

    def goto_next_index_entry(self):
        """Move the cursor to the next index entry."""
        self._cur_index_entry_index += 1
847
848
849 # The state of a single metadata stream.
class _LttngLiveViewerSessionMetadataStreamState:
    """State of one metadata stream: its info, its stream, and a "sent" flag."""

    def __init__(self, ts_state, info, metadata_stream):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream

        # Becomes true (through the `is_sent` setter) once the metadata
        # stream's data is sent to the viewer.
        self._is_sent = False
        fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
        logging.info(
            fmt.format(
                info.id,
                ts_state.tracing_session_descriptor.info.tracing_session_id,
                ts_state.tracing_session_descriptor.info.name,
                metadata_stream.path,
            )
        )

    @property
    def trace_session_state(self):
        """Tracing session state object given at construction."""
        # The constructor stores this as `_ts_state`: reading any other
        # attribute name here would raise `AttributeError`.
        return self._ts_state

    @property
    def tracing_session_state(self):
        """Same object as `trace_session_state`.

        This name matches the corresponding property of the data stream
        state class.
        """
        return self._ts_state

    @property
    def info(self):
        """Stream info object given at construction."""
        return self._info

    @property
    def metadata_stream(self):
        """Metadata stream object given at construction."""
        return self._metadata_stream

    @property
    def is_sent(self):
        """Whether or not the metadata stream is marked as sent."""
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value):
        self._is_sent = value
885
886
887 # The state of a tracing session.
class _LttngLiveViewerSessionTracingSessionState:
    """State of one tracing session: stream infos, stream states, attachment flag.

    Stream IDs are assigned sequentially, starting at `base_stream_id`:
    for each trace of the descriptor, its data streams first and then its
    metadata stream.
    """

    def __init__(self, tc_descr, base_stream_id):
        self._tc_descr = tc_descr
        self._stream_infos = []
        self._ds_states = {}
        self._ms_states = {}
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # The trace ID derives from the ID of the trace's first stream
            trace_id = next_stream_id * 1000

            # One stream info and one state per data stream
            for data_stream in trace:
                ds_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(ds_info)
                self._ds_states[next_stream_id] = _LttngLiveViewerSessionDataStreamState(
                    self, ds_info, data_stream
                )
                next_stream_id += 1

            # One stream info and one state for the metadata stream
            metadata_stream = trace.metadata_stream
            ms_info = _LttngLiveViewerStreamInfo(
                next_stream_id, trace_id, True, metadata_stream.path, 'metadata'
            )
            self._stream_infos.append(ms_info)
            self._ms_states[next_stream_id] = _LttngLiveViewerSessionMetadataStreamState(
                self, ms_info, metadata_stream
            )
            next_stream_id += 1

        self._is_attached = False
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                tc_descr.info.tracing_session_id, tc_descr.info.name
            )
        )

    @property
    def tracing_session_descriptor(self):
        """Tracing session descriptor given at construction."""
        return self._tc_descr

    @property
    def data_stream_states(self):
        """Dictionary of stream IDs to data stream states."""
        return self._ds_states

    @property
    def metadata_stream_states(self):
        """Dictionary of stream IDs to metadata stream states."""
        return self._ms_states

    @property
    def stream_infos(self):
        """List of all the stream infos (data and metadata), in creation order."""
        return self._stream_infos

    @property
    def has_new_metadata(self):
        """Whether or not at least one metadata stream is not marked as sent."""
        return any(not ms_state.is_sent for ms_state in self._ms_states.values())

    @property
    def is_attached(self):
        """Whether or not the tracing session is marked as attached."""
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value):
        self._is_attached = value
955
956
957 # An LTTng live viewer session manages a view on tracing sessions
958 # and replies to commands accordingly.
959 class _LttngLiveViewerSession:
960 def __init__(
961 self,
962 viewer_session_id,
963 tracing_session_descriptors,
964 max_query_data_response_size,
965 ):
966 self._viewer_session_id = viewer_session_id
967 self._ts_states = {}
968 self._stream_states = {}
969 self._max_query_data_response_size = max_query_data_response_size
970 total_stream_infos = 0
971
972 for ts_descr in tracing_session_descriptors:
973 ts_state = _LttngLiveViewerSessionTracingSessionState(
974 ts_descr, total_stream_infos
975 )
976 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
977 self._ts_states[ts_id] = ts_state
978 total_stream_infos += len(ts_state.stream_infos)
979
980 # Update session's stream states to have the new states
981 self._stream_states.update(ts_state.data_stream_states)
982 self._stream_states.update(ts_state.metadata_stream_states)
983
984 self._command_handlers = {
985 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
986 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
987 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
988 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
989 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
990 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
991 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
992 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
993 }
994
995 @property
996 def viewer_session_id(self):
997 return self._viewer_session_id
998
999 def _get_tracing_session_state(self, tracing_session_id):
1000 if tracing_session_id not in self._ts_states:
1001 raise UnexpectedInput(
1002 'Unknown tracing session ID {}'.format(tracing_session_id)
1003 )
1004
1005 return self._ts_states[tracing_session_id]
1006
1007 def _get_stream_state(self, stream_id):
1008 if stream_id not in self._stream_states:
1009 UnexpectedInput('Unknown stream ID {}'.format(stream_id))
1010
1011 return self._stream_states[stream_id]
1012
1013 def handle_command(self, cmd):
1014 logging.info(
1015 'Handling command in viewer session: cmd-cls-name={}'.format(
1016 cmd.__class__.__name__
1017 )
1018 )
1019 cmd_type = type(cmd)
1020
1021 if cmd_type not in self._command_handlers:
1022 raise UnexpectedInput(
1023 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
1024 )
1025
1026 return self._command_handlers[cmd_type](cmd)
1027
1028 def _handle_attach_to_tracing_session_command(self, cmd):
1029 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1030 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1031 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1032 info = ts_state.tracing_session_descriptor.info
1033
1034 if ts_state.is_attached:
1035 raise UnexpectedInput(
1036 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1037 info.name
1038 )
1039 )
1040
1041 ts_state.is_attached = True
1042 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1043 return _LttngLiveViewerAttachToTracingSessionReply(
1044 status, ts_state.stream_infos
1045 )
1046
1047 def _handle_detach_from_tracing_session_command(self, cmd):
1048 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1049 logging.info(fmt.format(cmd.tracing_session_id))
1050 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1051 info = ts_state.tracing_session_descriptor.info
1052
1053 if not ts_state.is_attached:
1054 raise UnexpectedInput(
1055 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1056 info.name
1057 )
1058 )
1059
1060 ts_state.is_attached = False
1061 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1062 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1063
1064 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1065 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1066 logging.info(fmt.format(cmd.stream_id))
1067 stream_state = self._get_stream_state(cmd.stream_id)
1068
1069 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1070 raise UnexpectedInput(
1071 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1072 )
1073
1074 if stream_state.cur_index_entry is None:
1075 # The viewer is done reading this stream
1076 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1077
1078 # Dummy data stream index entry to use with the `HUP` status
1079 # (the reply needs one, but the viewer ignores it)
1080 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1081
1082 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1083 status, index_entry, False, False
1084 )
1085
1086 # The viewer only checks the `has_new_metadata` flag if the
1087 # reply's status is `OK`, so we need to provide an index here
1088 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1089 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1090 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1091 status, stream_state.cur_index_entry, has_new_metadata, False
1092 )
1093 stream_state.goto_next_index_entry()
1094 return reply
1095
1096 def _handle_get_data_stream_packet_data_command(self, cmd):
1097 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1098 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1099 stream_state = self._get_stream_state(cmd.stream_id)
1100 data_response_length = cmd.req_length
1101
1102 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1103 raise UnexpectedInput(
1104 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1105 )
1106
1107 if stream_state.tracing_session_state.has_new_metadata:
1108 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1109 return _LttngLiveViewerGetDataStreamPacketDataReply(
1110 status, bytes(), True, False
1111 )
1112
1113 if self._max_query_data_response_size:
1114 # Enforce a server side limit on the query requested length.
1115 # To ensure that the transaction terminate take the minimum of both
1116 # value.
1117 data_response_length = min(
1118 cmd.req_length, self._max_query_data_response_size
1119 )
1120 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1121 logging.info(fmt.format(cmd.req_length, data_response_length))
1122
1123 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1124 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1125 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1126
def _handle_get_metadata_stream_data_command(self, cmd):
    # Handles a "get metadata stream data" command and returns the
    # corresponding `_LttngLiveViewerGetMetadataStreamDataContentReply`.
    #
    # The whole metadata stream data is sent the first time it is
    # requested; any later request for the same stream gets an empty
    # `NO_NEW` reply.
    #
    # Raises `UnexpectedInput` if `cmd.stream_id` does not refer to a
    # metadata stream state.
    logging.info(
        'Handling "get metadata stream data" command: stream-id={}'.format(
            cmd.stream_id
        )
    )
    metadata_stream_state = self._get_stream_state(cmd.stream_id)

    if type(metadata_stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
        raise UnexpectedInput(
            'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
        )

    reply_cls = _LttngLiveViewerGetMetadataStreamDataContentReply

    if not metadata_stream_state.is_sent:
        # First request for this stream: mark it as sent, then send
        # everything at once.
        metadata_stream_state.is_sent = True
        return reply_cls(
            reply_cls.Status.OK, metadata_stream_state.metadata_stream.data
        )

    # Already sent: nothing new to offer.
    return reply_cls(reply_cls.Status.NO_NEW, bytes())
1146
def _handle_get_new_stream_infos_command(self, cmd):
    # Handles a "get new stream infos" command.
    #
    # As of this version, the viewer receives all the stream infos of a
    # tracing session with the "attach to tracing session" reply, so
    # there is never anything new to announce here. Being asked for new
    # streams means the viewer consumed all the existing data streams,
    # hence the `HUP` status with an empty list of stream infos.
    logging.info(
        'Handling "get new stream infos" command: ts-id={}'.format(
            cmd.tracing_session_id
        )
    )
    reply_cls = _LttngLiveViewerGetNewStreamInfosReply
    return reply_cls(reply_cls.Status.HUP, [])
1158
def _handle_get_tracing_session_infos_command(self, cmd):
    # Handles a "get tracing session infos" command: replies with the
    # info objects of all the known tracing session states, sorted by
    # tracing session name.
    logging.info('Handling "get tracing session infos" command.')
    sorted_infos = sorted(
        (
            ts_state.tracing_session_descriptor.info
            for ts_state in self._ts_states.values()
        ),
        key=lambda ts_info: ts_info.name,
    )
    return _LttngLiveViewerGetTracingSessionInfosReply(sorted_infos)
1166
def _handle_create_viewer_session_command(self, cmd):
    # Handles a "create viewer session" command.
    #
    # There is nothing to do on this side: the LTTng relay daemon
    # allocates the state of the viewer session at this point, but this
    # server only needs to acknowledge the command with an `OK` status.
    logging.info('Handling "create viewer session" command.')
    reply_cls = _LttngLiveViewerCreateViewerSessionReply
    return reply_cls(reply_cls.Status.OK)
1174
1175
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` with an OS-assigned TCP port and
# starts listening. It then writes the decimal TCP port number to a
# temporary port file created in the directory of `port_filename`, and
# renames this temporary port file to `port_filename`.
#
# `tracing_session_descriptors` is a list of tracing session descriptors
# (`LttngTracingSessionDescriptor`) to serve.
#
# `max_query_data_response_size` is forwarded to the viewer session; it
# may be `None`.
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns.
class LttngLiveServer:
    def __init__(
        self, port_filename, tracing_session_descriptors, max_query_data_response_size
    ):
        logging.info('Server configuration:')

        logging.info(' Port file name: `{}`'.format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                ' Maximum response data query size: `{}`'.format(
                    max_query_data_response_size
                )
            )

        for ts_descr in tracing_session_descriptors:
            info = ts_descr.info
            fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info(' Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = tracing_session_descriptors
        self._max_query_data_response_size = max_query_data_response_size
        self._codec = _LttngLiveViewerProtocolCodec()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Everything which happens once the socket exists is protected
        # so that the socket is always closed, including when binding
        # or writing the port file fails.
        try:
            # Port 0: OS assigns an unused port
            serv_addr = ('localhost', 0)
            self._sock.bind(serv_addr)

            # Listen _before_ publishing the port file: a viewer which
            # connects as soon as the port file appears must not be
            # refused because the socket is not listening yet.
            self._listen()
            self._write_port_to_file(port_filename)
            self._serve_viewer()
        finally:
            self._sock.close()
            logging.info('Closed connection and socket.')

    # TCP port on which the server socket is bound.
    @property
    def _server_port(self):
        return self._sock.getsockname()[1]

    # Receives data from the viewer until the codec decodes a complete
    # command, and returns this command.
    #
    # Returns `None` if the viewer closes the connection before sending
    # the first byte of a command.
    #
    # Raises `UnexpectedInput` if the viewer closes the connection in
    # the middle of a command, or if the received data is malformed.
    def _recv_command(self):
        data = bytes()

        while True:
            logging.info('Waiting for viewer command.')
            buf = self._conn.recv(128)

            if not buf:
                logging.info('Client closed connection.')

                if data:
                    raise UnexpectedInput(
                        'Client closed connection after having sent {} command bytes.'.format(
                            len(data)
                        )
                    )

                return

            logging.info('Received data from viewer: length={}'.format(len(buf)))

            data += buf

            try:
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc

            # `None` means that the codec needs more data to decode a
            # complete command: keep receiving.
            if cmd is not None:
                logging.info(
                    'Received command from viewer: cmd-cls-name={}'.format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    # Encodes `reply` with the codec and sends all the resulting bytes
    # to the viewer.
    def _send_reply(self, reply):
        data = self._codec.encode(reply)
        logging.info(
            'Sending reply to viewer: reply-cls-name={}, length={}'.format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    # Handles the whole conversation with the connected viewer.
    #
    # Returns when the viewer closes the connection between two
    # commands.
    #
    # Raises `UnexpectedInput` if the first command is not "connect"
    # (this includes a viewer which closes the connection without
    # sending anything).
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise UnexpectedInput(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        # NOTE(review): `2, 10` is presumably the protocol version
        # (major, minor) announced to the viewer; confirm against
        # `_LttngLiveViewerConnectReply`.
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    # Makes the bound server socket listen for connections.
    def _listen(self):
        logging.info('Listening: port={}'.format(self._server_port))
        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)

    # Accepts a single viewer and handles its connection until it ends,
    # always closing the connection afterwards.
    def _serve_viewer(self):
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Writes the decimal server port to the file named `port_filename`.
    #
    # The port is first written to a temporary file which is then
    # renamed to `port_filename`.
    def _write_port_to_file(self, port_filename):
        # Create the temporary file next to the final port file:
        # os.rename() may fail if the source and the destination are on
        # different file systems, which can be the case with the
        # default temporary directory.
        port_dir = os.path.dirname(os.path.abspath(port_filename))

        # Write the port number to a temporary file.
        with tempfile.NamedTemporaryFile(
            mode='w', delete=False, dir=port_dir
        ) as tmp_port_file:
            print(self._server_port, end='', file=tmp_port_file)

        # Rename temporary file to real file
        os.rename(tmp_port_file.name, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1348
1349
# Describes a tracing session to serve.
#
# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
# objects). The path of each trace must contain `name`, otherwise the
# constructor raises `ValueError`.
class LttngTracingSessionDescriptor:
    def __init__(
        self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
    ):
        # Reject any trace of which the path does not contain the
        # tracing session name.
        for candidate in traces:
            if name in candidate.path:
                continue

            raise ValueError(
                'Tracing session name must be part of every trace path (`{}` not found in `{}`)'.format(
                    name, candidate.path
                )
            )

        self._traces = traces

        # Each trace contributes its own length plus one stream.
        # NOTE(review): presumably its data streams plus its single
        # metadata stream; confirm against `LttngTrace`.
        total_stream_count = sum(len(candidate) + 1 for candidate in traces)
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            total_stream_count,
            hostname,
            name,
        )

    # Traces (list of `LttngTrace` objects) of this tracing session.
    @property
    def traces(self):
        return self._traces

    # Tracing session info object built from the constructor parameters.
    @property
    def info(self):
        return self._info
1381
1382
def _tracing_session_descriptors_from_arg(string):
    # Creates a tracing session descriptor
    # (`LttngTracingSessionDescriptor`) from the command-line argument
    # `string`.
    #
    # Format is:
    #     NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
    #
    # Raises `argparse.ArgumentTypeError` if `string` has fewer than
    # five fields, and `ValueError` if ID, FREQ, or CLIENTS is not an
    # integer: argparse reports both as a usage error when this
    # function is used as an argument type.
    parts = string.split(',')

    if len(parts) < 5:
        # Without this check, indexing `parts` raises `IndexError`,
        # which argparse does not handle.
        msg = '`{}` is not a valid session configuration: expecting NAME,ID,HOSTNAME,FREQ,CLIENTS[,TRACEPATH]...'.format(
            string
        )
        raise argparse.ArgumentTypeError(msg)

    name = parts[0]
    tracing_session_id = int(parts[1])
    hostname = parts[2]
    live_timer_freq = int(parts[3])
    client_count = int(parts[4])
    traces = [LttngTrace(path) for path in parts[5:]]
    return LttngTracingSessionDescriptor(
        name, tracing_session_id, hostname, live_timer_freq, client_count, traces
    )
1396
1397
def _loglevel_parser(string):
    # Converts the log level name `string` (`info` or `warning`) to the
    # corresponding `logging` level.
    #
    # Raises `argparse.ArgumentTypeError` for any other name.
    level = {'info': logging.INFO, 'warning': logging.WARNING}.get(string)

    if level is None:
        raise argparse.ArgumentTypeError(
            "{} is not a valid loglevel".format(string)
        )

    return level
1404
1405
if __name__ == '__main__':
    logging.basicConfig(format='# %(asctime)-25s%(message)s')

    # The command line is parsed in two passes. The automatic help
    # option is disabled so that `-h`/`--help` is only handled by the
    # second pass, once all the arguments are registered.
    arg_parser = argparse.ArgumentParser(
        add_help=False, description='LTTng-live protocol mocker'
    )

    # First pass: only the log level, so that logging is configured
    # before the session arguments are converted.
    arg_parser.add_argument(
        '--log-level',
        choices=['info', 'warning'],
        default='warning',
        help='The loglevel to be used.',
    )
    early_args, pending_args = arg_parser.parse_known_args()
    root_logger = logging.getLogger()
    root_logger.setLevel(_loglevel_parser(early_args.log_level))

    # Second pass: everything else, from what the first pass left over.
    arg_parser.add_argument(
        '--port-filename',
        required=True,
        help='The final port file. This file is present when the server is ready to receive connection.',
    )
    arg_parser.add_argument(
        '--max-query-data-response-size',
        type=int,
        help='The maximum size of control data response in bytes',
    )
    arg_parser.add_argument(
        'sessions',
        metavar="SESSION",
        nargs="+",
        type=_tracing_session_descriptors_from_arg,
        help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
    )
    arg_parser.add_argument(
        '-h',
        '--help',
        action='help',
        default=argparse.SUPPRESS,
        help='Show this help message and exit.',
    )
    cli_args = arg_parser.parse_args(args=pending_args)

    # The constructor of the server only returns once the viewer closed
    # the connection.
    try:
        LttngLiveServer(
            cli_args.port_filename,
            cli_args.sessions,
            cli_args.max_query_data_response_size,
        )
    except UnexpectedInput as exc:
        # Report the protocol error both through logging and directly
        # on the standard error stream, then exit with a failure status.
        logging.error(str(exc))
        print(exc, file=sys.stderr)
        sys.exit(1)
This page took 0.090121 seconds and 5 git commands to generate.