Tests: whitespace fix in src.ctf.lttng-live/test_live
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
1 # SPDX-License-Identifier: MIT
2 #
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
4 #
5
6 import argparse
7 import collections.abc
8 import logging
9 import os
10 import os.path
11 import re
12 import socket
13 import struct
14 import sys
15 import tempfile
16
17
# Error raised when the server receives input it does not expect (for
# example an unknown command type or an unknown tracing session ID).
class UnexpectedInput(RuntimeError):
    ...
20
21
# Base of all decoded viewer commands: only keeps the `version` value
# given at construction time.
class _LttngLiveViewerCommand:
    def __init__(self, version):
        # Stored as is; exposed read-only through `version`
        self._version = version

    # Version value given to the constructor.
    @property
    def version(self):
        return self._version
29
30
# "Connect" command: carries a viewer session ID and a major/minor
# number pair in addition to the common command version.
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    def __init__(self, version, viewer_session_id, major, minor):
        super().__init__(version)
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    # Viewer session ID given to the constructor.
    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    # Major number given to the constructor.
    @property
    def major(self):
        return self._major

    # Minor number given to the constructor.
    @property
    def minor(self):
        return self._minor
49
50
# Reply to a "connect" command: a viewer session ID and a major/minor
# number pair.
class _LttngLiveViewerConnectReply:
    def __init__(self, viewer_session_id, major, minor):
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    # Viewer session ID given to the constructor.
    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    # Major number given to the constructor.
    @property
    def major(self):
        return self._major

    # Minor number given to the constructor.
    @property
    def minor(self):
        return self._minor
68
69
# "Get tracing session infos" command: has no field other than the
# common command version.
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    ...
72
73
# Read-only description of one tracing session, as listed in a "get
# tracing session infos" reply.
class _LttngLiveViewerTracingSessionInfo:
    def __init__(
        self,
        tracing_session_id,
        live_timer_freq,
        client_count,
        stream_count,
        hostname,
        name,
    ):
        # Identification
        self._tracing_session_id = tracing_session_id
        self._hostname = hostname
        self._name = name

        # Numeric properties
        self._live_timer_freq = live_timer_freq
        self._client_count = client_count
        self._stream_count = stream_count

    # Tracing session ID given to the constructor.
    @property
    def tracing_session_id(self):
        return self._tracing_session_id

    # Live timer frequency given to the constructor.
    @property
    def live_timer_freq(self):
        return self._live_timer_freq

    # Client count given to the constructor.
    @property
    def client_count(self):
        return self._client_count

    # Stream count given to the constructor.
    @property
    def stream_count(self):
        return self._stream_count

    # Hostname given to the constructor.
    @property
    def hostname(self):
        return self._hostname

    # Tracing session name given to the constructor.
    @property
    def name(self):
        return self._name
114
115
# Reply to a "get tracing session infos" command: wraps the tracing
# session info objects to send.
class _LttngLiveViewerGetTracingSessionInfosReply:
    def __init__(self, tracing_session_infos):
        # Kept by reference (not copied)
        self._tracing_session_infos = tracing_session_infos

    # Tracing session infos given to the constructor.
    @property
    def tracing_session_infos(self):
        return self._tracing_session_infos
123
124
# "Attach to tracing session" command: targets a tracing session ID
# with an offset and a seek type.
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    # Known seek type values
    class SeekType:
        BEGINNING = 1
        LAST = 2

    def __init__(self, version, tracing_session_id, offset, seek_type):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id
        self._offset, self._seek_type = offset, seek_type

    # Tracing session ID given to the constructor.
    @property
    def tracing_session_id(self):
        return self._tracing_session_id

    # Offset given to the constructor.
    @property
    def offset(self):
        return self._offset

    # Seek type given to the constructor.
    @property
    def seek_type(self):
        return self._seek_type
147
148
# Read-only description of one stream (data or metadata) as sent to
# the viewer.
class _LttngLiveViewerStreamInfo:
    def __init__(self, id, trace_id, is_metadata, path, channel_name):
        # IDs
        self._id, self._trace_id = id, trace_id

        # Whether or not this describes a metadata stream
        self._is_metadata = is_metadata

        # Location and channel
        self._path, self._channel_name = path, channel_name

    # Stream ID given to the constructor.
    @property
    def id(self):
        return self._id

    # Trace ID given to the constructor.
    @property
    def trace_id(self):
        return self._trace_id

    # Metadata flag given to the constructor.
    @property
    def is_metadata(self):
        return self._is_metadata

    # Path given to the constructor.
    @property
    def path(self):
        return self._path

    # Channel name given to the constructor.
    @property
    def channel_name(self):
        return self._channel_name
176
177
# Reply to an "attach to tracing session" command: a status and the
# stream infos to send.
class _LttngLiveViewerAttachToTracingSessionReply:
    # Known status values
    class Status:
        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(self, status, stream_infos):
        self._status, self._stream_infos = status, stream_infos

    # Status given to the constructor.
    @property
    def status(self):
        return self._status

    # Stream infos given to the constructor.
    @property
    def stream_infos(self):
        return self._stream_infos
198
199
# "Get next data stream index entry" command: targets a single stream
# ID.
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    def __init__(self, version, stream_id):
        super().__init__(version)

        # Stored as is; exposed read-only through `stream_id`
        self._stream_id = stream_id

    # Stream ID given to the constructor.
    @property
    def stream_id(self):
        return self._stream_id
