black: run `black` version 19.10b0 on entire project
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
1 # The MIT License (MIT)
2 #
3 # Copyright (c) 2019 Philippe Proulx <pproulx@efficios.com>
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 # THE SOFTWARE.
22
23 import argparse
24 import collections.abc
25 import logging
26 import os
27 import os.path
28 import re
29 import socket
30 import struct
31 import sys
32 import tempfile
33
34
class UnexpectedInput(RuntimeError):
    """Error raised when the input received from the viewer cannot be handled."""
37
38
# Base of the LTTng live viewer command objects: it only carries the
# version which comes with a command.
class _LttngLiveViewerCommand:
    def __init__(self, version):
        self._version = version

    # Version given at construction time.
    @property
    def version(self):
        return self._version
46
47
# "Connect" viewer command (command type 1 for the protocol codec).
#
# On top of the command's version, it holds the viewer session ID and
# the major and minor numbers sent by the viewer.
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    def __init__(self, version, viewer_session_id, major, minor):
        super().__init__(version)
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    @property
    def major(self):
        return self._major

    @property
    def minor(self):
        return self._minor
66
67
# Reply object for a "connect" command.
#
# Holds the viewer session ID and the major and minor numbers to send
# back to the viewer.
class _LttngLiveViewerConnectReply:
    def __init__(self, viewer_session_id, major, minor):
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    @property
    def major(self):
        return self._major

    @property
    def minor(self):
        return self._minor
85
86
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    """"Get tracing session infos" viewer command (codec command type 2).

    It has nothing more than the base command's version.
    """
89
90
# Description of a single tracing session, as listed in a "get tracing
# session infos" reply.
#
# The protocol codec encodes the four numeric fields as integers and
# `hostname` and `name` as zero-padded strings.
class _LttngLiveViewerTracingSessionInfo:
    def __init__(
        self,
        tracing_session_id,
        live_timer_freq,
        client_count,
        stream_count,
        hostname,
        name,
    ):
        # Numeric fields
        self._tracing_session_id = tracing_session_id
        self._live_timer_freq = live_timer_freq
        self._client_count = client_count
        self._stream_count = stream_count

        # String fields
        self._hostname, self._name = hostname, name

    @property
    def tracing_session_id(self):
        return self._tracing_session_id

    @property
    def live_timer_freq(self):
        return self._live_timer_freq

    @property
    def client_count(self):
        return self._client_count

    @property
    def stream_count(self):
        return self._stream_count

    @property
    def hostname(self):
        return self._hostname

    @property
    def name(self):
        return self._name
131
132
# Reply object for a "get tracing session infos" command: wraps the
# tracing session info objects to send to the viewer.
class _LttngLiveViewerGetTracingSessionInfosReply:
    def __init__(self, tracing_session_infos):
        self._tracing_session_infos = tracing_session_infos

    # The very object given at construction time (not a copy).
    @property
    def tracing_session_infos(self):
        return self._tracing_session_infos
140
141
# "Attach to tracing session" viewer command (codec command type 3).
#
# Holds the ID of the tracing session to attach to, an offset, and a
# seek type (see `SeekType` for the known values).
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    # Known values of the `seek_type` property
    class SeekType:
        BEGINNING = 1
        LAST = 2

    def __init__(self, version, tracing_session_id, offset, seek_type):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id
        self._offset, self._seek_type = offset, seek_type

    @property
    def tracing_session_id(self):
        return self._tracing_session_id

    @property
    def offset(self):
        return self._offset

    @property
    def seek_type(self):
        return self._seek_type
164
165
# Description of a single stream (data or metadata), as sent to the
# viewer within the replies which list streams.
class _LttngLiveViewerStreamInfo:
    def __init__(self, id, trace_id, is_metadata, path, channel_name):
        # Identity of the stream
        self._id, self._trace_id = id, trace_id

        # `True` for a metadata stream, `False` for a data stream
        self._is_metadata = is_metadata

        self._path, self._channel_name = path, channel_name

    @property
    def id(self):
        return self._id

    @property
    def trace_id(self):
        return self._trace_id

    @property
    def is_metadata(self):
        return self._is_metadata

    @property
    def path(self):
        return self._path

    @property
    def channel_name(self):
        return self._channel_name
193
194
# Reply object for an "attach to tracing session" command: a status
# (one of the `Status` values) and the stream infos of the tracing
# session.
class _LttngLiveViewerAttachToTracingSessionReply:
    # Possible values of the `status` property
    class Status:
        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(self, status, stream_infos):
        self._status, self._stream_infos = status, stream_infos

    @property
    def status(self):
        return self._status

    @property
    def stream_infos(self):
        return self._stream_infos
215
216
# "Get next data stream index entry" viewer command (codec command
# type 4): targets the stream having the ID `stream_id`.
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    # ID of the targeted stream.
    @property
    def stream_id(self):
        return self._stream_id
