black: run `black` version 19.10b0 on entire project
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
584af91e
PP
1# The MIT License (MIT)
2#
3# Copyright (c) 2019 Philippe Proulx <pproulx@efficios.com>
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy
6# of this software and associated documentation files (the "Software"), to deal
7# in the Software without restriction, including without limitation the rights
8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9# copies of the Software, and to permit persons to whom the Software is
10# furnished to do so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in
13# all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21# THE SOFTWARE.
22
23import argparse
24import collections.abc
25import logging
26import os
27import os.path
28import re
29import socket
30import struct
31import sys
32import tempfile
33
34
35class UnexpectedInput(RuntimeError):
36 pass
37
38
39class _LttngLiveViewerCommand:
40 def __init__(self, version):
41 self._version = version
42
43 @property
44 def version(self):
45 return self._version
46
47
48class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
49 def __init__(self, version, viewer_session_id, major, minor):
50 super().__init__(version)
51 self._viewer_session_id = viewer_session_id
52 self._major = major
53 self._minor = minor
54
55 @property
56 def viewer_session_id(self):
57 return self._viewer_session_id
58
59 @property
60 def major(self):
61 return self._major
62
63 @property
64 def minor(self):
65 return self._minor
66
67
68class _LttngLiveViewerConnectReply:
69 def __init__(self, viewer_session_id, major, minor):
70 self._viewer_session_id = viewer_session_id
71 self._major = major
72 self._minor = minor
73
74 @property
75 def viewer_session_id(self):
76 return self._viewer_session_id
77
78 @property
79 def major(self):
80 return self._major
81
82 @property
83 def minor(self):
84 return self._minor
85
86
87class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
88 pass
89
90
91class _LttngLiveViewerTracingSessionInfo:
92 def __init__(
93 self,
94 tracing_session_id,
95 live_timer_freq,
96 client_count,
97 stream_count,
98 hostname,
99 name,
100 ):
101 self._tracing_session_id = tracing_session_id
102 self._live_timer_freq = live_timer_freq
103 self._client_count = client_count
104 self._stream_count = stream_count
105 self._hostname = hostname
106 self._name = name
107
108 @property
109 def tracing_session_id(self):
110 return self._tracing_session_id
111
112 @property
113 def live_timer_freq(self):
114 return self._live_timer_freq
115
116 @property
117 def client_count(self):
118 return self._client_count
119
120 @property
121 def stream_count(self):
122 return self._stream_count
123
124 @property
125 def hostname(self):
126 return self._hostname
127
128 @property
129 def name(self):
130 return self._name
131
132
133class _LttngLiveViewerGetTracingSessionInfosReply:
134 def __init__(self, tracing_session_infos):
135 self._tracing_session_infos = tracing_session_infos
136
137 @property
138 def tracing_session_infos(self):
139 return self._tracing_session_infos
140
141
142class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
143 class SeekType:
144 BEGINNING = 1
145 LAST = 2
146
147 def __init__(self, version, tracing_session_id, offset, seek_type):
148 super().__init__(version)
149 self._tracing_session_id = tracing_session_id
150 self._offset = offset
151 self._seek_type = seek_type
152
153 @property
154 def tracing_session_id(self):
155 return self._tracing_session_id
156
157 @property
158 def offset(self):
159 return self._offset
160
161 @property
162 def seek_type(self):
163 return self._seek_type
164
165
166class _LttngLiveViewerStreamInfo:
167 def __init__(self, id, trace_id, is_metadata, path, channel_name):
168 self._id = id
169 self._trace_id = trace_id
170 self._is_metadata = is_metadata
171 self._path = path
172 self._channel_name = channel_name
173
174 @property
175 def id(self):
176 return self._id
177
178 @property
179 def trace_id(self):
180 return self._trace_id
181
182 @property
183 def is_metadata(self):
184 return self._is_metadata
185
186 @property
187 def path(self):
188 return self._path
189
190 @property
191 def channel_name(self):
192 return self._channel_name
193
194
195class _LttngLiveViewerAttachToTracingSessionReply:
196 class Status:
197 OK = 1
198 ALREADY = 2
199 UNKNOWN = 3
200 NOT_LIVE = 4
201 SEEK_ERROR = 5
202 NO_SESSION = 6
203
204 def __init__(self, status, stream_infos):
205 self._status = status
206 self._stream_infos = stream_infos
207
208 @property
209 def status(self):
210 return self._status
211
212 @property
213 def stream_infos(self):
214 return self._stream_infos
215
216
217class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
218 def __init__(self, version, stream_id):
219 super().__init__(version)
220 self._stream_id = stream_id
221
222 @property
223 def stream_id(self):
224 return self._stream_id
225
226
227class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
228 class Status:
229 OK = 1
230 RETRY = 2
231 HUP = 3
232 ERROR = 4
233 INACTIVE = 5
234 EOF = 6
235
236 def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
237 self._status = status
238 self._index_entry = index_entry
239 self._has_new_metadata = has_new_metadata
240 self._has_new_data_stream = has_new_data_stream
241
242 @property
243 def status(self):
244 return self._status
245
246 @property
247 def index_entry(self):
248 return self._index_entry
249
250 @property
251 def has_new_metadata(self):
252 return self._has_new_metadata
253
254 @property
255 def has_new_data_stream(self):
256 return self._has_new_data_stream
257
258
259class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
260 def __init__(self, version, stream_id, offset, req_length):
261 super().__init__(version)
262 self._stream_id = stream_id
263 self._offset = offset
264 self._req_length = req_length
265
266 @property
267 def stream_id(self):
268 return self._stream_id
269
270 @property
271 def offset(self):
272 return self._offset
273
274 @property
275 def req_length(self):
276 return self._req_length
277
278
279class _LttngLiveViewerGetDataStreamPacketDataReply:
280 class Status:
281 OK = 1
282 RETRY = 2
283 ERROR = 3
284 EOF = 4
285
286 def __init__(self, status, data, has_new_metadata, has_new_data_stream):
287 self._status = status
288 self._data = data
289 self._has_new_metadata = has_new_metadata
290 self._has_new_data_stream = has_new_data_stream
291
292 @property
293 def status(self):
294 return self._status
295
296 @property
297 def data(self):
298 return self._data
299
300 @property
301 def has_new_metadata(self):
302 return self._has_new_metadata
303
304 @property
305 def has_new_data_stream(self):
306 return self._has_new_data_stream
307
308
309class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
310 def __init__(self, version, stream_id):
311 super().__init__(version)
312 self._stream_id = stream_id
313
314 @property
315 def stream_id(self):
316 return self._stream_id
317
318
319class _LttngLiveViewerGetMetadataStreamDataContentReply:
320 class Status:
321 OK = 1
322 NO_NEW = 2
323 ERROR = 3
324
325 def __init__(self, status, data):
326 self._status = status
327 self._data = data
328
329 @property
330 def status(self):
331 return self._status
332
333 @property
334 def data(self):
335 return self._data
336
337
338class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
339 def __init__(self, version, tracing_session_id):
340 super().__init__(version)
341 self._tracing_session_id = tracing_session_id
342
343 @property
344 def tracing_session_id(self):
345 return self._tracing_session_id
346
347
348class _LttngLiveViewerGetNewStreamInfosReply:
349 class Status:
350 OK = 1
351 NO_NEW = 2
352 ERROR = 3
353 HUP = 4
354
355 def __init__(self, status, stream_infos):
356 self._status = status
357 self._stream_infos = stream_infos
358
359 @property
360 def status(self):
361 return self._status
362
363 @property
364 def stream_infos(self):
365 return self._stream_infos
366
367
368class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
369 pass
370
371
372class _LttngLiveViewerCreateViewerSessionReply:
373 class Status:
374 OK = 1
375 ERROR = 2
376
377 def __init__(self, status):
378 self._status = status
379
380 @property
381 def status(self):
382 return self._status
383
384
385class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
386 def __init__(self, version, tracing_session_id):
387 super().__init__(version)
388 self._tracing_session_id = tracing_session_id
389
390 @property
391 def tracing_session_id(self):
392 return self._tracing_session_id
393
394
395class _LttngLiveViewerDetachFromTracingSessionReply:
396 class Status:
397 OK = 1
398 UNKNOWN = 2
399 ERROR = 3
400
401 def __init__(self, status):
402 self._status = status
403
404 @property
405 def status(self):
406 return self._status
407
408
409# An LTTng live protocol codec can convert bytes to command objects and
410# reply objects to bytes.
411class _LttngLiveViewerProtocolCodec:
412 _COMMAND_HEADER_STRUCT_FMT = 'QII'
413 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
414
415 def __init__(self):
416 pass
417
418 def _unpack(self, fmt, data, offset=0):
419 fmt = '!' + fmt
420 return struct.unpack_from(fmt, data, offset)
421
422 def _unpack_payload(self, fmt, data):
423 return self._unpack(
424 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
425 )
426
427 def decode(self, data):
428 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
429 # Not enough data to read the command header
430 return
431
432 payload_size, cmd_type, version = self._unpack(
433 self._COMMAND_HEADER_STRUCT_FMT, data
434 )
435 logging.info(
436 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
437 payload_size, cmd_type, version
438 )
439 )
440
441 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
442 # Not enough data to read the whole command
443 return
444
445 if cmd_type == 1:
446 viewer_session_id, major, minor, conn_type = self._unpack_payload(
447 'QIII', data
448 )
449 return _LttngLiveViewerConnectCommand(
450 version, viewer_session_id, major, minor
451 )
452 elif cmd_type == 2:
453 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
454 elif cmd_type == 3:
455 tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
456 return _LttngLiveViewerAttachToTracingSessionCommand(
457 version, tracing_session_id, offset, seek_type
458 )
459 elif cmd_type == 4:
75882e97 460 (stream_id,) = self._unpack_payload('Q', data)
584af91e
PP
461 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
462 version, stream_id
463 )
464 elif cmd_type == 5:
465 stream_id, offset, req_length = self._unpack_payload('QQI', data)
466 return _LttngLiveViewerGetDataStreamPacketDataCommand(
467 version, stream_id, offset, req_length
468 )
469 elif cmd_type == 6:
75882e97 470 (stream_id,) = self._unpack_payload('Q', data)
584af91e
PP
471 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
472 elif cmd_type == 7:
75882e97 473 (tracing_session_id,) = self._unpack_payload('Q', data)
584af91e
PP
474 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
475 elif cmd_type == 8:
476 return _LttngLiveViewerCreateViewerSessionCommand(version)
477 elif cmd_type == 9:
75882e97 478 (tracing_session_id,) = self._unpack_payload('Q', data)
584af91e
PP
479 return _LttngLiveViewerDetachFromTracingSessionCommand(
480 version, tracing_session_id
481 )
482 else:
483 raise UnexpectedInput('Unknown command type {}'.format(cmd_type))
484
485 def _pack(self, fmt, *args):
486 # Force network byte order
487 return struct.pack('!' + fmt, *args)
488
489 def _encode_zero_padded_str(self, string, length):
490 data = string.encode()
491 return data.ljust(length, b'\x00')
492
493 def _encode_stream_info(self, info):
494 data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
495 data += self._encode_zero_padded_str(info.path, 4096)
496 data += self._encode_zero_padded_str(info.channel_name, 255)
497 return data
498
499 def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
500 flags = 0
501
502 if has_new_metadata:
503 flags |= 1
504
505 if has_new_data_streams:
506 flags |= 2
507
508 return flags
509
510 def encode(self, reply):
511 if type(reply) is _LttngLiveViewerConnectReply:
512 data = self._pack(
513 'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
514 )
515 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
516 data = self._pack('I', len(reply.tracing_session_infos))
517
518 for info in reply.tracing_session_infos:
519 data += self._pack(
520 'QIII',
521 info.tracing_session_id,
522 info.live_timer_freq,
523 info.client_count,
524 info.stream_count,
525 )
526 data += self._encode_zero_padded_str(info.hostname, 64)
527 data += self._encode_zero_padded_str(info.name, 255)
528 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
529 data = self._pack('II', reply.status, len(reply.stream_infos))
530
531 for info in reply.stream_infos:
532 data += self._encode_stream_info(info)
533 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
534 entry = reply.index_entry
535 flags = self._get_has_new_stuff_flags(
536 reply.has_new_metadata, reply.has_new_data_stream
537 )
538
539 data = self._pack(
540 'QQQQQQQII',
541 entry.offset_bytes,
542 entry.total_size_bits,
543 entry.content_size_bits,
544 entry.timestamp_begin,
545 entry.timestamp_end,
546 entry.events_discarded,
547 entry.stream_class_id,
548 reply.status,
549 flags,
550 )
551 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
552 flags = self._get_has_new_stuff_flags(
553 reply.has_new_metadata, reply.has_new_data_stream
554 )
555 data = self._pack('III', reply.status, len(reply.data), flags)
556 data += reply.data
557 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
558 data = self._pack('QI', len(reply.data), reply.status)
559 data += reply.data
560 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
561 data = self._pack('II', reply.status, len(reply.stream_infos))
562
563 for info in reply.stream_infos:
564 data += self._encode_stream_info(info)
565 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
566 data = self._pack('I', reply.status)
567 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
568 data = self._pack('I', reply.status)
569 else:
570 raise ValueError(
571 'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
572 )
573
574 return data
575
576
577# An entry within the index of an LTTng data stream.
578class _LttngDataStreamIndexEntry:
579 def __init__(
580 self,
581 offset_bytes,
582 total_size_bits,
583 content_size_bits,
584 timestamp_begin,
585 timestamp_end,
586 events_discarded,
587 stream_class_id,
588 ):
589 self._offset_bytes = offset_bytes
590 self._total_size_bits = total_size_bits
591 self._content_size_bits = content_size_bits
592 self._timestamp_begin = timestamp_begin
593 self._timestamp_end = timestamp_end
594 self._events_discarded = events_discarded
595 self._stream_class_id = stream_class_id
596
597 @property
598 def offset_bytes(self):
599 return self._offset_bytes
600
601 @property
602 def total_size_bits(self):
603 return self._total_size_bits
604
605 @property
606 def total_size_bytes(self):
607 return self._total_size_bits // 8
608
609 @property
610 def content_size_bits(self):
611 return self._content_size_bits
612
613 @property
614 def content_size_bytes(self):
615 return self._content_size_bits // 8
616
617 @property
618 def timestamp_begin(self):
619 return self._timestamp_begin
620
621 @property
622 def timestamp_end(self):
623 return self._timestamp_end
624
625 @property
626 def events_discarded(self):
627 return self._events_discarded
628
629 @property
630 def stream_class_id(self):
631 return self._stream_class_id
632
633
634# The index of an LTTng data stream, a sequence of index entries.
635class _LttngDataStreamIndex(collections.abc.Sequence):
636 def __init__(self, path):
637 self._path = path
638 self._build()
639 logging.info(
640 'Built data stream index entries: path="{}", count={}'.format(
641 path, len(self._entries)
642 )
643 )
644
645 def _build(self):
646 self._entries = []
647 assert os.path.isfile(self._path)
648
649 with open(self._path, 'rb') as f:
650 # Read header first
651 fmt = '>IIII'
652 size = struct.calcsize(fmt)
653 data = f.read(size)
654 assert len(data) == size
655 magic, index_major, index_minor, index_entry_length = struct.unpack(
656 fmt, data
657 )
658 assert magic == 0xC1F1DCC1
659
660 # Read index entries
661 fmt = '>QQQQQQQ'
662 size = struct.calcsize(fmt)
663
664 while True:
665 logging.debug(
666 'Decoding data stream index entry: path="{}", offset={}'.format(
667 self._path, f.tell()
668 )
669 )
670 data = f.read(size)
671
672 if not data:
673 # Done
674 break
675
676 assert len(data) == size
75882e97
FD
677 (
678 offset_bytes,
679 total_size_bits,
680 content_size_bits,
681 timestamp_begin,
682 timestamp_end,
683 events_discarded,
684 stream_class_id,
685 ) = struct.unpack(fmt, data)
584af91e
PP
686
687 self._entries.append(
688 _LttngDataStreamIndexEntry(
689 offset_bytes,
690 total_size_bits,
691 content_size_bits,
692 timestamp_begin,
693 timestamp_end,
694 events_discarded,
695 stream_class_id,
696 )
697 )
698
699 # Skip anything else before the next entry
700 f.seek(index_entry_length - size, os.SEEK_CUR)
701
702 def __getitem__(self, index):
703 return self._entries[index]
704
705 def __len__(self):
706 return len(self._entries)
707
708 @property
709 def path(self):
710 return self._path
711
712
713# An LTTng data stream.
714class _LttngDataStream:
715 def __init__(self, path):
716 self._path = path
717 filename = os.path.basename(path)
718 match = re.match(r'(.*)_\d+', filename)
719 self._channel_name = match.group(1)
720 trace_dir = os.path.dirname(path)
721 index_path = os.path.join(trace_dir, 'index', filename + '.idx')
722 self._index = _LttngDataStreamIndex(index_path)
723 assert os.path.isfile(path)
724 self._file = open(path, 'rb')
725 logging.info(
726 'Built data stream: path="{}", channel-name="{}"'.format(
727 path, self._channel_name
728 )
729 )
730
731 @property
732 def path(self):
733 return self._path
734
735 @property
736 def channel_name(self):
737 return self._channel_name
738
739 @property
740 def index(self):
741 return self._index
742
743 def get_data(self, offset_bytes, len_bytes):
744 self._file.seek(offset_bytes)
745 return self._file.read(len_bytes)
746
747
748# An LTTng metadata stream.
749class _LttngMetadataStream:
750 def __init__(self, path):
751 self._path = path
752 logging.info('Built metadata stream: path="{}"'.format(path))
753
754 @property
755 def path(self):
756 return self._path
757
758 @property
759 def data(self):
760 assert os.path.isfile(self._path)
761
762 with open(self._path, 'rb') as f:
763 return f.read()
764
765
766# An LTTng trace, a sequence of LTTng data streams.
767class LttngTrace(collections.abc.Sequence):
768 def __init__(self, trace_dir):
769 assert os.path.isdir(trace_dir)
770 self._path = trace_dir
771 self._metadata_stream = _LttngMetadataStream(
772 os.path.join(trace_dir, 'metadata')
773 )
774 self._create_data_streams(trace_dir)
775 logging.info('Built trace: path="{}"'.format(trace_dir))
776
777 def _create_data_streams(self, trace_dir):
778 data_stream_paths = []
779
780 for filename in os.listdir(trace_dir):
781 path = os.path.join(trace_dir, filename)
782
783 if not os.path.isfile(path):
784 continue
785
786 if filename.startswith('.'):
787 continue
788
789 if filename == 'metadata':
790 continue
791
792 data_stream_paths.append(path)
793
794 data_stream_paths.sort()
795 self._data_streams = []
796
797 for data_stream_path in data_stream_paths:
798 self._data_streams.append(_LttngDataStream(data_stream_path))
799
800 @property
801 def path(self):
802 return self._path
803
804 @property
805 def metadata_stream(self):
806 return self._metadata_stream
807
808 def __getitem__(self, index):
809 return self._data_streams[index]
810
811 def __len__(self):
812 return len(self._data_streams)
813
814
815# The state of a single data stream.
816class _LttngLiveViewerSessionDataStreamState:
817 def __init__(self, ts_state, info, data_stream):
818 self._ts_state = ts_state
819 self._info = info
820 self._data_stream = data_stream
821 self._cur_index_entry_index = 0
822 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
823 logging.info(
824 fmt.format(
825 info.id,
826 ts_state.tracing_session_descriptor.info.tracing_session_id,
827 ts_state.tracing_session_descriptor.info.name,
828 data_stream.path,
829 )
830 )
831
832 @property
833 def tracing_session_state(self):
834 return self._ts_state
835
836 @property
837 def info(self):
838 return self._info
839
840 @property
841 def data_stream(self):
842 return self._data_stream
843
844 @property
845 def cur_index_entry(self):
846 if self._cur_index_entry_index == len(self._data_stream.index):
847 return
848
849 return self._data_stream.index[self._cur_index_entry_index]
850
851 def goto_next_index_entry(self):
852 self._cur_index_entry_index += 1
853
854
855# The state of a single metadata stream.
856class _LttngLiveViewerSessionMetadataStreamState:
857 def __init__(self, ts_state, info, metadata_stream):
858 self._ts_state = ts_state
859 self._info = info
860 self._metadata_stream = metadata_stream
861 self._is_sent = False
862 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
863 logging.info(
864 fmt.format(
865 info.id,
866 ts_state.tracing_session_descriptor.info.tracing_session_id,
867 ts_state.tracing_session_descriptor.info.name,
868 metadata_stream.path,
869 )
870 )
871
872 @property
873 def trace_session_state(self):
874 return self._trace_session_state
875
876 @property
877 def info(self):
878 return self._info
879
880 @property
881 def metadata_stream(self):
882 return self._metadata_stream
883
884 @property
885 def is_sent(self):
886 return self._is_sent
887
888 @is_sent.setter
889 def is_sent(self, value):
890 self._is_sent = value
891
892
893# The state of a tracing session.
894class _LttngLiveViewerSessionTracingSessionState:
895 def __init__(self, tc_descr, base_stream_id):
896 self._tc_descr = tc_descr
897 self._stream_infos = []
898 self._ds_states = {}
899 self._ms_states = {}
900 stream_id = base_stream_id
901
902 for trace in tc_descr.traces:
903 trace_id = stream_id * 1000
904
905 # Data streams -> stream infos and data stream states
906 for data_stream in trace:
907 info = _LttngLiveViewerStreamInfo(
908 stream_id,
909 trace_id,
910 False,
911 data_stream.path,
912 data_stream.channel_name,
913 )
914 self._stream_infos.append(info)
915 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
916 self, info, data_stream
917 )
918 stream_id += 1
919
920 # Metadata stream -> stream info and metadata stream state
921 info = _LttngLiveViewerStreamInfo(
922 stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
923 )
924 self._stream_infos.append(info)
925 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
926 self, info, trace.metadata_stream
927 )
928 stream_id += 1
929
930 self._is_attached = False
931 fmt = 'Built tracing session state: id={}, name="{}"'
932 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
933
934 @property
935 def tracing_session_descriptor(self):
936 return self._tc_descr
937
938 @property
939 def data_stream_states(self):
940 return self._ds_states
941
942 @property
943 def metadata_stream_states(self):
944 return self._ms_states
945
946 @property
947 def stream_infos(self):
948 return self._stream_infos
949
950 @property
951 def has_new_metadata(self):
952 return any([not ms.is_sent for ms in self._ms_states.values()])
953
954 @property
955 def is_attached(self):
956 return self._is_attached
957
958 @is_attached.setter
959 def is_attached(self, value):
960 self._is_attached = value
961
962
963# An LTTng live viewer session manages a view on tracing sessions
964# and replies to commands accordingly.
965class _LttngLiveViewerSession:
966 def __init__(
967 self,
968 viewer_session_id,
969 tracing_session_descriptors,
970 max_query_data_response_size,
971 ):
972 self._viewer_session_id = viewer_session_id
973 self._ts_states = {}
974 self._stream_states = {}
975 self._max_query_data_response_size = max_query_data_response_size
976 total_stream_infos = 0
977
978 for ts_descr in tracing_session_descriptors:
979 ts_state = _LttngLiveViewerSessionTracingSessionState(
980 ts_descr, total_stream_infos
981 )
982 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
983 self._ts_states[ts_id] = ts_state
984 total_stream_infos += len(ts_state.stream_infos)
985
986 # Update session's stream states to have the new states
987 self._stream_states.update(ts_state.data_stream_states)
988 self._stream_states.update(ts_state.metadata_stream_states)
989
990 self._command_handlers = {
991 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
992 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
993 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
994 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
995 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
996 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
997 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
998 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
999 }
1000
1001 @property
1002 def viewer_session_id(self):
1003 return self._viewer_session_id
1004
1005 def _get_tracing_session_state(self, tracing_session_id):
1006 if tracing_session_id not in self._ts_states:
1007 raise UnexpectedInput(
1008 'Unknown tracing session ID {}'.format(tracing_session_id)
1009 )
1010
1011 return self._ts_states[tracing_session_id]
1012
1013 def _get_stream_state(self, stream_id):
1014 if stream_id not in self._stream_states:
1015 UnexpectedInput('Unknown stream ID {}'.format(stream_id))
1016
1017 return self._stream_states[stream_id]
1018
1019 def handle_command(self, cmd):
1020 logging.info(
1021 'Handling command in viewer session: cmd-cls-name={}'.format(
1022 cmd.__class__.__name__
1023 )
1024 )
1025 cmd_type = type(cmd)
1026
1027 if cmd_type not in self._command_handlers:
1028 raise UnexpectedInput(
1029 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
1030 )
1031
1032 return self._command_handlers[cmd_type](cmd)
1033
1034 def _handle_attach_to_tracing_session_command(self, cmd):
1035 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1036 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1037 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1038 info = ts_state.tracing_session_descriptor.info
1039
1040 if ts_state.is_attached:
1041 raise UnexpectedInput(
1042 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1043 info.name
1044 )
1045 )
1046
1047 ts_state.is_attached = True
1048 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1049 return _LttngLiveViewerAttachToTracingSessionReply(
1050 status, ts_state.stream_infos
1051 )
1052
1053 def _handle_detach_from_tracing_session_command(self, cmd):
1054 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1055 logging.info(fmt.format(cmd.tracing_session_id))
1056 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1057 info = ts_state.tracing_session_descriptor.info
1058
1059 if not ts_state.is_attached:
1060 raise UnexpectedInput(
1061 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1062 info.name
1063 )
1064 )
1065
1066 ts_state.is_attached = False
1067 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1068 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1069
1070 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1071 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1072 logging.info(fmt.format(cmd.stream_id))
1073 stream_state = self._get_stream_state(cmd.stream_id)
1074
1075 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1076 raise UnexpectedInput(
1077 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1078 )
1079
1080 if stream_state.cur_index_entry is None:
1081 # The viewer is done reading this stream
1082 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1083
1084 # Dummy data stream index entry to use with the `HUP` status
1085 # (the reply needs one, but the viewer ignores it)
1086 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1087
1088 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1089 status, index_entry, False, False
1090 )
1091
1092 # The viewer only checks the `has_new_metadata` flag if the
1093 # reply's status is `OK`, so we need to provide an index here
1094 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1095 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1096 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1097 status, stream_state.cur_index_entry, has_new_metadata, False
1098 )
1099 stream_state.goto_next_index_entry()
1100 return reply
1101
1102 def _handle_get_data_stream_packet_data_command(self, cmd):
1103 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1104 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1105 stream_state = self._get_stream_state(cmd.stream_id)
1106 data_response_length = cmd.req_length
1107
1108 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1109 raise UnexpectedInput(
1110 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1111 )
1112
1113 if stream_state.tracing_session_state.has_new_metadata:
1114 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1115 return _LttngLiveViewerGetDataStreamPacketDataReply(
1116 status, bytes(), True, False
1117 )
1118
1119 if self._max_query_data_response_size:
1120 # Enforce a server side limit on the query requested length.
1121 # To ensure that the transaction terminate take the minimum of both
1122 # value.
1123 data_response_length = min(
1124 cmd.req_length, self._max_query_data_response_size
1125 )
1126 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1127 logging.info(fmt.format(cmd.req_length, data_response_length))
1128
1129 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1130 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1131 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1132
1133 def _handle_get_metadata_stream_data_command(self, cmd):
1134 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1135 logging.info(fmt.format(cmd.stream_id))
1136 stream_state = self._get_stream_state(cmd.stream_id)
1137
1138 if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
1139 raise UnexpectedInput(
1140 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
1141 )
1142
1143 if stream_state.is_sent:
1144 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1145 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1146
1147 stream_state.is_sent = True
1148 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
1149 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1150 status, stream_state.metadata_stream.data
1151 )
1152
1153 def _handle_get_new_stream_infos_command(self, cmd):
1154 fmt = 'Handling "get new stream infos" command: ts-id={}'
1155 logging.info(fmt.format(cmd.tracing_session_id))
1156
1157 # As of this version, all the tracing session's stream infos are
1158 # always given to the viewer when sending the "attach to tracing
1159 # session" reply, so there's nothing new here. Return the `HUP`
1160 # status as, if we're handling this command, the viewer consumed
1161 # all the existing data streams.
1162 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1163 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1164
1165 def _handle_get_tracing_session_infos_command(self, cmd):
1166 logging.info('Handling "get tracing session infos" command.')
1167 infos = [
1168 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1169 ]
1170 infos.sort(key=lambda info: info.name)
1171 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1172
1173 def _handle_create_viewer_session_command(self, cmd):
1174 logging.info('Handling "create viewer session" command.')
1175 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1176
1177 # This does nothing here. In the LTTng relay daemon, it
1178 # allocates the viewer session's state.
1179 return _LttngLiveViewerCreateViewerSessionReply(status)
1180
1181
1182# An LTTng live TCP server.
1183#
1184# On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1185# the decimal TCP port number to a temporary port file. It renames the
1186# temporary port file to `port_filename`.
1187#
1188# `tracing_session_descriptors` is a list of tracing session descriptors
1189# (`LttngTracingSessionDescriptor`) to serve.
1190#
1191# This server accepts a single viewer (client).
1192#
1193# When the viewer closes the connection, the server's constructor
1194# returns.
1195class LttngLiveServer:
1196 def __init__(
1197 self, port_filename, tracing_session_descriptors, max_query_data_response_size
1198 ):
1199 logging.info('Server configuration:')
1200
1201 logging.info(' Port file name: `{}`'.format(port_filename))
1202
1203 if max_query_data_response_size is not None:
1204 logging.info(
1205 ' Maximum response data query size: `{}`'.format(
1206 max_query_data_response_size
1207 )
1208 )
1209
1210 for ts_descr in tracing_session_descriptors:
1211 info = ts_descr.info
1212 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1213 logging.info(
1214 fmt.format(
1215 info.name,
1216 info.tracing_session_id,
1217 info.hostname,
1218 info.live_timer_freq,
1219 info.client_count,
1220 info.stream_count,
1221 )
1222 )
1223
1224 for trace in ts_descr.traces:
1225 logging.info(' Trace: path="{}"'.format(trace.path))
1226
1227 self._ts_descriptors = tracing_session_descriptors
1228 self._max_query_data_response_size = max_query_data_response_size
1229 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1230 self._codec = _LttngLiveViewerProtocolCodec()
1231
1232 # Port 0: OS assigns an unused port
1233 serv_addr = ('localhost', 0)
1234 self._sock.bind(serv_addr)
1235 self._write_port_to_file(port_filename)
1236
1237 try:
1238 self._listen()
1239 finally:
1240 self._sock.close()
1241 logging.info('Closed connection and socket.')
1242
1243 @property
1244 def _server_port(self):
1245 return self._sock.getsockname()[1]
1246
1247 def _recv_command(self):
1248 data = bytes()
1249
1250 while True:
1251 logging.info('Waiting for viewer command.')
1252 buf = self._conn.recv(128)
1253
1254 if not buf:
1255 logging.info('Client closed connection.')
1256
1257 if data:
1258 raise UnexpectedInput(
1259 'Client closed connection after having sent {} command bytes.'.format(
1260 len(data)
1261 )
1262 )
1263
1264 return
1265
1266 logging.info('Received data from viewer: length={}'.format(len(buf)))
1267
1268 data += buf
1269
1270 try:
1271 cmd = self._codec.decode(data)
1272 except struct.error as exc:
1273 raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc
1274
1275 if cmd is not None:
1276 logging.info(
1277 'Received command from viewer: cmd-cls-name={}'.format(
1278 cmd.__class__.__name__
1279 )
1280 )
1281 return cmd
1282
1283 def _send_reply(self, reply):
1284 data = self._codec.encode(reply)
1285 logging.info(
1286 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
1287 reply.__class__.__name__, len(data)
1288 )
1289 )
1290 self._conn.sendall(data)
1291
1292 def _handle_connection(self):
1293 # First command must be "connect"
1294 cmd = self._recv_command()
1295
1296 if type(cmd) is not _LttngLiveViewerConnectCommand:
1297 raise UnexpectedInput(
1298 'First command is not "connect": cmd-cls-name={}'.format(
1299 cmd.__class__.__name__
1300 )
1301 )
1302
1303 # Create viewer session (arbitrary ID 23)
1304 logging.info(
1305 'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
1306 )
1307 viewer_session = _LttngLiveViewerSession(
1308 23, self._ts_descriptors, self._max_query_data_response_size
1309 )
1310
1311 # Send "connect" reply
1312 self._send_reply(
1313 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1314 )
1315
1316 # Make the viewer session handle the remaining commands
1317 while True:
1318 cmd = self._recv_command()
1319
1320 if cmd is None:
1321 # Connection closed (at an expected location within the
1322 # conversation)
1323 return
1324
1325 self._send_reply(viewer_session.handle_command(cmd))
1326
1327 def _listen(self):
1328 logging.info('Listening: port={}'.format(self._server_port))
1726ac08
JR
1329 # Backlog must be present for Python version < 3.5.
1330 # 128 is an arbitrary number since we expect only 1 connection anyway.
1331 self._sock.listen(128)
584af91e
PP
1332 self._conn, viewer_addr = self._sock.accept()
1333 logging.info(
1334 'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
1335 )
1336
1337 try:
1338 self._handle_connection()
1339 finally:
1340 self._conn.close()
1341
1342 def _write_port_to_file(self, port_filename):
1343 # Write the port number to a temporary file.
1344 with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_port_file:
1345 print(self._server_port, end='', file=tmp_port_file)
1346
1347 # Rename temporary file to real file
9c878ece 1348 os.replace(tmp_port_file.name, port_filename)
584af91e
PP
1349 logging.info(
1350 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1351 tmp_port_file.name, port_filename
1352 )
1353 )
1354
1355
1356# A tracing session descriptor.
1357#
1358# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1359# objects).
1360class LttngTracingSessionDescriptor:
1361 def __init__(
1362 self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1363 ):
1364 for trace in traces:
1365 if name not in trace.path:
1366 fmt = 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
1367 raise ValueError(fmt.format(name, trace.path))
1368
1369 self._traces = traces
1370 stream_count = sum([len(t) + 1 for t in traces])
1371 self._info = _LttngLiveViewerTracingSessionInfo(
1372 tracing_session_id,
1373 live_timer_freq,
1374 client_count,
1375 stream_count,
1376 hostname,
1377 name,
1378 )
1379
1380 @property
1381 def traces(self):
1382 return self._traces
1383
1384 @property
1385 def info(self):
1386 return self._info
1387
1388
1389def _tracing_session_descriptors_from_arg(string):
1390 # Format is:
1391 # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
1392 parts = string.split(',')
1393 name = parts[0]
1394 tracing_session_id = int(parts[1])
1395 hostname = parts[2]
1396 live_timer_freq = int(parts[3])
1397 client_count = int(parts[4])
1398 traces = [LttngTrace(path) for path in parts[5:]]
1399 return LttngTracingSessionDescriptor(
1400 name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1401 )
1402
1403
1404def _loglevel_parser(string):
1405 loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
1406 if string not in loglevels:
1407 msg = "{} is not a valid loglevel".format(string)
1408 raise argparse.ArgumentTypeError(msg)
1409 return loglevels[string]
1410
1411
1412if __name__ == '__main__':
1413 logging.basicConfig(format='# %(asctime)-25s%(message)s')
1414 parser = argparse.ArgumentParser(
1415 description='LTTng-live protocol mocker', add_help=False
1416 )
1417 parser.add_argument(
1418 '--log-level',
1419 default='warning',
1420 choices=['info', 'warning'],
1421 help='The loglevel to be used.',
1422 )
1423
1424 loglevel_namespace, remaining_args = parser.parse_known_args()
1425 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1426
1427 parser.add_argument(
1428 '--port-filename',
1429 help='The final port file. This file is present when the server is ready to receive connection.',
1430 required=True,
1431 )
1432 parser.add_argument(
1433 '--max-query-data-response-size',
1434 type=int,
1435 help='The maximum size of control data response in bytes',
1436 )
1437 parser.add_argument(
1438 'sessions',
1439 nargs="+",
1440 metavar="SESSION",
1441 type=_tracing_session_descriptors_from_arg,
1442 help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
1443 )
1444 parser.add_argument(
1445 '-h',
1446 '--help',
1447 action='help',
1448 default=argparse.SUPPRESS,
1449 help='Show this help message and exit.',
1450 )
1451
1452 args = parser.parse_args(args=remaining_args)
1453 try:
1454 LttngLiveServer(
1455 args.port_filename, args.sessions, args.max_query_data_response_size
1456 )
1457 except UnexpectedInput as exc:
1458 logging.error(str(exc))
1459 print(exc, file=sys.stderr)
1460 sys.exit(1)
This page took 0.075894 seconds and 4 git commands to generate.