208
209
# Reply to a "get next data stream index entry" command: a status, an
# index entry, and two "has new stuff" flags.
class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
    # Known status values
    class Status:
        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
        self._status, self._index_entry = status, index_entry
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    # Status given to the constructor.
    @property
    def status(self):
        return self._status

    # Index entry given to the constructor.
    @property
    def index_entry(self):
        return self._index_entry

    # "Has new metadata" flag given to the constructor.
    @property
    def has_new_metadata(self):
        return self._has_new_metadata

    # "Has new data stream" flag given to the constructor.
    @property
    def has_new_data_stream(self):
        return self._has_new_data_stream
240
241
# "Get data stream packet data" command: requests `req_length` bytes
# at `offset` within the stream having the ID `stream_id`.
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version, stream_id, offset, req_length):
        super().__init__(version)
        self._stream_id = stream_id
        self._offset, self._req_length = offset, req_length

    # Stream ID given to the constructor.
    @property
    def stream_id(self):
        return self._stream_id

    # Offset given to the constructor.
    @property
    def offset(self):
        return self._offset

    # Requested length given to the constructor.
    @property
    def req_length(self):
        return self._req_length
260
261
# Reply to a "get data stream packet data" command: a status, the data
# bytes, and two "has new stuff" flags.
class _LttngLiveViewerGetDataStreamPacketDataReply:
    # Known status values
    class Status:
        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(self, status, data, has_new_metadata, has_new_data_stream):
        self._status, self._data = status, data
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    # Status given to the constructor.
    @property
    def status(self):
        return self._status

    # Data given to the constructor.
    @property
    def data(self):
        return self._data

    # "Has new metadata" flag given to the constructor.
    @property
    def has_new_metadata(self):
        return self._has_new_metadata

    # "Has new data stream" flag given to the constructor.
    @property
    def has_new_data_stream(self):
        return self._has_new_data_stream
290
291
# "Get metadata stream data" command: targets a single stream ID.
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version, stream_id):
        super().__init__(version)

        # Stored as is; exposed read-only through `stream_id`
        self._stream_id = stream_id

    # Stream ID given to the constructor.
    @property
    def stream_id(self):
        return self._stream_id
300
301
# Reply to a "get metadata stream data" command: a status and the
# metadata bytes.
class _LttngLiveViewerGetMetadataStreamDataContentReply:
    # Known status values
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status, data):
        self._status, self._data = status, data

    # Status given to the constructor.
    @property
    def status(self):
        return self._status

    # Data given to the constructor.
    @property
    def data(self):
        return self._data
319
320
# "Get new stream infos" command: targets a single tracing session ID.
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    def __init__(self, version, tracing_session_id):
        super().__init__(version)

        # Stored as is; exposed read-only through `tracing_session_id`
        self._tracing_session_id = tracing_session_id

    # Tracing session ID given to the constructor.
    @property
    def tracing_session_id(self):
        return self._tracing_session_id
329
330
# Reply to a "get new stream infos" command: a status and the stream
# infos to send.
class _LttngLiveViewerGetNewStreamInfosReply:
    # Known status values
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(self, status, stream_infos):
        self._status, self._stream_infos = status, stream_infos

    # Status given to the constructor.
    @property
    def status(self):
        return self._status

    # Stream infos given to the constructor.
    @property
    def stream_infos(self):
        return self._stream_infos
349
350
# "Create viewer session" command: has no field other than the common
# command version.
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    ...
353
354
# Reply to a "create viewer session" command: only a status.
class _LttngLiveViewerCreateViewerSessionReply:
    # Known status values
    class Status:
        OK = 1
        ERROR = 2

    def __init__(self, status):
        # Stored as is; exposed read-only through `status`
        self._status = status

    # Status given to the constructor.
    @property
    def status(self):
        return self._status
366
367
# "Detach from tracing session" command: targets a single tracing
# session ID.
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    def __init__(self, version, tracing_session_id):
        super().__init__(version)

        # Stored as is; exposed read-only through `tracing_session_id`
        self._tracing_session_id = tracing_session_id

    # Tracing session ID given to the constructor.
    @property
    def tracing_session_id(self):
        return self._tracing_session_id
376
377
# Reply to a "detach from tracing session" command: only a status.
class _LttngLiveViewerDetachFromTracingSessionReply:
    # Known status values
    class Status:
        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status):
        # Stored as is; exposed read-only through `status`
        self._status = status

    # Status given to the constructor.
    @property
    def status(self):
        return self._status