225
226
# Reply object for a "get next data stream index entry" command.
#
# Holds a status (one of the `Status` values), a data stream index
# entry, and the two "has new metadata"/"has new data stream" flags.
class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
    # Possible values of the `status` property
    class Status:
        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
        self._status, self._index_entry = status, index_entry
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self):
        return self._status

    @property
    def index_entry(self):
        return self._index_entry

    @property
    def has_new_metadata(self):
        return self._has_new_metadata

    @property
    def has_new_data_stream(self):
        return self._has_new_data_stream
257
258
# "Get data stream packet data" viewer command (codec command type 5).
#
# Asks for `req_length` bytes of the stream having the ID `stream_id`,
# starting at the offset `offset`.
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version, stream_id, offset, req_length):
        super().__init__(version)
        self._stream_id = stream_id
        self._offset, self._req_length = offset, req_length

    @property
    def stream_id(self):
        return self._stream_id

    @property
    def offset(self):
        return self._offset

    @property
    def req_length(self):
        return self._req_length
277
278
# Reply object for a "get data stream packet data" command.
#
# Holds a status (one of the `Status` values), the data bytes, and the
# two "has new metadata"/"has new data stream" flags.
class _LttngLiveViewerGetDataStreamPacketDataReply:
    # Possible values of the `status` property
    class Status:
        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(self, status, data, has_new_metadata, has_new_data_stream):
        self._status, self._data = status, data
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self):
        return self._status

    @property
    def data(self):
        return self._data

    @property
    def has_new_metadata(self):
        return self._has_new_metadata

    @property
    def has_new_data_stream(self):
        return self._has_new_data_stream
307
308
# "Get metadata stream data" viewer command (codec command type 6):
# targets the stream having the ID `stream_id`.
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    # ID of the targeted stream.
    @property
    def stream_id(self):
        return self._stream_id
317
318
# Reply object for a "get metadata stream data" command: a status (one
# of the `Status` values) and the metadata bytes.
class _LttngLiveViewerGetMetadataStreamDataContentReply:
    # Possible values of the `status` property
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status, data):
        self._status, self._data = status, data

    @property
    def status(self):
        return self._status

    @property
    def data(self):
        return self._data
336
337
# "Get new stream infos" viewer command (codec command type 7):
# targets the tracing session having the ID `tracing_session_id`.
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    # ID of the targeted tracing session.
    @property
    def tracing_session_id(self):
        return self._tracing_session_id
346
347
# Reply object for a "get new stream infos" command: a status (one of
# the `Status` values) and the new stream infos.
class _LttngLiveViewerGetNewStreamInfosReply:
    # Possible values of the `status` property
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(self, status, stream_infos):
        self._status, self._stream_infos = status, stream_infos

    @property
    def status(self):
        return self._status

    @property
    def stream_infos(self):
        return self._stream_infos
366
367
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    """"Create viewer session" viewer command (codec command type 8).

    It has nothing more than the base command's version.
    """
370
371
# Reply object for a "create viewer session" command: only a status
# (one of the `Status` values).
class _LttngLiveViewerCreateViewerSessionReply:
    # Possible values of the `status` property
    class Status:
        OK = 1
        ERROR = 2

    def __init__(self, status):
        self._status = status

    @property
    def status(self):
        return self._status
383
384
# "Detach from tracing session" viewer command (codec command type 9):
# targets the tracing session having the ID `tracing_session_id`.
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    # ID of the targeted tracing session.
    @property
    def tracing_session_id(self):
        return self._tracing_session_id
393
394
# Reply object for a "detach from tracing session" command: only a
# status (one of the `Status` values).
class _LttngLiveViewerDetachFromTracingSessionReply:
    # Possible values of the `status` property
    class Status:
        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status):
        self._status = status

    @property
    def status(self):
        return self._status
