tests: retry os.rename on PermissionError failure in lttng_live_server.py
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0198f2fb
PP
1# The MIT License (MIT)
2#
3# Copyright (c) 2019 Philippe Proulx <pproulx@efficios.com>
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy
6# of this software and associated documentation files (the "Software"), to deal
7# in the Software without restriction, including without limitation the rights
8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9# copies of the Software, and to permit persons to whom the Software is
10# furnished to do so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in
13# all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21# THE SOFTWARE.
22
23import argparse
24import collections.abc
25import logging
26import os
27import os.path
28import re
e21e74d3 29import time
0198f2fb
PP
30import socket
31import struct
32import sys
33import tempfile
34
35
36class UnexpectedInput(RuntimeError):
37 pass
38
39
40class _LttngLiveViewerCommand:
41 def __init__(self, version):
42 self._version = version
43
44 @property
45 def version(self):
46 return self._version
47
48
49class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
50 def __init__(self, version, viewer_session_id, major, minor):
51 super().__init__(version)
52 self._viewer_session_id = viewer_session_id
53 self._major = major
54 self._minor = minor
55
56 @property
57 def viewer_session_id(self):
58 return self._viewer_session_id
59
60 @property
61 def major(self):
62 return self._major
63
64 @property
65 def minor(self):
66 return self._minor
67
68
69class _LttngLiveViewerConnectReply:
70 def __init__(self, viewer_session_id, major, minor):
71 self._viewer_session_id = viewer_session_id
72 self._major = major
73 self._minor = minor
74
75 @property
76 def viewer_session_id(self):
77 return self._viewer_session_id
78
79 @property
80 def major(self):
81 return self._major
82
83 @property
84 def minor(self):
85 return self._minor
86
87
88class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
89 pass
90
91
92class _LttngLiveViewerTracingSessionInfo:
93 def __init__(
94 self,
95 tracing_session_id,
96 live_timer_freq,
97 client_count,
98 stream_count,
99 hostname,
100 name,
101 ):
102 self._tracing_session_id = tracing_session_id
103 self._live_timer_freq = live_timer_freq
104 self._client_count = client_count
105 self._stream_count = stream_count
106 self._hostname = hostname
107 self._name = name
108
109 @property
110 def tracing_session_id(self):
111 return self._tracing_session_id
112
113 @property
114 def live_timer_freq(self):
115 return self._live_timer_freq
116
117 @property
118 def client_count(self):
119 return self._client_count
120
121 @property
122 def stream_count(self):
123 return self._stream_count
124
125 @property
126 def hostname(self):
127 return self._hostname
128
129 @property
130 def name(self):
131 return self._name
132
133
134class _LttngLiveViewerGetTracingSessionInfosReply:
135 def __init__(self, tracing_session_infos):
136 self._tracing_session_infos = tracing_session_infos
137
138 @property
139 def tracing_session_infos(self):
140 return self._tracing_session_infos
141
142
143class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
144 class SeekType:
145 BEGINNING = 1
146 LAST = 2
147
148 def __init__(self, version, tracing_session_id, offset, seek_type):
149 super().__init__(version)
150 self._tracing_session_id = tracing_session_id
151 self._offset = offset
152 self._seek_type = seek_type
153
154 @property
155 def tracing_session_id(self):
156 return self._tracing_session_id
157
158 @property
159 def offset(self):
160 return self._offset
161
162 @property
163 def seek_type(self):
164 return self._seek_type
165
166
167class _LttngLiveViewerStreamInfo:
168 def __init__(self, id, trace_id, is_metadata, path, channel_name):
169 self._id = id
170 self._trace_id = trace_id
171 self._is_metadata = is_metadata
172 self._path = path
173 self._channel_name = channel_name
174
175 @property
176 def id(self):
177 return self._id
178
179 @property
180 def trace_id(self):
181 return self._trace_id
182
183 @property
184 def is_metadata(self):
185 return self._is_metadata
186
187 @property
188 def path(self):
189 return self._path
190
191 @property
192 def channel_name(self):
193 return self._channel_name
194
195
196class _LttngLiveViewerAttachToTracingSessionReply:
197 class Status:
198 OK = 1
199 ALREADY = 2
200 UNKNOWN = 3
201 NOT_LIVE = 4
202 SEEK_ERROR = 5
203 NO_SESSION = 6
204
205 def __init__(self, status, stream_infos):
206 self._status = status
207 self._stream_infos = stream_infos
208
209 @property
210 def status(self):
211 return self._status
212
213 @property
214 def stream_infos(self):
215 return self._stream_infos
216
217
218class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
219 def __init__(self, version, stream_id):
220 super().__init__(version)
221 self._stream_id = stream_id
222
223 @property
224 def stream_id(self):
225 return self._stream_id
226
227
228class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
229 class Status:
230 OK = 1
231 RETRY = 2
232 HUP = 3
233 ERROR = 4
234 INACTIVE = 5
235 EOF = 6
236
237 def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
238 self._status = status
239 self._index_entry = index_entry
240 self._has_new_metadata = has_new_metadata
241 self._has_new_data_stream = has_new_data_stream
242
243 @property
244 def status(self):
245 return self._status
246
247 @property
248 def index_entry(self):
249 return self._index_entry
250
251 @property
252 def has_new_metadata(self):
253 return self._has_new_metadata
254
255 @property
256 def has_new_data_stream(self):
257 return self._has_new_data_stream
258
259
260class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
261 def __init__(self, version, stream_id, offset, req_length):
262 super().__init__(version)
263 self._stream_id = stream_id
264 self._offset = offset
265 self._req_length = req_length
266
267 @property
268 def stream_id(self):
269 return self._stream_id
270
271 @property
272 def offset(self):
273 return self._offset
274
275 @property
276 def req_length(self):
277 return self._req_length
278
279
280class _LttngLiveViewerGetDataStreamPacketDataReply:
281 class Status:
282 OK = 1
283 RETRY = 2
284 ERROR = 3
285 EOF = 4
286
287 def __init__(self, status, data, has_new_metadata, has_new_data_stream):
288 self._status = status
289 self._data = data
290 self._has_new_metadata = has_new_metadata
291 self._has_new_data_stream = has_new_data_stream
292
293 @property
294 def status(self):
295 return self._status
296
297 @property
298 def data(self):
299 return self._data
300
301 @property
302 def has_new_metadata(self):
303 return self._has_new_metadata
304
305 @property
306 def has_new_data_stream(self):
307 return self._has_new_data_stream
308
309
310class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
311 def __init__(self, version, stream_id):
312 super().__init__(version)
313 self._stream_id = stream_id
314
315 @property
316 def stream_id(self):
317 return self._stream_id
318
319
320class _LttngLiveViewerGetMetadataStreamDataContentReply:
321 class Status:
322 OK = 1
323 NO_NEW = 2
324 ERROR = 3
325
326 def __init__(self, status, data):
327 self._status = status
328 self._data = data
329
330 @property
331 def status(self):
332 return self._status
333
334 @property
335 def data(self):
336 return self._data
337
338
339class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
340 def __init__(self, version, tracing_session_id):
341 super().__init__(version)
342 self._tracing_session_id = tracing_session_id
343
344 @property
345 def tracing_session_id(self):
346 return self._tracing_session_id
347
348
349class _LttngLiveViewerGetNewStreamInfosReply:
350 class Status:
351 OK = 1
352 NO_NEW = 2
353 ERROR = 3
354 HUP = 4
355
356 def __init__(self, status, stream_infos):
357 self._status = status
358 self._stream_infos = stream_infos
359
360 @property
361 def status(self):
362 return self._status
363
364 @property
365 def stream_infos(self):
366 return self._stream_infos
367
368
369class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
370 pass
371
372
373class _LttngLiveViewerCreateViewerSessionReply:
374 class Status:
375 OK = 1
376 ERROR = 2
377
378 def __init__(self, status):
379 self._status = status
380
381 @property
382 def status(self):
383 return self._status
384
385
386class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
387 def __init__(self, version, tracing_session_id):
388 super().__init__(version)
389 self._tracing_session_id = tracing_session_id
390
391 @property
392 def tracing_session_id(self):
393 return self._tracing_session_id
394
395
396class _LttngLiveViewerDetachFromTracingSessionReply:
397 class Status:
398 OK = 1
399 UNKNOWN = 2
400 ERROR = 3
401
402 def __init__(self, status):
403 self._status = status
404
405 @property
406 def status(self):
407 return self._status
408
409
410# An LTTng live protocol codec can convert bytes to command objects and
411# reply objects to bytes.
412class _LttngLiveViewerProtocolCodec:
413 _COMMAND_HEADER_STRUCT_FMT = 'QII'
414 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
415
416 def __init__(self):
417 pass
418
419 def _unpack(self, fmt, data, offset=0):
420 fmt = '!' + fmt
421 return struct.unpack_from(fmt, data, offset)
422
423 def _unpack_payload(self, fmt, data):
424 return self._unpack(
425 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
426 )
427
428 def decode(self, data):
429 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
430 # Not enough data to read the command header
431 return
432
433 payload_size, cmd_type, version = self._unpack(
434 self._COMMAND_HEADER_STRUCT_FMT, data
435 )
436 logging.info(
437 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
438 payload_size, cmd_type, version
439 )
440 )
441
442 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
443 # Not enough data to read the whole command
444 return
445
446 if cmd_type == 1:
447 viewer_session_id, major, minor, conn_type = self._unpack_payload(
448 'QIII', data
449 )
450 return _LttngLiveViewerConnectCommand(
451 version, viewer_session_id, major, minor
452 )
453 elif cmd_type == 2:
454 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
455 elif cmd_type == 3:
456 tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
457 return _LttngLiveViewerAttachToTracingSessionCommand(
458 version, tracing_session_id, offset, seek_type
459 )
460 elif cmd_type == 4:
c1491996 461 (stream_id,) = self._unpack_payload('Q', data)
0198f2fb
PP
462 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
463 version, stream_id
464 )
465 elif cmd_type == 5:
466 stream_id, offset, req_length = self._unpack_payload('QQI', data)
467 return _LttngLiveViewerGetDataStreamPacketDataCommand(
468 version, stream_id, offset, req_length
469 )
470 elif cmd_type == 6:
c1491996 471 (stream_id,) = self._unpack_payload('Q', data)
0198f2fb
PP
472 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
473 elif cmd_type == 7:
c1491996 474 (tracing_session_id,) = self._unpack_payload('Q', data)
0198f2fb
PP
475 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
476 elif cmd_type == 8:
477 return _LttngLiveViewerCreateViewerSessionCommand(version)
478 elif cmd_type == 9:
c1491996 479 (tracing_session_id,) = self._unpack_payload('Q', data)
0198f2fb
PP
480 return _LttngLiveViewerDetachFromTracingSessionCommand(
481 version, tracing_session_id
482 )
483 else:
484 raise UnexpectedInput('Unknown command type {}'.format(cmd_type))
485
486 def _pack(self, fmt, *args):
487 # Force network byte order
488 return struct.pack('!' + fmt, *args)
489
490 def _encode_zero_padded_str(self, string, length):
491 data = string.encode()
492 return data.ljust(length, b'\x00')
493
494 def _encode_stream_info(self, info):
495 data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
496 data += self._encode_zero_padded_str(info.path, 4096)
497 data += self._encode_zero_padded_str(info.channel_name, 255)
498 return data
499
500 def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
501 flags = 0
502
503 if has_new_metadata:
504 flags |= 1
505
506 if has_new_data_streams:
507 flags |= 2
508
509 return flags
510
511 def encode(self, reply):
512 if type(reply) is _LttngLiveViewerConnectReply:
513 data = self._pack(
514 'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
515 )
516 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
517 data = self._pack('I', len(reply.tracing_session_infos))
518
519 for info in reply.tracing_session_infos:
520 data += self._pack(
521 'QIII',
522 info.tracing_session_id,
523 info.live_timer_freq,
524 info.client_count,
525 info.stream_count,
526 )
527 data += self._encode_zero_padded_str(info.hostname, 64)
528 data += self._encode_zero_padded_str(info.name, 255)
529 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
530 data = self._pack('II', reply.status, len(reply.stream_infos))
531
532 for info in reply.stream_infos:
533 data += self._encode_stream_info(info)
534 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
535 entry = reply.index_entry
536 flags = self._get_has_new_stuff_flags(
537 reply.has_new_metadata, reply.has_new_data_stream
538 )
539
540 data = self._pack(
541 'QQQQQQQII',
542 entry.offset_bytes,
543 entry.total_size_bits,
544 entry.content_size_bits,
545 entry.timestamp_begin,
546 entry.timestamp_end,
547 entry.events_discarded,
548 entry.stream_class_id,
549 reply.status,
550 flags,
551 )
552 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
553 flags = self._get_has_new_stuff_flags(
554 reply.has_new_metadata, reply.has_new_data_stream
555 )
556 data = self._pack('III', reply.status, len(reply.data), flags)
557 data += reply.data
558 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
559 data = self._pack('QI', len(reply.data), reply.status)
560 data += reply.data
561 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
562 data = self._pack('II', reply.status, len(reply.stream_infos))
563
564 for info in reply.stream_infos:
565 data += self._encode_stream_info(info)
566 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
567 data = self._pack('I', reply.status)
568 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
569 data = self._pack('I', reply.status)
570 else:
571 raise ValueError(
572 'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
573 )
574
575 return data
576
577
578# An entry within the index of an LTTng data stream.
579class _LttngDataStreamIndexEntry:
580 def __init__(
581 self,
582 offset_bytes,
583 total_size_bits,
584 content_size_bits,
585 timestamp_begin,
586 timestamp_end,
587 events_discarded,
588 stream_class_id,
589 ):
590 self._offset_bytes = offset_bytes
591 self._total_size_bits = total_size_bits
592 self._content_size_bits = content_size_bits
593 self._timestamp_begin = timestamp_begin
594 self._timestamp_end = timestamp_end
595 self._events_discarded = events_discarded
596 self._stream_class_id = stream_class_id
597
598 @property
599 def offset_bytes(self):
600 return self._offset_bytes
601
602 @property
603 def total_size_bits(self):
604 return self._total_size_bits
605
606 @property
607 def total_size_bytes(self):
608 return self._total_size_bits // 8
609
610 @property
611 def content_size_bits(self):
612 return self._content_size_bits
613
614 @property
615 def content_size_bytes(self):
616 return self._content_size_bits // 8
617
618 @property
619 def timestamp_begin(self):
620 return self._timestamp_begin
621
622 @property
623 def timestamp_end(self):
624 return self._timestamp_end
625
626 @property
627 def events_discarded(self):
628 return self._events_discarded
629
630 @property
631 def stream_class_id(self):
632 return self._stream_class_id
633
634
635# The index of an LTTng data stream, a sequence of index entries.
636class _LttngDataStreamIndex(collections.abc.Sequence):
637 def __init__(self, path):
638 self._path = path
639 self._build()
640 logging.info(
641 'Built data stream index entries: path="{}", count={}'.format(
642 path, len(self._entries)
643 )
644 )
645
646 def _build(self):
647 self._entries = []
648 assert os.path.isfile(self._path)
649
650 with open(self._path, 'rb') as f:
651 # Read header first
652 fmt = '>IIII'
653 size = struct.calcsize(fmt)
654 data = f.read(size)
655 assert len(data) == size
656 magic, index_major, index_minor, index_entry_length = struct.unpack(
657 fmt, data
658 )
659 assert magic == 0xC1F1DCC1
660
661 # Read index entries
662 fmt = '>QQQQQQQ'
663 size = struct.calcsize(fmt)
664
665 while True:
666 logging.debug(
667 'Decoding data stream index entry: path="{}", offset={}'.format(
668 self._path, f.tell()
669 )
670 )
671 data = f.read(size)
672
673 if not data:
674 # Done
675 break
676
677 assert len(data) == size
c1491996
FD
678 (
679 offset_bytes,
680 total_size_bits,
681 content_size_bits,
682 timestamp_begin,
683 timestamp_end,
684 events_discarded,
685 stream_class_id,
686 ) = struct.unpack(fmt, data)
0198f2fb
PP
687
688 self._entries.append(
689 _LttngDataStreamIndexEntry(
690 offset_bytes,
691 total_size_bits,
692 content_size_bits,
693 timestamp_begin,
694 timestamp_end,
695 events_discarded,
696 stream_class_id,
697 )
698 )
699
700 # Skip anything else before the next entry
701 f.seek(index_entry_length - size, os.SEEK_CUR)
702
703 def __getitem__(self, index):
704 return self._entries[index]
705
706 def __len__(self):
707 return len(self._entries)
708
709 @property
710 def path(self):
711 return self._path
712
713
714# An LTTng data stream.
715class _LttngDataStream:
716 def __init__(self, path):
717 self._path = path
718 filename = os.path.basename(path)
719 match = re.match(r'(.*)_\d+', filename)
720 self._channel_name = match.group(1)
721 trace_dir = os.path.dirname(path)
722 index_path = os.path.join(trace_dir, 'index', filename + '.idx')
723 self._index = _LttngDataStreamIndex(index_path)
724 assert os.path.isfile(path)
725 self._file = open(path, 'rb')
726 logging.info(
727 'Built data stream: path="{}", channel-name="{}"'.format(
728 path, self._channel_name
729 )
730 )
731
732 @property
733 def path(self):
734 return self._path
735
736 @property
737 def channel_name(self):
738 return self._channel_name
739
740 @property
741 def index(self):
742 return self._index
743
744 def get_data(self, offset_bytes, len_bytes):
745 self._file.seek(offset_bytes)
746 return self._file.read(len_bytes)
747
748
749# An LTTng metadata stream.
750class _LttngMetadataStream:
751 def __init__(self, path):
752 self._path = path
753 logging.info('Built metadata stream: path="{}"'.format(path))
754
755 @property
756 def path(self):
757 return self._path
758
759 @property
760 def data(self):
761 assert os.path.isfile(self._path)
762
763 with open(self._path, 'rb') as f:
764 return f.read()
765
766
767# An LTTng trace, a sequence of LTTng data streams.
768class LttngTrace(collections.abc.Sequence):
769 def __init__(self, trace_dir):
770 assert os.path.isdir(trace_dir)
771 self._path = trace_dir
772 self._metadata_stream = _LttngMetadataStream(
773 os.path.join(trace_dir, 'metadata')
774 )
775 self._create_data_streams(trace_dir)
776 logging.info('Built trace: path="{}"'.format(trace_dir))
777
778 def _create_data_streams(self, trace_dir):
779 data_stream_paths = []
780
781 for filename in os.listdir(trace_dir):
782 path = os.path.join(trace_dir, filename)
783
784 if not os.path.isfile(path):
785 continue
786
787 if filename.startswith('.'):
788 continue
789
790 if filename == 'metadata':
791 continue
792
793 data_stream_paths.append(path)
794
795 data_stream_paths.sort()
796 self._data_streams = []
797
798 for data_stream_path in data_stream_paths:
799 self._data_streams.append(_LttngDataStream(data_stream_path))
800
801 @property
802 def path(self):
803 return self._path
804
805 @property
806 def metadata_stream(self):
807 return self._metadata_stream
808
809 def __getitem__(self, index):
810 return self._data_streams[index]
811
812 def __len__(self):
813 return len(self._data_streams)
814
815
816# The state of a single data stream.
817class _LttngLiveViewerSessionDataStreamState:
818 def __init__(self, ts_state, info, data_stream):
819 self._ts_state = ts_state
820 self._info = info
821 self._data_stream = data_stream
822 self._cur_index_entry_index = 0
823 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
824 logging.info(
825 fmt.format(
826 info.id,
827 ts_state.tracing_session_descriptor.info.tracing_session_id,
828 ts_state.tracing_session_descriptor.info.name,
829 data_stream.path,
830 )
831 )
832
833 @property
834 def tracing_session_state(self):
835 return self._ts_state
836
837 @property
838 def info(self):
839 return self._info
840
841 @property
842 def data_stream(self):
843 return self._data_stream
844
845 @property
846 def cur_index_entry(self):
847 if self._cur_index_entry_index == len(self._data_stream.index):
848 return
849
850 return self._data_stream.index[self._cur_index_entry_index]
851
852 def goto_next_index_entry(self):
853 self._cur_index_entry_index += 1
854
855
856# The state of a single metadata stream.
857class _LttngLiveViewerSessionMetadataStreamState:
858 def __init__(self, ts_state, info, metadata_stream):
859 self._ts_state = ts_state
860 self._info = info
861 self._metadata_stream = metadata_stream
862 self._is_sent = False
863 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
864 logging.info(
865 fmt.format(
866 info.id,
867 ts_state.tracing_session_descriptor.info.tracing_session_id,
868 ts_state.tracing_session_descriptor.info.name,
869 metadata_stream.path,
870 )
871 )
872
873 @property
874 def trace_session_state(self):
875 return self._trace_session_state
876
877 @property
878 def info(self):
879 return self._info
880
881 @property
882 def metadata_stream(self):
883 return self._metadata_stream
884
885 @property
886 def is_sent(self):
887 return self._is_sent
888
889 @is_sent.setter
890 def is_sent(self, value):
891 self._is_sent = value
892
893
894# The state of a tracing session.
895class _LttngLiveViewerSessionTracingSessionState:
896 def __init__(self, tc_descr, base_stream_id):
897 self._tc_descr = tc_descr
898 self._stream_infos = []
899 self._ds_states = {}
900 self._ms_states = {}
901 stream_id = base_stream_id
902
903 for trace in tc_descr.traces:
904 trace_id = stream_id * 1000
905
906 # Data streams -> stream infos and data stream states
907 for data_stream in trace:
908 info = _LttngLiveViewerStreamInfo(
909 stream_id,
910 trace_id,
911 False,
912 data_stream.path,
913 data_stream.channel_name,
914 )
915 self._stream_infos.append(info)
916 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
917 self, info, data_stream
918 )
919 stream_id += 1
920
921 # Metadata stream -> stream info and metadata stream state
922 info = _LttngLiveViewerStreamInfo(
923 stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
924 )
925 self._stream_infos.append(info)
926 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
927 self, info, trace.metadata_stream
928 )
929 stream_id += 1
930
931 self._is_attached = False
932 fmt = 'Built tracing session state: id={}, name="{}"'
933 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
934
935 @property
936 def tracing_session_descriptor(self):
937 return self._tc_descr
938
939 @property
940 def data_stream_states(self):
941 return self._ds_states
942
943 @property
944 def metadata_stream_states(self):
945 return self._ms_states
946
947 @property
948 def stream_infos(self):
949 return self._stream_infos
950
951 @property
952 def has_new_metadata(self):
953 return any([not ms.is_sent for ms in self._ms_states.values()])
954
955 @property
956 def is_attached(self):
957 return self._is_attached
958
959 @is_attached.setter
960 def is_attached(self, value):
961 self._is_attached = value
962
963
964# An LTTng live viewer session manages a view on tracing sessions
965# and replies to commands accordingly.
966class _LttngLiveViewerSession:
967 def __init__(
968 self,
969 viewer_session_id,
970 tracing_session_descriptors,
971 max_query_data_response_size,
972 ):
973 self._viewer_session_id = viewer_session_id
974 self._ts_states = {}
975 self._stream_states = {}
976 self._max_query_data_response_size = max_query_data_response_size
977 total_stream_infos = 0
978
979 for ts_descr in tracing_session_descriptors:
980 ts_state = _LttngLiveViewerSessionTracingSessionState(
981 ts_descr, total_stream_infos
982 )
983 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
984 self._ts_states[ts_id] = ts_state
985 total_stream_infos += len(ts_state.stream_infos)
986
987 # Update session's stream states to have the new states
988 self._stream_states.update(ts_state.data_stream_states)
989 self._stream_states.update(ts_state.metadata_stream_states)
990
991 self._command_handlers = {
992 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
993 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
994 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
995 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
996 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
997 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
998 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
999 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
1000 }
1001
1002 @property
1003 def viewer_session_id(self):
1004 return self._viewer_session_id
1005
1006 def _get_tracing_session_state(self, tracing_session_id):
1007 if tracing_session_id not in self._ts_states:
1008 raise UnexpectedInput(
1009 'Unknown tracing session ID {}'.format(tracing_session_id)
1010 )
1011
1012 return self._ts_states[tracing_session_id]
1013
1014 def _get_stream_state(self, stream_id):
1015 if stream_id not in self._stream_states:
1016 UnexpectedInput('Unknown stream ID {}'.format(stream_id))
1017
1018 return self._stream_states[stream_id]
1019
1020 def handle_command(self, cmd):
1021 logging.info(
1022 'Handling command in viewer session: cmd-cls-name={}'.format(
1023 cmd.__class__.__name__
1024 )
1025 )
1026 cmd_type = type(cmd)
1027
1028 if cmd_type not in self._command_handlers:
1029 raise UnexpectedInput(
1030 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
1031 )
1032
1033 return self._command_handlers[cmd_type](cmd)
1034
1035 def _handle_attach_to_tracing_session_command(self, cmd):
1036 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1037 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1038 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1039 info = ts_state.tracing_session_descriptor.info
1040
1041 if ts_state.is_attached:
1042 raise UnexpectedInput(
1043 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1044 info.name
1045 )
1046 )
1047
1048 ts_state.is_attached = True
1049 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1050 return _LttngLiveViewerAttachToTracingSessionReply(
1051 status, ts_state.stream_infos
1052 )
1053
1054 def _handle_detach_from_tracing_session_command(self, cmd):
1055 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1056 logging.info(fmt.format(cmd.tracing_session_id))
1057 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1058 info = ts_state.tracing_session_descriptor.info
1059
1060 if not ts_state.is_attached:
1061 raise UnexpectedInput(
1062 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1063 info.name
1064 )
1065 )
1066
1067 ts_state.is_attached = False
1068 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1069 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1070
1071 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1072 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1073 logging.info(fmt.format(cmd.stream_id))
1074 stream_state = self._get_stream_state(cmd.stream_id)
1075
1076 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1077 raise UnexpectedInput(
1078 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1079 )
1080
1081 if stream_state.cur_index_entry is None:
1082 # The viewer is done reading this stream
1083 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1084
1085 # Dummy data stream index entry to use with the `HUP` status
1086 # (the reply needs one, but the viewer ignores it)
1087 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1088
1089 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1090 status, index_entry, False, False
1091 )
1092
1093 # The viewer only checks the `has_new_metadata` flag if the
1094 # reply's status is `OK`, so we need to provide an index here
1095 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1096 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1097 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1098 status, stream_state.cur_index_entry, has_new_metadata, False
1099 )
1100 stream_state.goto_next_index_entry()
1101 return reply
1102
1103 def _handle_get_data_stream_packet_data_command(self, cmd):
1104 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1105 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1106 stream_state = self._get_stream_state(cmd.stream_id)
1107 data_response_length = cmd.req_length
1108
1109 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1110 raise UnexpectedInput(
1111 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1112 )
1113
1114 if stream_state.tracing_session_state.has_new_metadata:
1115 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1116 return _LttngLiveViewerGetDataStreamPacketDataReply(
1117 status, bytes(), True, False
1118 )
1119
1120 if self._max_query_data_response_size:
1121 # Enforce a server side limit on the query requested length.
1122 # To ensure that the transaction terminate take the minimum of both
1123 # value.
1124 data_response_length = min(
1125 cmd.req_length, self._max_query_data_response_size
1126 )
1127 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1128 logging.info(fmt.format(cmd.req_length, data_response_length))
1129
1130 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1131 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1132 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1133
1134 def _handle_get_metadata_stream_data_command(self, cmd):
1135 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1136 logging.info(fmt.format(cmd.stream_id))
1137 stream_state = self._get_stream_state(cmd.stream_id)
1138
1139 if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
1140 raise UnexpectedInput(
1141 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
1142 )
1143
1144 if stream_state.is_sent:
1145 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1146 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1147
1148 stream_state.is_sent = True
1149 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
1150 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1151 status, stream_state.metadata_stream.data
1152 )
1153
1154 def _handle_get_new_stream_infos_command(self, cmd):
1155 fmt = 'Handling "get new stream infos" command: ts-id={}'
1156 logging.info(fmt.format(cmd.tracing_session_id))
1157
1158 # As of this version, all the tracing session's stream infos are
1159 # always given to the viewer when sending the "attach to tracing
1160 # session" reply, so there's nothing new here. Return the `HUP`
1161 # status as, if we're handling this command, the viewer consumed
1162 # all the existing data streams.
1163 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1164 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1165
1166 def _handle_get_tracing_session_infos_command(self, cmd):
1167 logging.info('Handling "get tracing session infos" command.')
1168 infos = [
1169 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1170 ]
1171 infos.sort(key=lambda info: info.name)
1172 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1173
1174 def _handle_create_viewer_session_command(self, cmd):
1175 logging.info('Handling "create viewer session" command.')
1176 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1177
1178 # This does nothing here. In the LTTng relay daemon, it
1179 # allocates the viewer session's state.
1180 return _LttngLiveViewerCreateViewerSessionReply(status)
1181
1182
1183# An LTTng live TCP server.
1184#
1185# On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1186# the decimal TCP port number to a temporary port file. It renames the
1187# temporary port file to `port_filename`.
1188#
1189# `tracing_session_descriptors` is a list of tracing session descriptors
1190# (`LttngTracingSessionDescriptor`) to serve.
1191#
1192# This server accepts a single viewer (client).
1193#
1194# When the viewer closes the connection, the server's constructor
1195# returns.
1196class LttngLiveServer:
1197 def __init__(
1198 self, port_filename, tracing_session_descriptors, max_query_data_response_size
1199 ):
1200 logging.info('Server configuration:')
1201
1202 logging.info(' Port file name: `{}`'.format(port_filename))
1203
1204 if max_query_data_response_size is not None:
1205 logging.info(
1206 ' Maximum response data query size: `{}`'.format(
1207 max_query_data_response_size
1208 )
1209 )
1210
1211 for ts_descr in tracing_session_descriptors:
1212 info = ts_descr.info
1213 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1214 logging.info(
1215 fmt.format(
1216 info.name,
1217 info.tracing_session_id,
1218 info.hostname,
1219 info.live_timer_freq,
1220 info.client_count,
1221 info.stream_count,
1222 )
1223 )
1224
1225 for trace in ts_descr.traces:
1226 logging.info(' Trace: path="{}"'.format(trace.path))
1227
1228 self._ts_descriptors = tracing_session_descriptors
1229 self._max_query_data_response_size = max_query_data_response_size
1230 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1231 self._codec = _LttngLiveViewerProtocolCodec()
1232
1233 # Port 0: OS assigns an unused port
1234 serv_addr = ('localhost', 0)
1235 self._sock.bind(serv_addr)
1236 self._write_port_to_file(port_filename)
1237
1238 try:
1239 self._listen()
1240 finally:
1241 self._sock.close()
1242 logging.info('Closed connection and socket.')
1243
1244 @property
1245 def _server_port(self):
1246 return self._sock.getsockname()[1]
1247
1248 def _recv_command(self):
1249 data = bytes()
1250
1251 while True:
1252 logging.info('Waiting for viewer command.')
1253 buf = self._conn.recv(128)
1254
1255 if not buf:
1256 logging.info('Client closed connection.')
1257
1258 if data:
1259 raise UnexpectedInput(
1260 'Client closed connection after having sent {} command bytes.'.format(
1261 len(data)
1262 )
1263 )
1264
1265 return
1266
1267 logging.info('Received data from viewer: length={}'.format(len(buf)))
1268
1269 data += buf
1270
1271 try:
1272 cmd = self._codec.decode(data)
1273 except struct.error as exc:
1274 raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc
1275
1276 if cmd is not None:
1277 logging.info(
1278 'Received command from viewer: cmd-cls-name={}'.format(
1279 cmd.__class__.__name__
1280 )
1281 )
1282 return cmd
1283
1284 def _send_reply(self, reply):
1285 data = self._codec.encode(reply)
1286 logging.info(
1287 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
1288 reply.__class__.__name__, len(data)
1289 )
1290 )
1291 self._conn.sendall(data)
1292
1293 def _handle_connection(self):
1294 # First command must be "connect"
1295 cmd = self._recv_command()
1296
1297 if type(cmd) is not _LttngLiveViewerConnectCommand:
1298 raise UnexpectedInput(
1299 'First command is not "connect": cmd-cls-name={}'.format(
1300 cmd.__class__.__name__
1301 )
1302 )
1303
1304 # Create viewer session (arbitrary ID 23)
1305 logging.info(
1306 'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
1307 )
1308 viewer_session = _LttngLiveViewerSession(
1309 23, self._ts_descriptors, self._max_query_data_response_size
1310 )
1311
1312 # Send "connect" reply
1313 self._send_reply(
1314 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1315 )
1316
1317 # Make the viewer session handle the remaining commands
1318 while True:
1319 cmd = self._recv_command()
1320
1321 if cmd is None:
1322 # Connection closed (at an expected location within the
1323 # conversation)
1324 return
1325
1326 self._send_reply(viewer_session.handle_command(cmd))
1327
1328 def _listen(self):
1329 logging.info('Listening: port={}'.format(self._server_port))
6f1b720d
JR
1330 # Backlog must be present for Python version < 3.5.
1331 # 128 is an arbitrary number since we expect only 1 connection anyway.
1332 self._sock.listen(128)
0198f2fb
PP
1333 self._conn, viewer_addr = self._sock.accept()
1334 logging.info(
1335 'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
1336 )
1337
1338 try:
1339 self._handle_connection()
1340 finally:
1341 self._conn.close()
1342
1343 def _write_port_to_file(self, port_filename):
1344 # Write the port number to a temporary file.
1345 with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_port_file:
1346 print(self._server_port, end='', file=tmp_port_file)
1347
e21e74d3
SM
1348 # Rename temporary file to real file.
1349 #
1350 # For unknown reasons, on Windows, moving the port file from its
1351 # temporary location to its final location (where the user of
1352 # the server expects it to appear) may raise a `PermissionError`
1353 # exception.
1354 #
1355 # We suppose it's possible that something in the Windows kernel
1356 # hasn't completely finished using the file when we try to move
1357 # it.
1358 #
1359 # Use a wait-and-retry scheme as a (bad) workaround.
1360 num_attempts = 5
1361 retry_delay_s = 1
1362
1363 for attempt in reversed(range(num_attempts)):
1364 try:
1365 os.replace(tmp_port_file.name, port_filename)
1366 logging.info(
1367 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1368 tmp_port_file.name, port_filename
1369 )
1370 )
1371 return
1372 except PermissionError:
1373 logging.info(
1374 'Permission error while attempting to rename port file; retrying in {} second: src-path="{}", dst-path="{}"'.format(
1375 retry_delay_s, tmp_port_file.name, port_filename
1376 )
1377 )
1378
1379 if attempt == 0:
1380 raise
1381
1382 time.sleep(retry_delay_s)
0198f2fb
PP
1383
1384
1385# A tracing session descriptor.
1386#
1387# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1388# objects).
1389class LttngTracingSessionDescriptor:
1390 def __init__(
1391 self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1392 ):
1393 for trace in traces:
1394 if name not in trace.path:
1395 fmt = 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
1396 raise ValueError(fmt.format(name, trace.path))
1397
1398 self._traces = traces
1399 stream_count = sum([len(t) + 1 for t in traces])
1400 self._info = _LttngLiveViewerTracingSessionInfo(
1401 tracing_session_id,
1402 live_timer_freq,
1403 client_count,
1404 stream_count,
1405 hostname,
1406 name,
1407 )
1408
1409 @property
1410 def traces(self):
1411 return self._traces
1412
1413 @property
1414 def info(self):
1415 return self._info
1416
1417
1418def _tracing_session_descriptors_from_arg(string):
1419 # Format is:
1420 # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
1421 parts = string.split(',')
1422 name = parts[0]
1423 tracing_session_id = int(parts[1])
1424 hostname = parts[2]
1425 live_timer_freq = int(parts[3])
1426 client_count = int(parts[4])
1427 traces = [LttngTrace(path) for path in parts[5:]]
1428 return LttngTracingSessionDescriptor(
1429 name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1430 )
1431
1432
1433def _loglevel_parser(string):
1434 loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
1435 if string not in loglevels:
1436 msg = "{} is not a valid loglevel".format(string)
1437 raise argparse.ArgumentTypeError(msg)
1438 return loglevels[string]
1439
1440
1441if __name__ == '__main__':
1442 logging.basicConfig(format='# %(asctime)-25s%(message)s')
1443 parser = argparse.ArgumentParser(
1444 description='LTTng-live protocol mocker', add_help=False
1445 )
1446 parser.add_argument(
1447 '--log-level',
1448 default='warning',
1449 choices=['info', 'warning'],
1450 help='The loglevel to be used.',
1451 )
1452
1453 loglevel_namespace, remaining_args = parser.parse_known_args()
1454 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1455
1456 parser.add_argument(
1457 '--port-filename',
1458 help='The final port file. This file is present when the server is ready to receive connection.',
1459 required=True,
1460 )
1461 parser.add_argument(
1462 '--max-query-data-response-size',
1463 type=int,
1464 help='The maximum size of control data response in bytes',
1465 )
1466 parser.add_argument(
1467 'sessions',
1468 nargs="+",
1469 metavar="SESSION",
1470 type=_tracing_session_descriptors_from_arg,
1471 help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
1472 )
1473 parser.add_argument(
1474 '-h',
1475 '--help',
1476 action='help',
1477 default=argparse.SUPPRESS,
1478 help='Show this help message and exit.',
1479 )
1480
1481 args = parser.parse_args(args=remaining_args)
1482 try:
1483 LttngLiveServer(
1484 args.port_filename, args.sessions, args.max_query_data_response_size
1485 )
1486 except UnexpectedInput as exc:
1487 logging.error(str(exc))
1488 print(exc, file=sys.stderr)
1489 sys.exit(1)
This page took 0.086639 seconds and 4 git commands to generate.