390
391
# An LTTng live protocol codec can convert bytes to command objects and
# reply objects to bytes.
#
# All the integer fields are packed and unpacked in network byte order.
class _LttngLiveViewerProtocolCodec:
    # Command header fields: payload size, command type, and version
    _COMMAND_HEADER_STRUCT_FMT = 'QII'
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)

    def __init__(self):
        pass

    # Unpacks the fields described by `fmt` from `data`, starting at
    # `offset`, and returns them as a tuple.
    def _unpack(self, fmt, data, offset=0):
        # Force network byte order
        return struct.unpack_from('!' + fmt, data, offset)

    # Like _unpack(), but starts right after the command header.
    def _unpack_payload(self, fmt, data):
        header_size = _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
        return self._unpack(fmt, data, header_size)

    # Decodes `data` as a single command and returns the corresponding
    # command object, or `None` if `data` doesn't contain a complete
    # command yet.
    #
    # Raises `UnexpectedInput` if the command type is unknown.
    def decode(self, data):
        header_size = self._COMMAND_HEADER_SIZE_BYTES

        if len(data) < header_size:
            # Not enough data to read the command header
            return None

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
                payload_size, cmd_type, version
            )
        )

        if len(data) < header_size + payload_size:
            # Not enough data to read the whole command
            return None

        if cmd_type == 1:
            # The fourth payload field is decoded, but not used
            viewer_session_id, major, minor, _ = self._unpack_payload('QIII', data)
            return _LttngLiveViewerConnectCommand(
                version, viewer_session_id, major, minor
            )

        if cmd_type == 2:
            return _LttngLiveViewerGetTracingSessionInfosCommand(version)

        if cmd_type == 3:
            tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
            return _LttngLiveViewerAttachToTracingSessionCommand(
                version, tracing_session_id, offset, seek_type
            )

        if cmd_type == 4:
            (stream_id,) = self._unpack_payload('Q', data)
            return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                version, stream_id
            )

        if cmd_type == 5:
            stream_id, offset, req_length = self._unpack_payload('QQI', data)
            return _LttngLiveViewerGetDataStreamPacketDataCommand(
                version, stream_id, offset, req_length
            )

        if cmd_type == 6:
            (stream_id,) = self._unpack_payload('Q', data)
            return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)

        if cmd_type == 7:
            (tracing_session_id,) = self._unpack_payload('Q', data)
            return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)

        if cmd_type == 8:
            return _LttngLiveViewerCreateViewerSessionCommand(version)

        if cmd_type == 9:
            (tracing_session_id,) = self._unpack_payload('Q', data)
            return _LttngLiveViewerDetachFromTracingSessionCommand(
                version, tracing_session_id
            )

        raise UnexpectedInput('Unknown command type {}'.format(cmd_type))

    # Packs `args` as described by `fmt` and returns the resulting
    # bytes.
    def _pack(self, fmt, *args):
        # Force network byte order
        return struct.pack('!' + fmt, *args)

    # Encodes `string` and right-pads the result with null bytes up to
    # `length` bytes (a longer encoded string is returned as is).
    def _encode_zero_padded_str(self, string, length):
        return string.encode().ljust(length, b'\x00')

    # Encodes a single stream info: fixed-size integer fields followed
    # by the zero-padded path and channel name.
    def _encode_stream_info(self, info):
        fixed_part = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
        path_part = self._encode_zero_padded_str(info.path, 4096)
        channel_name_part = self._encode_zero_padded_str(info.channel_name, 255)
        return fixed_part + path_part + channel_name_part

    # Returns the "has new stuff" bit field: bit 0 for new metadata and
    # bit 1 for new data streams.
    def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
        metadata_bit = 1 if has_new_metadata else 0
        data_streams_bit = 2 if has_new_data_streams else 0
        return metadata_bit | data_streams_bit

    # Encodes a reply made of a status, a stream info count, and the
    # stream infos themselves.
    def _encode_status_and_stream_infos(self, reply):
        data = self._pack('II', reply.status, len(reply.stream_infos))

        for stream_info in reply.stream_infos:
            data += self._encode_stream_info(stream_info)

        return data

    # Encodes the reply object `reply` and returns the resulting bytes.
    #
    # Raises `ValueError` if the exact type of `reply` is not a known
    # reply class.
    def encode(self, reply):
        reply_type = type(reply)

        if reply_type is _LttngLiveViewerConnectReply:
            # The last field is always 2
            return self._pack(
                'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
            )

        if reply_type is _LttngLiveViewerGetTracingSessionInfosReply:
            data = self._pack('I', len(reply.tracing_session_infos))

            for ts_info in reply.tracing_session_infos:
                data += self._pack(
                    'QIII',
                    ts_info.tracing_session_id,
                    ts_info.live_timer_freq,
                    ts_info.client_count,
                    ts_info.stream_count,
                )
                data += self._encode_zero_padded_str(ts_info.hostname, 64)
                data += self._encode_zero_padded_str(ts_info.name, 255)

            return data

        if reply_type is _LttngLiveViewerAttachToTracingSessionReply:
            return self._encode_status_and_stream_infos(reply)

        if reply_type is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            return self._pack(
                'QQQQQQQII',
                entry.offset_bytes,
                entry.total_size_bits,
                entry.content_size_bits,
                entry.timestamp_begin,
                entry.timestamp_end,
                entry.events_discarded,
                entry.stream_class_id,
                reply.status,
                flags,
            )

        if reply_type is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            header = self._pack('III', reply.status, len(reply.data), flags)
            return header + reply.data

        if reply_type is _LttngLiveViewerGetMetadataStreamDataContentReply:
            header = self._pack('QI', len(reply.data), reply.status)
            return header + reply.data

        if reply_type is _LttngLiveViewerGetNewStreamInfosReply:
            return self._encode_status_and_stream_infos(reply)

        if reply_type is _LttngLiveViewerCreateViewerSessionReply:
            return self._pack('I', reply.status)

        if reply_type is _LttngLiveViewerDetachFromTracingSessionReply:
            return self._pack('I', reply.status)

        raise ValueError(
            'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
        )