407
408
# An LTTng live protocol codec can convert bytes to command objects and
# reply objects to bytes.
class _LttngLiveViewerProtocolCodec:
    # Command header fields: payload size, command type, and version
    _COMMAND_HEADER_STRUCT_FMT = 'QII'
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)

    def __init__(self):
        pass

    # Unpacks the fields described by `fmt` from `data`, starting at
    # `offset`, forcing the network byte order.
    def _unpack(self, fmt, data, offset=0):
        return struct.unpack_from('!' + fmt, data, offset)

    # Unpacks the fields described by `fmt` from the part of `data`
    # which follows the command header.
    def _unpack_payload(self, fmt, data):
        header_size = _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
        return self._unpack(fmt, data, header_size)

    # Decodes the bytes `data` and returns the corresponding command
    # object, or `None` if `data` doesn't contain a whole command yet.
    #
    # Raises `UnexpectedInput` if the command type is unknown.
    def decode(self, data):
        available = len(data)

        if available < self._COMMAND_HEADER_SIZE_BYTES:
            # Not enough data to read the command header
            return None

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
                payload_size, cmd_type, version
            )
        )

        if available < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
            # Not enough data to read the whole command
            return None

        if cmd_type == 1:
            # The connection type of the payload is decoded, but unused
            viewer_session_id, major, minor, _conn_type = self._unpack_payload(
                'QIII', data
            )
            return _LttngLiveViewerConnectCommand(
                version, viewer_session_id, major, minor
            )

        if cmd_type == 2:
            return _LttngLiveViewerGetTracingSessionInfosCommand(version)

        if cmd_type == 3:
            tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
            return _LttngLiveViewerAttachToTracingSessionCommand(
                version, tracing_session_id, offset, seek_type
            )

        if cmd_type == 4:
            (stream_id,) = self._unpack_payload('Q', data)
            return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                version, stream_id
            )

        if cmd_type == 5:
            stream_id, offset, req_length = self._unpack_payload('QQI', data)
            return _LttngLiveViewerGetDataStreamPacketDataCommand(
                version, stream_id, offset, req_length
            )

        if cmd_type == 6:
            (stream_id,) = self._unpack_payload('Q', data)
            return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)

        if cmd_type == 7:
            (tracing_session_id,) = self._unpack_payload('Q', data)
            return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)

        if cmd_type == 8:
            return _LttngLiveViewerCreateViewerSessionCommand(version)

        if cmd_type == 9:
            (tracing_session_id,) = self._unpack_payload('Q', data)
            return _LttngLiveViewerDetachFromTracingSessionCommand(
                version, tracing_session_id
            )

        raise UnexpectedInput('Unknown command type {}'.format(cmd_type))

    # Packs `args` as described by `fmt`, forcing the network byte
    # order.
    def _pack(self, fmt, *args):
        return struct.pack('!' + fmt, *args)

    # Encodes `string` and pads the result on the right with zeros so
    # that it's at least `length` bytes long (a longer encoded string is
    # returned as is).
    def _encode_zero_padded_str(self, string, length):
        return string.encode().ljust(length, b'\x00')

    # Encodes the stream info `info`: fixed-size numeric fields followed
    # by the zero-padded path and channel name.
    def _encode_stream_info(self, info):
        return b''.join(
            [
                self._pack('QQI', info.id, info.trace_id, int(info.is_metadata)),
                self._encode_zero_padded_str(info.path, 4096),
                self._encode_zero_padded_str(info.channel_name, 255),
            ]
        )

    # Returns the flags integer to encode: bit 0 is set if there's new
    # metadata and bit 1 is set if there are new data streams.
    def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
        metadata_flag = 1 if has_new_metadata else 0
        data_streams_flag = 2 if has_new_data_streams else 0
        return metadata_flag | data_streams_flag

    # Encodes the reply object `reply` and returns the resulting bytes.
    #
    # Raises `ValueError` if the class of `reply` is unknown.
    def encode(self, reply):
        reply_cls = type(reply)

        if reply_cls is _LttngLiveViewerConnectReply:
            return self._pack(
                'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
            )

        if reply_cls is _LttngLiveViewerGetTracingSessionInfosReply:
            # Count of infos first, then each info
            parts = [self._pack('I', len(reply.tracing_session_infos))]

            for info in reply.tracing_session_infos:
                parts.append(
                    self._pack(
                        'QIII',
                        info.tracing_session_id,
                        info.live_timer_freq,
                        info.client_count,
                        info.stream_count,
                    )
                )
                parts.append(self._encode_zero_padded_str(info.hostname, 64))
                parts.append(self._encode_zero_padded_str(info.name, 255))

            return b''.join(parts)

        if (
            reply_cls is _LttngLiveViewerAttachToTracingSessionReply
            or reply_cls is _LttngLiveViewerGetNewStreamInfosReply
        ):
            # Both replies share their layout: status, count of stream
            # infos, then each stream info
            parts = [self._pack('II', reply.status, len(reply.stream_infos))]
            parts.extend(self._encode_stream_info(info) for info in reply.stream_infos)
            return b''.join(parts)

        if reply_cls is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            return self._pack(
                'QQQQQQQII',
                entry.offset_bytes,
                entry.total_size_bits,
                entry.content_size_bits,
                entry.timestamp_begin,
                entry.timestamp_end,
                entry.events_discarded,
                entry.stream_class_id,
                reply.status,
                flags,
            )

        if reply_cls is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            header = self._pack('III', reply.status, len(reply.data), flags)
            return header + reply.data

        if reply_cls is _LttngLiveViewerGetMetadataStreamDataContentReply:
            header = self._pack('QI', len(reply.data), reply.status)
            return header + reply.data

        if (
            reply_cls is _LttngLiveViewerCreateViewerSessionReply
            or reply_cls is _LttngLiveViewerDetachFromTracingSessionReply
        ):
            # Status-only replies
            return self._pack('I', reply.status)

        raise ValueError(
            'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
        )