558
559
# An entry within the index of an LTTng data stream.
#
# Sizes are kept in bits; the `*_bytes` properties are derived from
# them with a floor division by 8.
class _LttngDataStreamIndexEntry:
    def __init__(
        self,
        offset_bytes,
        total_size_bits,
        content_size_bits,
        timestamp_begin,
        timestamp_end,
        events_discarded,
        stream_class_id,
    ):
        # Location and sizes
        self._offset_bytes = offset_bytes
        self._total_size_bits = total_size_bits
        self._content_size_bits = content_size_bits

        # Timestamps
        self._timestamp_begin, self._timestamp_end = timestamp_begin, timestamp_end

        # Other fields
        self._events_discarded = events_discarded
        self._stream_class_id = stream_class_id

    # Offset (bytes) given to the constructor.
    @property
    def offset_bytes(self):
        return self._offset_bytes

    # Total size (bits) given to the constructor.
    @property
    def total_size_bits(self):
        return self._total_size_bits

    # Total size converted to whole bytes.
    @property
    def total_size_bytes(self):
        return self.total_size_bits // 8

    # Content size (bits) given to the constructor.
    @property
    def content_size_bits(self):
        return self._content_size_bits

    # Content size converted to whole bytes.
    @property
    def content_size_bytes(self):
        return self.content_size_bits // 8

    # Beginning timestamp given to the constructor.
    @property
    def timestamp_begin(self):
        return self._timestamp_begin

    # End timestamp given to the constructor.
    @property
    def timestamp_end(self):
        return self._timestamp_end

    # Discarded event count given to the constructor.
    @property
    def events_discarded(self):
        return self._events_discarded

    # Stream class ID given to the constructor.
    @property
    def stream_class_id(self):
        return self._stream_class_id
615
616
# The index of an LTTng data stream, a sequence of index entries.
#
# The entries are read once, on construction, from the index file
# `path`.
class _LttngDataStreamIndex(collections.abc.Sequence):
    def __init__(self, path):
        self._path = path
        self._build()
        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    # Reads the whole index file and fills `self._entries`.
    def _build(self):
        self._entries = []
        assert os.path.isfile(self._path)

        # File header: magic, major, minor, and length of a single
        # index entry (big-endian)
        header_fmt = '>IIII'
        header_size = struct.calcsize(header_fmt)

        # Fields of an index entry which this class decodes
        # (big-endian)
        entry_fmt = '>QQQQQQQ'
        entry_size = struct.calcsize(entry_fmt)

        with open(self._path, 'rb') as index_file:
            # Read header first
            header = index_file.read(header_size)
            assert len(header) == header_size
            magic, _major, _minor, entry_length = struct.unpack(header_fmt, header)
            assert magic == 0xC1F1DCC1

            # Size of what follows the decoded fields within an entry
            trailing_size = entry_length - entry_size

            # Read index entries
            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, index_file.tell()
                    )
                )
                raw_entry = index_file.read(entry_size)

                if len(raw_entry) == 0:
                    # Done
                    break

                assert len(raw_entry) == entry_size

                # The unpacked fields are in the order of the entry
                # constructor's parameters
                entry_fields = struct.unpack(entry_fmt, raw_entry)
                self._entries.append(_LttngDataStreamIndexEntry(*entry_fields))

                # Skip anything else before the next entry
                index_file.seek(trailing_size, os.SEEK_CUR)

    def __getitem__(self, index):
        return self._entries[index]

    def __len__(self):
        return len(self._entries)

    # Path of the index file given to the constructor.
    @property
    def path(self):
        return self._path
694
695
# An LTTng data stream.
#
# On construction, this loads the stream's index from
# `index/<file name>.idx` (relative to the stream file's directory) and
# opens the stream file itself for reading.
class _LttngDataStream:
    def __init__(self, path):
        self._path = path
        trace_dir, filename = os.path.split(path)

        # The channel name is everything before the last `_<digits>`
        # within the file name
        self._channel_name = re.match(r'(.*)_\d+', filename).group(1)

        index_path = os.path.join(trace_dir, 'index', filename + '.idx')
        self._index = _LttngDataStreamIndex(index_path)
        assert os.path.isfile(path)

        # Kept open for get_data()
        self._file = open(path, 'rb')
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    # Path of the data stream file given to the constructor.
    @property
    def path(self):
        return self._path

    # Channel name extracted from the file name.
    @property
    def channel_name(self):
        return self._channel_name

    # Index of this data stream.
    @property
    def index(self):
        return self._index

    # Returns up to `len_bytes` bytes of the stream file, starting at
    # the offset `offset_bytes`.
    def get_data(self, offset_bytes, len_bytes):
        stream_file = self._file
        stream_file.seek(offset_bytes)
        return stream_file.read(len_bytes)