575
576
# An entry within the index of an LTTng data stream.
#
# The sizes are stored in bits; the `*_bytes` properties offer the same
# sizes converted to whole bytes.
class _LttngDataStreamIndexEntry:
    def __init__(
        self,
        offset_bytes,
        total_size_bits,
        content_size_bits,
        timestamp_begin,
        timestamp_end,
        events_discarded,
        stream_class_id,
    ):
        self._offset_bytes = offset_bytes
        self._total_size_bits = total_size_bits
        self._content_size_bits = content_size_bits
        self._timestamp_begin, self._timestamp_end = timestamp_begin, timestamp_end
        self._events_discarded = events_discarded
        self._stream_class_id = stream_class_id

    @property
    def offset_bytes(self):
        return self._offset_bytes

    @property
    def total_size_bits(self):
        return self._total_size_bits

    # Total size, rounded down to a whole number of bytes.
    @property
    def total_size_bytes(self):
        return self._total_size_bits // 8

    @property
    def content_size_bits(self):
        return self._content_size_bits

    # Content size, rounded down to a whole number of bytes.
    @property
    def content_size_bytes(self):
        return self._content_size_bits // 8

    @property
    def timestamp_begin(self):
        return self._timestamp_begin

    @property
    def timestamp_end(self):
        return self._timestamp_end

    @property
    def events_discarded(self):
        return self._events_discarded

    @property
    def stream_class_id(self):
        return self._stream_class_id
632
633
# The index of an LTTng data stream, a sequence of index entries.
#
# The entries are read from the index file `path` on construction.
class _LttngDataStreamIndex(collections.abc.Sequence):
    def __init__(self, path):
        self._path = path
        self._build()
        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    # Reads the whole index file to fill the list of entries.
    def _build(self):
        self._entries = []
        assert os.path.isfile(self._path)

        # Both parts of the file are big-endian
        header_struct = struct.Struct('>IIII')
        entry_struct = struct.Struct('>QQQQQQQ')

        with open(self._path, 'rb') as index_file:
            # Read header first
            header_data = index_file.read(header_struct.size)
            assert len(header_data) == header_struct.size
            magic, _major, _minor, index_entry_length = header_struct.unpack(
                header_data
            )
            assert magic == 0xC1F1DCC1

            # Read index entries
            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, index_file.tell()
                    )
                )
                entry_data = index_file.read(entry_struct.size)

                if not entry_data:
                    # Done
                    break

                assert len(entry_data) == entry_struct.size

                # The unpacked fields are in the order of the entry's
                # constructor parameters
                fields = entry_struct.unpack(entry_data)
                self._entries.append(_LttngDataStreamIndexEntry(*fields))

                # Skip anything else before the next entry: the header
                # gives the actual length of an entry within the file
                index_file.seek(index_entry_length - entry_struct.size, os.SEEK_CUR)

    def __getitem__(self, index):
        return self._entries[index]

    def __len__(self):
        return len(self._entries)

    @property
    def path(self):
        return self._path
711
712
# An LTTng data stream.
#
# On construction, the channel name is extracted from the name of the
# data stream file, the index is built from
# `index/<data stream file name>.idx` (within the directory of the data
# stream file), and the data stream file is opened for reading.
class _LttngDataStream:
    def __init__(self, path):
        self._path = path
        trace_dir, filename = os.path.split(path)

        # The channel name is whatever precedes the last `_<digits>` of
        # the file name
        self._channel_name = re.match(r'(.*)_\d+', filename).group(1)

        self._index = _LttngDataStreamIndex(
            os.path.join(trace_dir, 'index', filename + '.idx')
        )
        assert os.path.isfile(path)

        # This file remains open: get_data() reads from it
        self._file = open(path, 'rb')
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    @property
    def path(self):
        return self._path

    @property
    def channel_name(self):
        return self._channel_name

    @property
    def index(self):
        return self._index

    # Returns at most `len_bytes` bytes of the data stream file,
    # starting at the offset `offset_bytes`.
    def get_data(self, offset_bytes, len_bytes):
        stream_file = self._file
        stream_file.seek(offset_bytes)
        return stream_file.read(len_bytes)
746
747
# An LTTng metadata stream.
#
# Only the path is kept: the file is read each time the `data` property
# is accessed.
class _LttngMetadataStream:
    def __init__(self, path):
        self._path = path
        logging.info('Built metadata stream: path="{}"'.format(path))

    @property
    def path(self):
        return self._path

    # Whole content of the metadata stream file, as bytes.
    @property
    def data(self):
        assert os.path.isfile(self._path)

        with open(self._path, 'rb') as metadata_file:
            content = metadata_file.read()

        return content
764
765
# An LTTng trace, a sequence of LTTng data streams.
#
# The trace also has a metadata stream, found as the `metadata` file of
# the trace directory.
class LttngTrace(collections.abc.Sequence):
    def __init__(self, trace_dir):
        assert os.path.isdir(trace_dir)
        self._path = trace_dir
        metadata_path = os.path.join(trace_dir, 'metadata')
        self._metadata_stream = _LttngMetadataStream(metadata_path)
        self._create_data_streams(trace_dir)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    # Creates one data stream per regular file of `trace_dir` which is
    # neither hidden nor the metadata stream file, in sorted path order.
    def _create_data_streams(self, trace_dir):
        candidates = (
            (filename, os.path.join(trace_dir, filename))
            for filename in os.listdir(trace_dir)
        )
        data_stream_paths = sorted(
            path
            for filename, path in candidates
            if os.path.isfile(path)
            and not filename.startswith('.')
            and filename != 'metadata'
        )
        self._data_streams = [_LttngDataStream(path) for path in data_stream_paths]

    @property
    def path(self):
        return self._path

    @property
    def metadata_stream(self):
        return self._metadata_stream

    def __getitem__(self, index):
        return self._data_streams[index]

    def __len__(self):
        return len(self._data_streams)