729
730
# An LTTng metadata stream.
#
# The file is not read on construction: each access to `data` reads it
# again.
class _LttngMetadataStream:
    def __init__(self, path):
        self._path = path
        logging.info('Built metadata stream: path="{}"'.format(path))

    # Path of the metadata stream file given to the constructor.
    @property
    def path(self):
        return self._path

    # Whole contents (bytes) of the metadata stream file.
    @property
    def data(self):
        metadata_path = self._path
        assert os.path.isfile(metadata_path)

        with open(metadata_path, 'rb') as metadata_file:
            contents = metadata_file.read()

        return contents
747
748
# An LTTng trace, a sequence of LTTng data streams.
#
# The data streams are the regular files found directly within the
# trace directory, except hidden files and the `metadata` file, sorted
# by path.
class LttngTrace(collections.abc.Sequence):
    def __init__(self, trace_dir):
        assert os.path.isdir(trace_dir)
        self._path = trace_dir
        metadata_path = os.path.join(trace_dir, 'metadata')
        self._metadata_stream = _LttngMetadataStream(metadata_path)
        self._create_data_streams(trace_dir)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    # Finds the data stream files of `trace_dir` and creates one data
    # stream object for each of them.
    def _create_data_streams(self, trace_dir):
        def is_data_stream_file(filename):
            if not os.path.isfile(os.path.join(trace_dir, filename)):
                return False

            return not filename.startswith('.') and filename != 'metadata'

        data_stream_paths = sorted(
            os.path.join(trace_dir, filename)
            for filename in os.listdir(trace_dir)
            if is_data_stream_file(filename)
        )
        self._data_streams = [_LttngDataStream(path) for path in data_stream_paths]

    # Path of the trace directory given to the constructor.
    @property
    def path(self):
        return self._path

    # Metadata stream of this trace.
    @property
    def metadata_stream(self):
        return self._metadata_stream

    def __getitem__(self, index):
        return self._data_streams[index]

    def __len__(self):
        return len(self._data_streams)
796
797
# The state of a single data stream.
#
# Keeps track of the current position within the data stream's index.
class _LttngLiveViewerSessionDataStreamState:
    def __init__(self, ts_state, info, data_stream):
        self._ts_state = ts_state
        self._info = info
        self._data_stream = data_stream

        # Start at the first index entry
        self._cur_index_entry_index = 0

        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id,
                ts_info.tracing_session_id,
                ts_info.name,
                data_stream.path,
            )
        )

    # Tracing session state given to the constructor.
    @property
    def tracing_session_state(self):
        return self._ts_state

    # Stream info given to the constructor.
    @property
    def info(self):
        return self._info

    # Data stream given to the constructor.
    @property
    def data_stream(self):
        return self._data_stream

    # Current index entry, or `None` if the position is right after the
    # last entry.
    @property
    def cur_index_entry(self):
        index = self._data_stream.index
        position = self._cur_index_entry_index

        if position != len(index):
            return index[position]

        return None

    # Moves the current position to the next index entry.
    def goto_next_index_entry(self):
        self._cur_index_entry_index = self._cur_index_entry_index + 1
835 self._cur_index_entry_index += 1
836
837
# The state of a single metadata stream.
#
# Keeps track of whether or not the metadata stream's data was sent
# (`is_sent`).
class _LttngLiveViewerSessionMetadataStreamState:
    def __init__(self, ts_state, info, metadata_stream):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream

        # Nothing is sent initially
        self._is_sent = False
        fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
        logging.info(
            fmt.format(
                info.id,
                ts_state.tracing_session_descriptor.info.tracing_session_id,
                ts_state.tracing_session_descriptor.info.name,
                metadata_stream.path,
            )
        )

    # Tracing session state given to the constructor.
    #
    # This used to return the `_trace_session_state` attribute, which
    # is never set, making any access raise `AttributeError`.
    @property
    def trace_session_state(self):
        return self._ts_state

    # Same as `trace_session_state`, under the name which the data
    # stream state class uses.
    @property
    def tracing_session_state(self):
        return self._ts_state

    # Stream info given to the constructor.
    @property
    def info(self):
        return self._info

    # Metadata stream given to the constructor.
    @property
    def metadata_stream(self):
        return self._metadata_stream

    # Whether or not the metadata stream's data is marked as sent.
    @property
    def is_sent(self):
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value):
        self._is_sent = value
874
875
# The state of a tracing session.
#
# On construction, this creates one stream info and one stream state
# for each data stream and metadata stream of the descriptor's traces.
# Stream IDs are assigned sequentially, starting at `base_stream_id`.
class _LttngLiveViewerSessionTracingSessionState:
    def __init__(self, tc_descr, base_stream_id):
        self._tc_descr = tc_descr
        self._stream_infos = []
        self._ds_states = {}
        self._ms_states = {}
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # Derived from the ID of the first stream of this trace
            trace_id = next_stream_id * 1000

            # Data streams -> stream infos and data stream states
            for data_stream in trace:
                ds_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(ds_info)
                ds_state = _LttngLiveViewerSessionDataStreamState(
                    self, ds_info, data_stream
                )
                self._ds_states[next_stream_id] = ds_state
                next_stream_id += 1

            # Metadata stream -> stream info and metadata stream state
            metadata_stream = trace.metadata_stream
            ms_info = _LttngLiveViewerStreamInfo(
                next_stream_id, trace_id, True, metadata_stream.path, 'metadata'
            )
            self._stream_infos.append(ms_info)
            ms_state = _LttngLiveViewerSessionMetadataStreamState(
                self, ms_info, metadata_stream
            )
            self._ms_states[next_stream_id] = ms_state
            next_stream_id += 1

        self._is_attached = False
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                tc_descr.info.tracing_session_id, tc_descr.info.name
            )
        )

    # Tracing session descriptor given to the constructor.
    @property
    def tracing_session_descriptor(self):
        return self._tc_descr

    # Data stream states (stream ID -> state).
    @property
    def data_stream_states(self):
        return self._ds_states

    # Metadata stream states (stream ID -> state).
    @property
    def metadata_stream_states(self):
        return self._ms_states

    # All the stream infos (data and metadata), in creation order.
    @property
    def stream_infos(self):
        return self._stream_infos

    # Whether or not at least one metadata stream is not sent yet.
    @property
    def has_new_metadata(self):
        return not all(ms_state.is_sent for ms_state in self._ms_states.values())

    # Whether or not the viewer is attached to this tracing session.
    @property
    def is_attached(self):
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value):
        self._is_attached = value