813
814
# The state of a single data stream.
#
# It mostly tracks which entry of the data stream's index is the
# current one.
class _LttngLiveViewerSessionDataStreamState:
    def __init__(self, ts_state, info, data_stream):
        self._ts_state = ts_state
        self._info = info
        self._data_stream = data_stream

        # Start with the first entry of the data stream's index
        self._cur_index_entry_index = 0

        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id, ts_info.tracing_session_id, ts_info.name, data_stream.path
            )
        )

    @property
    def tracing_session_state(self):
        return self._ts_state

    @property
    def info(self):
        return self._info

    @property
    def data_stream(self):
        return self._data_stream

    # Current index entry, or `None` once all the entries of the index
    # were visited.
    @property
    def cur_index_entry(self):
        index = self._data_stream.index

        if self._cur_index_entry_index != len(index):
            return index[self._cur_index_entry_index]

        return None

    # Makes the next entry of the index the current one.
    def goto_next_index_entry(self):
        self._cur_index_entry_index += 1
853
854
# The state of a single metadata stream.
#
# It tracks whether or not the metadata stream's content was already
# sent to the viewer (`is_sent`).
class _LttngLiveViewerSessionMetadataStreamState:
    def __init__(self, ts_state, info, metadata_stream):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream

        # Nothing is sent yet
        self._is_sent = False
        fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
        logging.info(
            fmt.format(
                info.id,
                ts_state.tracing_session_descriptor.info.tracing_session_id,
                ts_state.tracing_session_descriptor.info.name,
                metadata_stream.path,
            )
        )

    # Tracing session state given at construction time.
    #
    # Same name as the equivalent property of the data stream state.
    @property
    def tracing_session_state(self):
        return self._ts_state

    # Historical name of `tracing_session_state`.
    #
    # This used to read an attribute which is never set, therefore
    # always raising `AttributeError`.
    @property
    def trace_session_state(self):
        return self._ts_state

    @property
    def info(self):
        return self._info

    @property
    def metadata_stream(self):
        return self._metadata_stream

    # Whether or not the metadata stream's content was sent to the
    # viewer.
    @property
    def is_sent(self):
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value):
        self._is_sent = value
891
892
# The state of a tracing session.
#
# On construction, it creates one stream info and one stream state per
# stream (data and metadata) of the traces of the tracing session
# descriptor, assigning consecutive stream IDs starting at
# `base_stream_id`.
class _LttngLiveViewerSessionTracingSessionState:
    def __init__(self, tc_descr, base_stream_id):
        self._tc_descr = tc_descr
        self._stream_infos = []

        # Stream ID -> stream state
        self._ds_states = {}
        self._ms_states = {}

        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # Derived from the ID of the first stream of the trace
            trace_id = next_stream_id * 1000

            # Data streams -> stream infos and data stream states
            for data_stream in trace:
                ds_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(ds_info)
                ds_state = _LttngLiveViewerSessionDataStreamState(
                    self, ds_info, data_stream
                )
                self._ds_states[next_stream_id] = ds_state
                next_stream_id += 1

            # Metadata stream -> stream info and metadata stream state
            metadata_stream = trace.metadata_stream
            ms_info = _LttngLiveViewerStreamInfo(
                next_stream_id, trace_id, True, metadata_stream.path, 'metadata'
            )
            self._stream_infos.append(ms_info)
            ms_state = _LttngLiveViewerSessionMetadataStreamState(
                self, ms_info, metadata_stream
            )
            self._ms_states[next_stream_id] = ms_state
            next_stream_id += 1

        self._is_attached = False
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                tc_descr.info.tracing_session_id, tc_descr.info.name
            )
        )

    @property
    def tracing_session_descriptor(self):
        return self._tc_descr

    @property
    def data_stream_states(self):
        return self._ds_states

    @property
    def metadata_stream_states(self):
        return self._ms_states

    @property
    def stream_infos(self):
        return self._stream_infos

    # Whether or not at least one metadata stream wasn't sent yet.
    @property
    def has_new_metadata(self):
        return any(not ms_state.is_sent for ms_state in self._ms_states.values())

    # Whether or not a viewer is attached to this tracing session.
    @property
    def is_attached(self):
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value):
        self._is_attached = value
961
962
963 # An LTTng live viewer session manages a view on tracing sessions
964 # and replies to commands accordingly.
965 class _LttngLiveViewerSession:
966 def __init__(
967 self,
968 viewer_session_id,
969 tracing_session_descriptors,
970 max_query_data_response_size,
971 ):
972 self._viewer_session_id = viewer_session_id
973 self._ts_states = {}
974 self._stream_states = {}
975 self._max_query_data_response_size = max_query_data_response_size
976 total_stream_infos = 0
977
978 for ts_descr in tracing_session_descriptors:
979 ts_state = _LttngLiveViewerSessionTracingSessionState(
980 ts_descr, total_stream_infos
981 )
982 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
983 self._ts_states[ts_id] = ts_state
984 total_stream_infos += len(ts_state.stream_infos)
985
986 # Update session's stream states to have the new states
987 self._stream_states.update(ts_state.data_stream_states)
988 self._stream_states.update(ts_state.metadata_stream_states)
989
990 self._command_handlers = {
991 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
992 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
993 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
994 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
995 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
996 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
997 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
998 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
999 }
1000
1001 @property
1002 def viewer_session_id(self):
1003 return self._viewer_session_id
1004
1005 def _get_tracing_session_state(self, tracing_session_id):
1006 if tracing_session_id not in self._ts_states:
1007 raise UnexpectedInput(
1008 'Unknown tracing session ID {}'.format(tracing_session_id)
1009 )
1010
1011 return self._ts_states[tracing_session_id]
1012
1013 def _get_stream_state(self, stream_id):
1014 if stream_id not in self._stream_states:
1015 UnexpectedInput('Unknown stream ID {}'.format(stream_id))
1016
1017 return self._stream_states[stream_id]
1018
1019 def handle_command(self, cmd):
1020 logging.info(
1021 'Handling command in viewer session: cmd-cls-name={}'.format(
1022 cmd.__class__.__name__
1023 )
1024 )
1025 cmd_type = type(cmd)
1026
1027 if cmd_type not in self._command_handlers:
1028 raise UnexpectedInput(
1029 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
1030 )
1031
1032 return self._command_handlers[cmd_type](cmd)
1033
1034 def _handle_attach_to_tracing_session_command(self, cmd):
1035 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1036 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1037 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1038 info = ts_state.tracing_session_descriptor.info
1039
1040 if ts_state.is_attached:
1041 raise UnexpectedInput(
1042 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1043 info.name
1044 )
1045 )
1046
1047 ts_state.is_attached = True
1048 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1049 return _LttngLiveViewerAttachToTracingSessionReply(
1050 status, ts_state.stream_infos
1051 )
1052
1053 def _handle_detach_from_tracing_session_command(self, cmd):
1054 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1055 logging.info(fmt.format(cmd.tracing_session_id))
1056 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1057 info = ts_state.tracing_session_descriptor.info
1058
1059 if not ts_state.is_attached:
1060 raise UnexpectedInput(
1061 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1062 info.name
1063 )
1064 )
1065
1066 ts_state.is_attached = False
1067 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1068 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1069
1070 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1071 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1072 logging.info(fmt.format(cmd.stream_id))
1073 stream_state = self._get_stream_state(cmd.stream_id)
1074
1075 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1076 raise UnexpectedInput(
1077 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1078 )
1079
1080 if stream_state.cur_index_entry is None:
1081 # The viewer is done reading this stream
1082 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1083
1084 # Dummy data stream index entry to use with the `HUP` status
1085 # (the reply needs one, but the viewer ignores it)
1086 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1087
1088 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1089 status, index_entry, False, False
1090 )
1091
1092 # The viewer only checks the `has_new_metadata` flag if the
1093 # reply's status is `OK`, so we need to provide an index here
1094 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1095 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1096 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1097 status, stream_state.cur_index_entry, has_new_metadata, False
1098 )
1099 stream_state.goto_next_index_entry()
1100 return reply
1101
1102 def _handle_get_data_stream_packet_data_command(self, cmd):
1103 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1104 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1105 stream_state = self._get_stream_state(cmd.stream_id)
1106 data_response_length = cmd.req_length
1107
1108 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1109 raise UnexpectedInput(
1110 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1111 )
1112
1113 if stream_state.tracing_session_state.has_new_metadata:
1114 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1115 return _LttngLiveViewerGetDataStreamPacketDataReply(
1116 status, bytes(), True, False
1117 )
1118
1119 if self._max_query_data_response_size:
1120 # Enforce a server side limit on the query requested length.
1121 # To ensure that the transaction terminate take the minimum of both
1122 # value.
1123 data_response_length = min(
1124 cmd.req_length, self._max_query_data_response_size
1125 )
1126 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1127 logging.info(fmt.format(cmd.req_length, data_response_length))
1128
1129 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1130 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1131 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1132
def _handle_get_metadata_stream_data_command(self, cmd):
    # Handles a "get metadata stream data" command.
    #
    # The metadata stream's data is sent once: the first request gets an
    # `OK` reply with the whole content, and any later request gets an
    # empty `NO_NEW` reply.
    #
    # Raises `UnexpectedInput` if `cmd.stream_id` does not refer to a
    # metadata stream.
    logging.info(
        'Handling "get metadata stream data" command: stream-id={}'.format(
            cmd.stream_id
        )
    )
    metadata_stream_state = self._get_stream_state(cmd.stream_id)

    if type(metadata_stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
        raise UnexpectedInput(
            'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
        )

    reply_cls = _LttngLiveViewerGetMetadataStreamDataContentReply

    if not metadata_stream_state.is_sent:
        # First request: mark as sent, then reply with the content
        metadata_stream_state.is_sent = True
        return reply_cls(
            reply_cls.Status.OK, metadata_stream_state.metadata_stream.data
        )

    return reply_cls(reply_cls.Status.NO_NEW, bytes())
1152
def _handle_get_new_stream_infos_command(self, cmd):
    # Handles a "get new stream infos" command.
    #
    # Always replies with the `HUP` status and an empty list of stream
    # infos.
    logging.info(
        'Handling "get new stream infos" command: ts-id={}'.format(
            cmd.tracing_session_id
        )
    )

    # As of this version, the viewer receives all the tracing session's
    # stream infos with the "attach to tracing session" reply, so there
    # is never anything new to report here. `HUP` is the appropriate
    # status because a viewer which sends this command already consumed
    # all the existing data streams.
    reply_cls = _LttngLiveViewerGetNewStreamInfosReply
    return reply_cls(reply_cls.Status.HUP, [])
1164
def _handle_get_tracing_session_infos_command(self, cmd):
    # Handles a "get tracing session infos" command.
    #
    # Replies with the info objects of all the known tracing sessions,
    # sorted by tracing session name.
    logging.info('Handling "get tracing session infos" command.')
    sorted_infos = sorted(
        (
            ts_state.tracing_session_descriptor.info
            for ts_state in self._ts_states.values()
        ),
        key=lambda ts_info: ts_info.name,
    )
    return _LttngLiveViewerGetTracingSessionInfosReply(sorted_infos)
1172
def _handle_create_viewer_session_command(self, cmd):
    # Handles a "create viewer session" command.
    #
    # There is nothing to do on this side (the LTTng relay daemon
    # allocates the viewer session's state at this point), so this only
    # replies with the `OK` status.
    logging.info('Handling "create viewer session" command.')
    reply_cls = _LttngLiveViewerCreateViewerSessionReply
    return reply_cls(reply_cls.Status.OK)
1180
1181
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` with an OS-assigned TCP port and
# starts listening. It then writes the decimal TCP port number to a
# temporary port file created in the directory of `port_filename`, and
# renames the temporary port file to `port_filename`. Therefore, when
# `port_filename` exists, the server is ready to accept a connection.
#
# `tracing_session_descriptors` is a list of tracing session descriptors
# (`LttngTracingSessionDescriptor`) to serve.
#
# `max_query_data_response_size` is given as is to the viewer session
# (`None` when there's no such limit).
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns.
#
# The constructor raises `UnexpectedInput` when the viewer doesn't
# behave as expected (malformed command, first command is not
# "connect", connection closed in the middle of a command).
class LttngLiveServer:
    def __init__(
        self, port_filename, tracing_session_descriptors, max_query_data_response_size
    ):
        logging.info('Server configuration:')

        logging.info(' Port file name: `{}`'.format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                ' Maximum response data query size: `{}`'.format(
                    max_query_data_response_size
                )
            )

        for ts_descr in tracing_session_descriptors:
            info = ts_descr.info
            fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info(' Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = tracing_session_descriptors
        self._max_query_data_response_size = max_query_data_response_size
        self._codec = _LttngLiveViewerProtocolCodec()

        # Create the socket last so that everything which follows is
        # covered by the `finally` clause below: the socket is closed
        # whatever fails (binding, writing the port file, serving).
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            # Port 0: OS assigns an unused port
            serv_addr = ('localhost', 0)
            self._sock.bind(serv_addr)

            # Listen _before_ publishing the port file: a viewer which
            # connects as soon as the port file exists must not be
            # refused.
            self._listen()
            self._write_port_to_file(port_filename)
            self._serve_viewer()
        finally:
            self._sock.close()
            logging.info('Closed connection and socket.')

    # TCP port to which the server socket is bound.
    @property
    def _server_port(self):
        return self._sock.getsockname()[1]

    # Receives and returns the next viewer command.
    #
    # Returns `None` if the viewer closed the connection before sending
    # the first byte of a command.
    #
    # Raises `UnexpectedInput` if the viewer closed the connection in
    # the middle of a command or if the codec cannot decode the
    # received bytes.
    def _recv_command(self):
        data = bytes()

        while True:
            logging.info('Waiting for viewer command.')
            buf = self._conn.recv(128)

            if not buf:
                logging.info('Client closed connection.')

                if data:
                    raise UnexpectedInput(
                        'Client closed connection after having sent {} command bytes.'.format(
                            len(data)
                        )
                    )

                return

            logging.info('Received data from viewer: length={}'.format(len(buf)))

            data += buf

            try:
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc

            # The codec returns `None` as long as `data` doesn't hold a
            # complete command: keep receiving in this case.
            if cmd is not None:
                logging.info(
                    'Received command from viewer: cmd-cls-name={}'.format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    # Encodes the reply `reply` and sends it to the viewer.
    def _send_reply(self, reply):
        data = self._codec.encode(reply)
        logging.info(
            'Sending reply to viewer: reply-cls-name={}, length={}'.format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    # Runs the whole conversation with the connected viewer: expects a
    # "connect" command, and then makes a viewer session handle all the
    # following commands until the viewer closes the connection.
    #
    # Raises `UnexpectedInput` if the first command is not "connect".
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise UnexpectedInput(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    # Makes the (already bound) server socket listen for connections.
    def _listen(self):
        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)
        logging.info('Listening: port={}'.format(self._server_port))

    # Accepts a single viewer, runs the conversation with it, and closes
    # the connection whatever happens.
    def _serve_viewer(self):
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Writes the server's TCP port (decimal, no newline) to the file
    # named `port_filename`.
    #
    # The port is first written to a temporary file which is then
    # renamed, so that a process waiting for `port_filename` never reads
    # a partially written file.
    def _write_port_to_file(self, port_filename):
        # Create the temporary file in the same directory as the final
        # file: os.replace() can fail when the source and the
        # destination are on different file systems, which can be the
        # case with the default temporary directory.
        port_dir = os.path.dirname(os.path.abspath(port_filename))

        # Write the port number to a temporary file.
        with tempfile.NamedTemporaryFile(
            mode='w', dir=port_dir, delete=False
        ) as tmp_port_file:
            print(self._server_port, end='', file=tmp_port_file)

        # Rename temporary file to real file
        os.replace(tmp_port_file.name, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1354
1355
# Describes one tracing session to serve.
#
# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
# objects). The path of each trace must contain the tracing session's
# name (`name`): the constructor raises `ValueError` otherwise.
class LttngTracingSessionDescriptor:
    def __init__(
        self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
    ):
        for lttng_trace in traces:
            if name in lttng_trace.path:
                continue

            raise ValueError(
                'Tracing session name must be part of every trace path (`{}` not found in `{}`)'.format(
                    name, lttng_trace.path
                )
            )

        self._traces = traces

        # The advertised stream count is, for each trace, its length
        # plus one (presumably the trace's data streams and its metadata
        # stream; to be confirmed against `LttngTrace`).
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            sum(len(lttng_trace) + 1 for lttng_trace in traces),
            hostname,
            name,
        )

    # List of LTTng traces, as given to the constructor.
    @property
    def traces(self):
        return self._traces

    # Tracing session info object built by the constructor.
    @property
    def info(self):
        return self._info
1387
1388
1389 def _tracing_session_descriptors_from_arg(string):
1390 # Format is:
1391 # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
1392 parts = string.split(',')
1393 name = parts[0]
1394 tracing_session_id = int(parts[1])
1395 hostname = parts[2]
1396 live_timer_freq = int(parts[3])
1397 client_count = int(parts[4])
1398 traces = [LttngTrace(path) for path in parts[5:]]
1399 return LttngTracingSessionDescriptor(
1400 name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1401 )
1402
1403
1404 def _loglevel_parser(string):
1405 loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
1406 if string not in loglevels:
1407 msg = "{} is not a valid loglevel".format(string)
1408 raise argparse.ArgumentTypeError(msg)
1409 return loglevels[string]
1410
1411
# Script entry point: parses the command line, then runs a single LTTng
# live server which serves the sessions given as arguments.
if __name__ == '__main__':
    logging.basicConfig(format='# %(asctime)-25s%(message)s')

    # The command line is parsed in two passes which share one parser.
    #
    # Automatic help is disabled here so that the first pass doesn't
    # handle `--help` before all the arguments are registered; the help
    # option is added explicitly before the second pass.
    parser = argparse.ArgumentParser(
        description='LTTng-live protocol mocker', add_help=False
    )
    parser.add_argument(
        '--log-level',
        default='warning',
        choices=['info', 'warning'],
        help='The loglevel to be used.',
    )

    # First pass: only extract the log level, and apply it immediately
    # (presumably so that anything logged while converting the remaining
    # arguments already honours it).
    loglevel_namespace, remaining_args = parser.parse_known_args()
    logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))

    parser.add_argument(
        '--port-filename',
        help='The final port file. This file is present when the server is ready to receive connection.',
        required=True,
    )
    parser.add_argument(
        '--max-query-data-response-size',
        type=int,
        help='The maximum size of control data response in bytes',
    )
    parser.add_argument(
        'sessions',
        nargs="+",
        metavar="SESSION",
        type=_tracing_session_descriptors_from_arg,
        help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
    )
    parser.add_argument(
        '-h',
        '--help',
        action='help',
        default=argparse.SUPPRESS,
        help='Show this help message and exit.',
    )

    # Second pass: parse what the first pass didn't consume
    args = parser.parse_args(args=remaining_args)

    # The server's constructor returns when the viewer closes the
    # connection. A misbehaving viewer is reported through the logger
    # and the standard error, with exit status 1.
    try:
        LttngLiveServer(
            args.port_filename, args.sessions, args.max_query_data_response_size
        )
    except UnexpectedInput as exc:
        logging.error(str(exc))
        print(exc, file=sys.stderr)
        sys.exit(1)
This page took 0.087648 seconds and 5 git commands to generate.