944
945
946 # An LTTng live viewer session manages a view on tracing sessions
947 # and replies to commands accordingly.
948 class _LttngLiveViewerSession:
949 def __init__(
950 self,
951 viewer_session_id,
952 tracing_session_descriptors,
953 max_query_data_response_size,
954 ):
955 self._viewer_session_id = viewer_session_id
956 self._ts_states = {}
957 self._stream_states = {}
958 self._max_query_data_response_size = max_query_data_response_size
959 total_stream_infos = 0
960
961 for ts_descr in tracing_session_descriptors:
962 ts_state = _LttngLiveViewerSessionTracingSessionState(
963 ts_descr, total_stream_infos
964 )
965 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
966 self._ts_states[ts_id] = ts_state
967 total_stream_infos += len(ts_state.stream_infos)
968
969 # Update session's stream states to have the new states
970 self._stream_states.update(ts_state.data_stream_states)
971 self._stream_states.update(ts_state.metadata_stream_states)
972
973 self._command_handlers = {
974 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
975 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
976 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
977 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
978 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
979 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
980 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
981 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
982 }
983
984 @property
985 def viewer_session_id(self):
986 return self._viewer_session_id
987
988 def _get_tracing_session_state(self, tracing_session_id):
989 if tracing_session_id not in self._ts_states:
990 raise UnexpectedInput(
991 'Unknown tracing session ID {}'.format(tracing_session_id)
992 )
993
994 return self._ts_states[tracing_session_id]
995
996 def _get_stream_state(self, stream_id):
997 if stream_id not in self._stream_states:
998 UnexpectedInput('Unknown stream ID {}'.format(stream_id))
999
1000 return self._stream_states[stream_id]
1001
1002 def handle_command(self, cmd):
1003 logging.info(
1004 'Handling command in viewer session: cmd-cls-name={}'.format(
1005 cmd.__class__.__name__
1006 )
1007 )
1008 cmd_type = type(cmd)
1009
1010 if cmd_type not in self._command_handlers:
1011 raise UnexpectedInput(
1012 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
1013 )
1014
1015 return self._command_handlers[cmd_type](cmd)
1016
1017 def _handle_attach_to_tracing_session_command(self, cmd):
1018 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1019 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1020 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1021 info = ts_state.tracing_session_descriptor.info
1022
1023 if ts_state.is_attached:
1024 raise UnexpectedInput(
1025 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1026 info.name
1027 )
1028 )
1029
1030 ts_state.is_attached = True
1031 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1032 return _LttngLiveViewerAttachToTracingSessionReply(
1033 status, ts_state.stream_infos
1034 )
1035
1036 def _handle_detach_from_tracing_session_command(self, cmd):
1037 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1038 logging.info(fmt.format(cmd.tracing_session_id))
1039 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1040 info = ts_state.tracing_session_descriptor.info
1041
1042 if not ts_state.is_attached:
1043 raise UnexpectedInput(
1044 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1045 info.name
1046 )
1047 )
1048
1049 ts_state.is_attached = False
1050 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1051 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1052
1053 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1054 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1055 logging.info(fmt.format(cmd.stream_id))
1056 stream_state = self._get_stream_state(cmd.stream_id)
1057
1058 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1059 raise UnexpectedInput(
1060 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1061 )
1062
1063 if stream_state.cur_index_entry is None:
1064 # The viewer is done reading this stream
1065 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1066
1067 # Dummy data stream index entry to use with the `HUP` status
1068 # (the reply needs one, but the viewer ignores it)
1069 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1070
1071 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1072 status, index_entry, False, False
1073 )
1074
1075 # The viewer only checks the `has_new_metadata` flag if the
1076 # reply's status is `OK`, so we need to provide an index here
1077 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1078 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1079 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1080 status, stream_state.cur_index_entry, has_new_metadata, False
1081 )
1082 stream_state.goto_next_index_entry()
1083 return reply
1084
1085 def _handle_get_data_stream_packet_data_command(self, cmd):
1086 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1087 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1088 stream_state = self._get_stream_state(cmd.stream_id)
1089 data_response_length = cmd.req_length
1090
1091 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1092 raise UnexpectedInput(
1093 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1094 )
1095
1096 if stream_state.tracing_session_state.has_new_metadata:
1097 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1098 return _LttngLiveViewerGetDataStreamPacketDataReply(
1099 status, bytes(), True, False
1100 )
1101
1102 if self._max_query_data_response_size:
1103 # Enforce a server side limit on the query requested length.
1104 # To ensure that the transaction terminate take the minimum of both
1105 # value.
1106 data_response_length = min(
1107 cmd.req_length, self._max_query_data_response_size
1108 )
1109 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1110 logging.info(fmt.format(cmd.req_length, data_response_length))
1111
1112 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1113 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1114 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1115
1116 def _handle_get_metadata_stream_data_command(self, cmd):
1117 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1118 logging.info(fmt.format(cmd.stream_id))
1119 stream_state = self._get_stream_state(cmd.stream_id)
1120
1121 if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
1122 raise UnexpectedInput(
1123 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
1124 )
1125
1126 if stream_state.is_sent:
1127 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1128 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1129
1130 stream_state.is_sent = True
1131 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
1132 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1133 status, stream_state.metadata_stream.data
1134 )
1135
def _handle_get_new_stream_infos_command(self, cmd):
    # Always replies with the `HUP` status and an empty stream info
    # list.
    logging.info(
        'Handling "get new stream infos" command: ts-id={}'.format(
            cmd.tracing_session_id
        )
    )

    # As of this version, all the tracing session's stream infos are
    # always given to the viewer when sending the "attach to tracing
    # session" reply, so there's nothing new here. Return the `HUP`
    # status as, if we're handling this command, the viewer consumed
    # all the existing data streams.
    reply_cls = _LttngLiveViewerGetNewStreamInfosReply
    return reply_cls(reply_cls.Status.HUP, [])
1147
def _handle_get_tracing_session_infos_command(self, cmd):
    # Replies with the infos of all the known tracing sessions, sorted
    # by tracing session name.
    logging.info('Handling "get tracing session infos" command.')
    sorted_infos = sorted(
        (
            ts_state.tracing_session_descriptor.info
            for ts_state in self._ts_states.values()
        ),
        key=lambda ts_info: ts_info.name,
    )
    return _LttngLiveViewerGetTracingSessionInfosReply(sorted_infos)
1155
def _handle_create_viewer_session_command(self, cmd):
    # Unconditionally replies with the `OK` status.
    logging.info('Handling "create viewer session" command.')

    # This does nothing here. In the LTTng relay daemon, it
    # allocates the viewer session's state.
    reply_cls = _LttngLiveViewerCreateViewerSessionReply
    return reply_cls(reply_cls.Status.OK)
1163
1164
1165 # An LTTng live TCP server.
1166 #
1167 # On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1168 # the decimal TCP port number to a temporary port file. It renames the
1169 # temporary port file to `port_filename`.
1170 #
1171 # `tracing_session_descriptors` is a list of tracing session descriptors
1172 # (`LttngTracingSessionDescriptor`) to serve.
1173 #
1174 # This server accepts a single viewer (client).
1175 #
1176 # When the viewer closes the connection, the server's constructor
1177 # returns.
class LttngLiveServer:
    # Logs the configuration, binds to `localhost` with an OS-assigned
    # port, publishes the port number through `port_filename`, then
    # serves a single viewer until it disconnects.
    #
    # `max_query_data_response_size`, when not `None`, is handed to the
    # viewer session created for the viewer.
    #
    # The listening socket is always closed before this returns or
    # raises.
    def __init__(
        self, port_filename, tracing_session_descriptors, max_query_data_response_size
    ):
        logging.info('Server configuration:')

        logging.info(' Port file name: `{}`'.format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                ' Maximum response data query size: `{}`'.format(
                    max_query_data_response_size
                )
            )

        for ts_descr in tracing_session_descriptors:
            info = ts_descr.info
            fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info(' Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = tracing_session_descriptors
        self._max_query_data_response_size = max_query_data_response_size

        # Create the codec before the socket so that a failure here
        # cannot leave an open socket behind.
        self._codec = _LttngLiveViewerProtocolCodec()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Everything which can fail once the socket exists is within the
        # `try` block so that the socket never leaks.
        try:
            # Port 0: OS assigns an unused port
            serv_addr = ('localhost', 0)
            self._sock.bind(serv_addr)
            self._write_port_to_file(port_filename)
            self._listen()
        finally:
            self._sock.close()
            logging.info('Closed connection and socket.')

    # TCP port to which the server socket is bound.
    @property
    def _server_port(self):
        return self._sock.getsockname()[1]

    # Receives and returns the next viewer command.
    #
    # Returns `None` if the viewer closed the connection before sending
    # any byte of a new command.
    #
    # Raises `UnexpectedInput` if the viewer closed the connection in
    # the middle of a command, or if the codec cannot unpack the
    # received bytes.
    def _recv_command(self):
        data = bytes()

        while True:
            logging.info('Waiting for viewer command.')
            buf = self._conn.recv(128)

            if not buf:
                logging.info('Client closed connection.')

                if data:
                    raise UnexpectedInput(
                        'Client closed connection after having sent {} command bytes.'.format(
                            len(data)
                        )
                    )

                return None

            logging.info('Received data from viewer: length={}'.format(len(buf)))

            data += buf

            try:
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc

            # A `None` result is treated as "no complete command yet":
            # keep accumulating bytes.
            if cmd is not None:
                logging.info(
                    'Received command from viewer: cmd-cls-name={}'.format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    # Encodes `reply` and sends all of its bytes to the viewer.
    def _send_reply(self, reply):
        data = self._codec.encode(reply)
        logging.info(
            'Sending reply to viewer: reply-cls-name={}, length={}'.format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    # Runs the whole conversation with the connected viewer.
    #
    # Raises `UnexpectedInput` if the first command isn't "connect"
    # (this includes the viewer disconnecting without sending anything).
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise UnexpectedInput(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    # Accepts a single viewer connection, handles it, and closes it.
    def _listen(self):
        logging.info('Listening: port={}'.format(self._server_port))
        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Writes the decimal server port to a temporary file, then renames
    # this temporary file to `port_filename`, so that `port_filename`
    # never exists with partial content.
    def _write_port_to_file(self, port_filename):
        # Create the temporary file in the directory of the final file:
        # os.replace() may fail when the source and the destination are
        # on different file systems, which can be the case with the
        # default temporary directory.
        port_dir = os.path.dirname(os.path.abspath(port_filename))

        # Write the port number to a temporary file.
        with tempfile.NamedTemporaryFile(
            mode='w', delete=False, dir=port_dir
        ) as tmp_port_file:
            print(self._server_port, end='', file=tmp_port_file)

        # Rename temporary file to real file
        os.replace(tmp_port_file.name, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1337
1338
1339 # A tracing session descriptor.
1340 #
1341 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1342 # objects).
class LttngTracingSessionDescriptor:
    # Builds the descriptor and its tracing session info.
    #
    # Raises `ValueError` if `name` isn't found in the path of one of
    # the traces of `traces`.
    def __init__(
        self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
    ):
        # Find the first trace path (if any) which doesn't contain the
        # tracing session name.
        offending_path = next((t.path for t in traces if name not in t.path), None)

        if offending_path is not None:
            raise ValueError(
                'Tracing session name must be part of every trace path (`{}` not found in `{}`)'.format(
                    name, offending_path
                )
            )

        self._traces = traces

        # Each trace contributes its length plus one to the stream count
        # (presumably its data streams plus one metadata stream; confirm
        # against `LttngTrace`).
        total_stream_count = sum(1 + len(trace) for trace in traces)
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            total_stream_count,
            hostname,
            name,
        )

    # Traces of this tracing session, as given to the constructor.
    @property
    def traces(self):
        return self._traces

    # Tracing session info built by the constructor.
    @property
    def info(self):
        return self._info
1370
1371
1372 def _tracing_session_descriptors_from_arg(string):
1373 # Format is:
1374 # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
1375 parts = string.split(',')
1376 name = parts[0]
1377 tracing_session_id = int(parts[1])
1378 hostname = parts[2]
1379 live_timer_freq = int(parts[3])
1380 client_count = int(parts[4])
1381 traces = [LttngTrace(path) for path in parts[5:]]
1382 return LttngTracingSessionDescriptor(
1383 name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1384 )
1385
1386
1387 def _loglevel_parser(string):
1388 loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
1389 if string not in loglevels:
1390 msg = "{} is not a valid loglevel".format(string)
1391 raise argparse.ArgumentTypeError(msg)
1392 return loglevels[string]
1393
1394
if __name__ == '__main__':
    logging.basicConfig(format='# %(asctime)-25s%(message)s')

    # The automatic help option is disabled here and a `-h`/`--help`
    # option is added by hand below (presumably so that the first
    # parsing pass doesn't print an incomplete help message).
    arg_parser = argparse.ArgumentParser(
        add_help=False, description='LTTng-live protocol mocker'
    )
    arg_parser.add_argument(
        '--log-level',
        choices=['info', 'warning'],
        default='warning',
        help='The loglevel to be used.',
    )

    # First pass: only extract the log level and apply it to the root
    # logger before anything else gets parsed.
    early_namespace, leftover_argv = arg_parser.parse_known_args()
    root_logger = logging.getLogger()
    root_logger.setLevel(_loglevel_parser(early_namespace.log_level))

    arg_parser.add_argument(
        '--port-filename',
        required=True,
        help='The final port file. This file is present when the server is ready to receive connection.',
    )
    arg_parser.add_argument(
        '--max-query-data-response-size',
        help='The maximum size of control data response in bytes',
        type=int,
    )
    arg_parser.add_argument(
        'sessions',
        metavar="SESSION",
        nargs="+",
        type=_tracing_session_descriptors_from_arg,
        help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
    )
    arg_parser.add_argument(
        '-h',
        '--help',
        action='help',
        default=argparse.SUPPRESS,
        help='Show this help message and exit.',
    )

    # Second pass: parse what the first pass didn't consume.
    config = arg_parser.parse_args(args=leftover_argv)

    # The server's constructor returns once the viewer is done; an
    # unexpected viewer input is logged, printed to the standard error,
    # and makes the process exit with status 1.
    try:
        LttngLiveServer(
            config.port_filename,
            config.sessions,
            config.max_query_data_response_size,
        )
    except UnexpectedInput as exc:
        logging.error(str(exc))
        print(exc, file=sys.stderr)
        sys.exit(1)
This page took 0.069746 seconds and 5 git commands to generate.