tests: retry os.rename on PermissionError failure in lttng_live_server.py
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
... / ...
CommitLineData
1# The MIT License (MIT)
2#
3# Copyright (c) 2019 Philippe Proulx <pproulx@efficios.com>
4#
5# Permission is hereby granted, free of charge, to any person obtaining a copy
6# of this software and associated documentation files (the "Software"), to deal
7# in the Software without restriction, including without limitation the rights
8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9# copies of the Software, and to permit persons to whom the Software is
10# furnished to do so, subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be included in
13# all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21# THE SOFTWARE.
22
23import argparse
24import collections.abc
25import logging
26import os
27import os.path
28import re
29import time
30import socket
31import struct
32import sys
33import tempfile
34
35
36class UnexpectedInput(RuntimeError):
37 pass
38
39
40class _LttngLiveViewerCommand:
41 def __init__(self, version):
42 self._version = version
43
44 @property
45 def version(self):
46 return self._version
47
48
49class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
50 def __init__(self, version, viewer_session_id, major, minor):
51 super().__init__(version)
52 self._viewer_session_id = viewer_session_id
53 self._major = major
54 self._minor = minor
55
56 @property
57 def viewer_session_id(self):
58 return self._viewer_session_id
59
60 @property
61 def major(self):
62 return self._major
63
64 @property
65 def minor(self):
66 return self._minor
67
68
69class _LttngLiveViewerConnectReply:
70 def __init__(self, viewer_session_id, major, minor):
71 self._viewer_session_id = viewer_session_id
72 self._major = major
73 self._minor = minor
74
75 @property
76 def viewer_session_id(self):
77 return self._viewer_session_id
78
79 @property
80 def major(self):
81 return self._major
82
83 @property
84 def minor(self):
85 return self._minor
86
87
88class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
89 pass
90
91
92class _LttngLiveViewerTracingSessionInfo:
93 def __init__(
94 self,
95 tracing_session_id,
96 live_timer_freq,
97 client_count,
98 stream_count,
99 hostname,
100 name,
101 ):
102 self._tracing_session_id = tracing_session_id
103 self._live_timer_freq = live_timer_freq
104 self._client_count = client_count
105 self._stream_count = stream_count
106 self._hostname = hostname
107 self._name = name
108
109 @property
110 def tracing_session_id(self):
111 return self._tracing_session_id
112
113 @property
114 def live_timer_freq(self):
115 return self._live_timer_freq
116
117 @property
118 def client_count(self):
119 return self._client_count
120
121 @property
122 def stream_count(self):
123 return self._stream_count
124
125 @property
126 def hostname(self):
127 return self._hostname
128
129 @property
130 def name(self):
131 return self._name
132
133
134class _LttngLiveViewerGetTracingSessionInfosReply:
135 def __init__(self, tracing_session_infos):
136 self._tracing_session_infos = tracing_session_infos
137
138 @property
139 def tracing_session_infos(self):
140 return self._tracing_session_infos
141
142
143class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
144 class SeekType:
145 BEGINNING = 1
146 LAST = 2
147
148 def __init__(self, version, tracing_session_id, offset, seek_type):
149 super().__init__(version)
150 self._tracing_session_id = tracing_session_id
151 self._offset = offset
152 self._seek_type = seek_type
153
154 @property
155 def tracing_session_id(self):
156 return self._tracing_session_id
157
158 @property
159 def offset(self):
160 return self._offset
161
162 @property
163 def seek_type(self):
164 return self._seek_type
165
166
167class _LttngLiveViewerStreamInfo:
168 def __init__(self, id, trace_id, is_metadata, path, channel_name):
169 self._id = id
170 self._trace_id = trace_id
171 self._is_metadata = is_metadata
172 self._path = path
173 self._channel_name = channel_name
174
175 @property
176 def id(self):
177 return self._id
178
179 @property
180 def trace_id(self):
181 return self._trace_id
182
183 @property
184 def is_metadata(self):
185 return self._is_metadata
186
187 @property
188 def path(self):
189 return self._path
190
191 @property
192 def channel_name(self):
193 return self._channel_name
194
195
196class _LttngLiveViewerAttachToTracingSessionReply:
197 class Status:
198 OK = 1
199 ALREADY = 2
200 UNKNOWN = 3
201 NOT_LIVE = 4
202 SEEK_ERROR = 5
203 NO_SESSION = 6
204
205 def __init__(self, status, stream_infos):
206 self._status = status
207 self._stream_infos = stream_infos
208
209 @property
210 def status(self):
211 return self._status
212
213 @property
214 def stream_infos(self):
215 return self._stream_infos
216
217
218class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
219 def __init__(self, version, stream_id):
220 super().__init__(version)
221 self._stream_id = stream_id
222
223 @property
224 def stream_id(self):
225 return self._stream_id
226
227
228class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
229 class Status:
230 OK = 1
231 RETRY = 2
232 HUP = 3
233 ERROR = 4
234 INACTIVE = 5
235 EOF = 6
236
237 def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
238 self._status = status
239 self._index_entry = index_entry
240 self._has_new_metadata = has_new_metadata
241 self._has_new_data_stream = has_new_data_stream
242
243 @property
244 def status(self):
245 return self._status
246
247 @property
248 def index_entry(self):
249 return self._index_entry
250
251 @property
252 def has_new_metadata(self):
253 return self._has_new_metadata
254
255 @property
256 def has_new_data_stream(self):
257 return self._has_new_data_stream
258
259
260class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
261 def __init__(self, version, stream_id, offset, req_length):
262 super().__init__(version)
263 self._stream_id = stream_id
264 self._offset = offset
265 self._req_length = req_length
266
267 @property
268 def stream_id(self):
269 return self._stream_id
270
271 @property
272 def offset(self):
273 return self._offset
274
275 @property
276 def req_length(self):
277 return self._req_length
278
279
280class _LttngLiveViewerGetDataStreamPacketDataReply:
281 class Status:
282 OK = 1
283 RETRY = 2
284 ERROR = 3
285 EOF = 4
286
287 def __init__(self, status, data, has_new_metadata, has_new_data_stream):
288 self._status = status
289 self._data = data
290 self._has_new_metadata = has_new_metadata
291 self._has_new_data_stream = has_new_data_stream
292
293 @property
294 def status(self):
295 return self._status
296
297 @property
298 def data(self):
299 return self._data
300
301 @property
302 def has_new_metadata(self):
303 return self._has_new_metadata
304
305 @property
306 def has_new_data_stream(self):
307 return self._has_new_data_stream
308
309
310class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
311 def __init__(self, version, stream_id):
312 super().__init__(version)
313 self._stream_id = stream_id
314
315 @property
316 def stream_id(self):
317 return self._stream_id
318
319
320class _LttngLiveViewerGetMetadataStreamDataContentReply:
321 class Status:
322 OK = 1
323 NO_NEW = 2
324 ERROR = 3
325
326 def __init__(self, status, data):
327 self._status = status
328 self._data = data
329
330 @property
331 def status(self):
332 return self._status
333
334 @property
335 def data(self):
336 return self._data
337
338
339class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
340 def __init__(self, version, tracing_session_id):
341 super().__init__(version)
342 self._tracing_session_id = tracing_session_id
343
344 @property
345 def tracing_session_id(self):
346 return self._tracing_session_id
347
348
349class _LttngLiveViewerGetNewStreamInfosReply:
350 class Status:
351 OK = 1
352 NO_NEW = 2
353 ERROR = 3
354 HUP = 4
355
356 def __init__(self, status, stream_infos):
357 self._status = status
358 self._stream_infos = stream_infos
359
360 @property
361 def status(self):
362 return self._status
363
364 @property
365 def stream_infos(self):
366 return self._stream_infos
367
368
369class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
370 pass
371
372
373class _LttngLiveViewerCreateViewerSessionReply:
374 class Status:
375 OK = 1
376 ERROR = 2
377
378 def __init__(self, status):
379 self._status = status
380
381 @property
382 def status(self):
383 return self._status
384
385
386class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
387 def __init__(self, version, tracing_session_id):
388 super().__init__(version)
389 self._tracing_session_id = tracing_session_id
390
391 @property
392 def tracing_session_id(self):
393 return self._tracing_session_id
394
395
396class _LttngLiveViewerDetachFromTracingSessionReply:
397 class Status:
398 OK = 1
399 UNKNOWN = 2
400 ERROR = 3
401
402 def __init__(self, status):
403 self._status = status
404
405 @property
406 def status(self):
407 return self._status
408
409
410# An LTTng live protocol codec can convert bytes to command objects and
411# reply objects to bytes.
412class _LttngLiveViewerProtocolCodec:
413 _COMMAND_HEADER_STRUCT_FMT = 'QII'
414 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
415
416 def __init__(self):
417 pass
418
419 def _unpack(self, fmt, data, offset=0):
420 fmt = '!' + fmt
421 return struct.unpack_from(fmt, data, offset)
422
423 def _unpack_payload(self, fmt, data):
424 return self._unpack(
425 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
426 )
427
428 def decode(self, data):
429 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
430 # Not enough data to read the command header
431 return
432
433 payload_size, cmd_type, version = self._unpack(
434 self._COMMAND_HEADER_STRUCT_FMT, data
435 )
436 logging.info(
437 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
438 payload_size, cmd_type, version
439 )
440 )
441
442 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
443 # Not enough data to read the whole command
444 return
445
446 if cmd_type == 1:
447 viewer_session_id, major, minor, conn_type = self._unpack_payload(
448 'QIII', data
449 )
450 return _LttngLiveViewerConnectCommand(
451 version, viewer_session_id, major, minor
452 )
453 elif cmd_type == 2:
454 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
455 elif cmd_type == 3:
456 tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
457 return _LttngLiveViewerAttachToTracingSessionCommand(
458 version, tracing_session_id, offset, seek_type
459 )
460 elif cmd_type == 4:
461 (stream_id,) = self._unpack_payload('Q', data)
462 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
463 version, stream_id
464 )
465 elif cmd_type == 5:
466 stream_id, offset, req_length = self._unpack_payload('QQI', data)
467 return _LttngLiveViewerGetDataStreamPacketDataCommand(
468 version, stream_id, offset, req_length
469 )
470 elif cmd_type == 6:
471 (stream_id,) = self._unpack_payload('Q', data)
472 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
473 elif cmd_type == 7:
474 (tracing_session_id,) = self._unpack_payload('Q', data)
475 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
476 elif cmd_type == 8:
477 return _LttngLiveViewerCreateViewerSessionCommand(version)
478 elif cmd_type == 9:
479 (tracing_session_id,) = self._unpack_payload('Q', data)
480 return _LttngLiveViewerDetachFromTracingSessionCommand(
481 version, tracing_session_id
482 )
483 else:
484 raise UnexpectedInput('Unknown command type {}'.format(cmd_type))
485
486 def _pack(self, fmt, *args):
487 # Force network byte order
488 return struct.pack('!' + fmt, *args)
489
490 def _encode_zero_padded_str(self, string, length):
491 data = string.encode()
492 return data.ljust(length, b'\x00')
493
494 def _encode_stream_info(self, info):
495 data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
496 data += self._encode_zero_padded_str(info.path, 4096)
497 data += self._encode_zero_padded_str(info.channel_name, 255)
498 return data
499
500 def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
501 flags = 0
502
503 if has_new_metadata:
504 flags |= 1
505
506 if has_new_data_streams:
507 flags |= 2
508
509 return flags
510
511 def encode(self, reply):
512 if type(reply) is _LttngLiveViewerConnectReply:
513 data = self._pack(
514 'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
515 )
516 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
517 data = self._pack('I', len(reply.tracing_session_infos))
518
519 for info in reply.tracing_session_infos:
520 data += self._pack(
521 'QIII',
522 info.tracing_session_id,
523 info.live_timer_freq,
524 info.client_count,
525 info.stream_count,
526 )
527 data += self._encode_zero_padded_str(info.hostname, 64)
528 data += self._encode_zero_padded_str(info.name, 255)
529 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
530 data = self._pack('II', reply.status, len(reply.stream_infos))
531
532 for info in reply.stream_infos:
533 data += self._encode_stream_info(info)
534 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
535 entry = reply.index_entry
536 flags = self._get_has_new_stuff_flags(
537 reply.has_new_metadata, reply.has_new_data_stream
538 )
539
540 data = self._pack(
541 'QQQQQQQII',
542 entry.offset_bytes,
543 entry.total_size_bits,
544 entry.content_size_bits,
545 entry.timestamp_begin,
546 entry.timestamp_end,
547 entry.events_discarded,
548 entry.stream_class_id,
549 reply.status,
550 flags,
551 )
552 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
553 flags = self._get_has_new_stuff_flags(
554 reply.has_new_metadata, reply.has_new_data_stream
555 )
556 data = self._pack('III', reply.status, len(reply.data), flags)
557 data += reply.data
558 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
559 data = self._pack('QI', len(reply.data), reply.status)
560 data += reply.data
561 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
562 data = self._pack('II', reply.status, len(reply.stream_infos))
563
564 for info in reply.stream_infos:
565 data += self._encode_stream_info(info)
566 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
567 data = self._pack('I', reply.status)
568 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
569 data = self._pack('I', reply.status)
570 else:
571 raise ValueError(
572 'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
573 )
574
575 return data
576
577
578# An entry within the index of an LTTng data stream.
579class _LttngDataStreamIndexEntry:
580 def __init__(
581 self,
582 offset_bytes,
583 total_size_bits,
584 content_size_bits,
585 timestamp_begin,
586 timestamp_end,
587 events_discarded,
588 stream_class_id,
589 ):
590 self._offset_bytes = offset_bytes
591 self._total_size_bits = total_size_bits
592 self._content_size_bits = content_size_bits
593 self._timestamp_begin = timestamp_begin
594 self._timestamp_end = timestamp_end
595 self._events_discarded = events_discarded
596 self._stream_class_id = stream_class_id
597
598 @property
599 def offset_bytes(self):
600 return self._offset_bytes
601
602 @property
603 def total_size_bits(self):
604 return self._total_size_bits
605
606 @property
607 def total_size_bytes(self):
608 return self._total_size_bits // 8
609
610 @property
611 def content_size_bits(self):
612 return self._content_size_bits
613
614 @property
615 def content_size_bytes(self):
616 return self._content_size_bits // 8
617
618 @property
619 def timestamp_begin(self):
620 return self._timestamp_begin
621
622 @property
623 def timestamp_end(self):
624 return self._timestamp_end
625
626 @property
627 def events_discarded(self):
628 return self._events_discarded
629
630 @property
631 def stream_class_id(self):
632 return self._stream_class_id
633
634
635# The index of an LTTng data stream, a sequence of index entries.
636class _LttngDataStreamIndex(collections.abc.Sequence):
637 def __init__(self, path):
638 self._path = path
639 self._build()
640 logging.info(
641 'Built data stream index entries: path="{}", count={}'.format(
642 path, len(self._entries)
643 )
644 )
645
646 def _build(self):
647 self._entries = []
648 assert os.path.isfile(self._path)
649
650 with open(self._path, 'rb') as f:
651 # Read header first
652 fmt = '>IIII'
653 size = struct.calcsize(fmt)
654 data = f.read(size)
655 assert len(data) == size
656 magic, index_major, index_minor, index_entry_length = struct.unpack(
657 fmt, data
658 )
659 assert magic == 0xC1F1DCC1
660
661 # Read index entries
662 fmt = '>QQQQQQQ'
663 size = struct.calcsize(fmt)
664
665 while True:
666 logging.debug(
667 'Decoding data stream index entry: path="{}", offset={}'.format(
668 self._path, f.tell()
669 )
670 )
671 data = f.read(size)
672
673 if not data:
674 # Done
675 break
676
677 assert len(data) == size
678 (
679 offset_bytes,
680 total_size_bits,
681 content_size_bits,
682 timestamp_begin,
683 timestamp_end,
684 events_discarded,
685 stream_class_id,
686 ) = struct.unpack(fmt, data)
687
688 self._entries.append(
689 _LttngDataStreamIndexEntry(
690 offset_bytes,
691 total_size_bits,
692 content_size_bits,
693 timestamp_begin,
694 timestamp_end,
695 events_discarded,
696 stream_class_id,
697 )
698 )
699
700 # Skip anything else before the next entry
701 f.seek(index_entry_length - size, os.SEEK_CUR)
702
703 def __getitem__(self, index):
704 return self._entries[index]
705
706 def __len__(self):
707 return len(self._entries)
708
709 @property
710 def path(self):
711 return self._path
712
713
714# An LTTng data stream.
715class _LttngDataStream:
716 def __init__(self, path):
717 self._path = path
718 filename = os.path.basename(path)
719 match = re.match(r'(.*)_\d+', filename)
720 self._channel_name = match.group(1)
721 trace_dir = os.path.dirname(path)
722 index_path = os.path.join(trace_dir, 'index', filename + '.idx')
723 self._index = _LttngDataStreamIndex(index_path)
724 assert os.path.isfile(path)
725 self._file = open(path, 'rb')
726 logging.info(
727 'Built data stream: path="{}", channel-name="{}"'.format(
728 path, self._channel_name
729 )
730 )
731
732 @property
733 def path(self):
734 return self._path
735
736 @property
737 def channel_name(self):
738 return self._channel_name
739
740 @property
741 def index(self):
742 return self._index
743
744 def get_data(self, offset_bytes, len_bytes):
745 self._file.seek(offset_bytes)
746 return self._file.read(len_bytes)
747
748
749# An LTTng metadata stream.
750class _LttngMetadataStream:
751 def __init__(self, path):
752 self._path = path
753 logging.info('Built metadata stream: path="{}"'.format(path))
754
755 @property
756 def path(self):
757 return self._path
758
759 @property
760 def data(self):
761 assert os.path.isfile(self._path)
762
763 with open(self._path, 'rb') as f:
764 return f.read()
765
766
767# An LTTng trace, a sequence of LTTng data streams.
768class LttngTrace(collections.abc.Sequence):
769 def __init__(self, trace_dir):
770 assert os.path.isdir(trace_dir)
771 self._path = trace_dir
772 self._metadata_stream = _LttngMetadataStream(
773 os.path.join(trace_dir, 'metadata')
774 )
775 self._create_data_streams(trace_dir)
776 logging.info('Built trace: path="{}"'.format(trace_dir))
777
778 def _create_data_streams(self, trace_dir):
779 data_stream_paths = []
780
781 for filename in os.listdir(trace_dir):
782 path = os.path.join(trace_dir, filename)
783
784 if not os.path.isfile(path):
785 continue
786
787 if filename.startswith('.'):
788 continue
789
790 if filename == 'metadata':
791 continue
792
793 data_stream_paths.append(path)
794
795 data_stream_paths.sort()
796 self._data_streams = []
797
798 for data_stream_path in data_stream_paths:
799 self._data_streams.append(_LttngDataStream(data_stream_path))
800
801 @property
802 def path(self):
803 return self._path
804
805 @property
806 def metadata_stream(self):
807 return self._metadata_stream
808
809 def __getitem__(self, index):
810 return self._data_streams[index]
811
812 def __len__(self):
813 return len(self._data_streams)
814
815
816# The state of a single data stream.
817class _LttngLiveViewerSessionDataStreamState:
818 def __init__(self, ts_state, info, data_stream):
819 self._ts_state = ts_state
820 self._info = info
821 self._data_stream = data_stream
822 self._cur_index_entry_index = 0
823 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
824 logging.info(
825 fmt.format(
826 info.id,
827 ts_state.tracing_session_descriptor.info.tracing_session_id,
828 ts_state.tracing_session_descriptor.info.name,
829 data_stream.path,
830 )
831 )
832
833 @property
834 def tracing_session_state(self):
835 return self._ts_state
836
837 @property
838 def info(self):
839 return self._info
840
841 @property
842 def data_stream(self):
843 return self._data_stream
844
845 @property
846 def cur_index_entry(self):
847 if self._cur_index_entry_index == len(self._data_stream.index):
848 return
849
850 return self._data_stream.index[self._cur_index_entry_index]
851
852 def goto_next_index_entry(self):
853 self._cur_index_entry_index += 1
854
855
856# The state of a single metadata stream.
857class _LttngLiveViewerSessionMetadataStreamState:
858 def __init__(self, ts_state, info, metadata_stream):
859 self._ts_state = ts_state
860 self._info = info
861 self._metadata_stream = metadata_stream
862 self._is_sent = False
863 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
864 logging.info(
865 fmt.format(
866 info.id,
867 ts_state.tracing_session_descriptor.info.tracing_session_id,
868 ts_state.tracing_session_descriptor.info.name,
869 metadata_stream.path,
870 )
871 )
872
873 @property
874 def trace_session_state(self):
875 return self._trace_session_state
876
877 @property
878 def info(self):
879 return self._info
880
881 @property
882 def metadata_stream(self):
883 return self._metadata_stream
884
885 @property
886 def is_sent(self):
887 return self._is_sent
888
889 @is_sent.setter
890 def is_sent(self, value):
891 self._is_sent = value
892
893
894# The state of a tracing session.
895class _LttngLiveViewerSessionTracingSessionState:
896 def __init__(self, tc_descr, base_stream_id):
897 self._tc_descr = tc_descr
898 self._stream_infos = []
899 self._ds_states = {}
900 self._ms_states = {}
901 stream_id = base_stream_id
902
903 for trace in tc_descr.traces:
904 trace_id = stream_id * 1000
905
906 # Data streams -> stream infos and data stream states
907 for data_stream in trace:
908 info = _LttngLiveViewerStreamInfo(
909 stream_id,
910 trace_id,
911 False,
912 data_stream.path,
913 data_stream.channel_name,
914 )
915 self._stream_infos.append(info)
916 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
917 self, info, data_stream
918 )
919 stream_id += 1
920
921 # Metadata stream -> stream info and metadata stream state
922 info = _LttngLiveViewerStreamInfo(
923 stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
924 )
925 self._stream_infos.append(info)
926 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
927 self, info, trace.metadata_stream
928 )
929 stream_id += 1
930
931 self._is_attached = False
932 fmt = 'Built tracing session state: id={}, name="{}"'
933 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
934
935 @property
936 def tracing_session_descriptor(self):
937 return self._tc_descr
938
939 @property
940 def data_stream_states(self):
941 return self._ds_states
942
943 @property
944 def metadata_stream_states(self):
945 return self._ms_states
946
947 @property
948 def stream_infos(self):
949 return self._stream_infos
950
951 @property
952 def has_new_metadata(self):
953 return any([not ms.is_sent for ms in self._ms_states.values()])
954
955 @property
956 def is_attached(self):
957 return self._is_attached
958
959 @is_attached.setter
960 def is_attached(self, value):
961 self._is_attached = value
962
963
964# An LTTng live viewer session manages a view on tracing sessions
965# and replies to commands accordingly.
966class _LttngLiveViewerSession:
967 def __init__(
968 self,
969 viewer_session_id,
970 tracing_session_descriptors,
971 max_query_data_response_size,
972 ):
973 self._viewer_session_id = viewer_session_id
974 self._ts_states = {}
975 self._stream_states = {}
976 self._max_query_data_response_size = max_query_data_response_size
977 total_stream_infos = 0
978
979 for ts_descr in tracing_session_descriptors:
980 ts_state = _LttngLiveViewerSessionTracingSessionState(
981 ts_descr, total_stream_infos
982 )
983 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
984 self._ts_states[ts_id] = ts_state
985 total_stream_infos += len(ts_state.stream_infos)
986
987 # Update session's stream states to have the new states
988 self._stream_states.update(ts_state.data_stream_states)
989 self._stream_states.update(ts_state.metadata_stream_states)
990
991 self._command_handlers = {
992 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
993 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
994 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
995 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
996 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
997 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
998 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
999 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
1000 }
1001
1002 @property
1003 def viewer_session_id(self):
1004 return self._viewer_session_id
1005
1006 def _get_tracing_session_state(self, tracing_session_id):
1007 if tracing_session_id not in self._ts_states:
1008 raise UnexpectedInput(
1009 'Unknown tracing session ID {}'.format(tracing_session_id)
1010 )
1011
1012 return self._ts_states[tracing_session_id]
1013
1014 def _get_stream_state(self, stream_id):
1015 if stream_id not in self._stream_states:
1016 UnexpectedInput('Unknown stream ID {}'.format(stream_id))
1017
1018 return self._stream_states[stream_id]
1019
1020 def handle_command(self, cmd):
1021 logging.info(
1022 'Handling command in viewer session: cmd-cls-name={}'.format(
1023 cmd.__class__.__name__
1024 )
1025 )
1026 cmd_type = type(cmd)
1027
1028 if cmd_type not in self._command_handlers:
1029 raise UnexpectedInput(
1030 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
1031 )
1032
1033 return self._command_handlers[cmd_type](cmd)
1034
1035 def _handle_attach_to_tracing_session_command(self, cmd):
1036 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1037 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1038 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1039 info = ts_state.tracing_session_descriptor.info
1040
1041 if ts_state.is_attached:
1042 raise UnexpectedInput(
1043 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1044 info.name
1045 )
1046 )
1047
1048 ts_state.is_attached = True
1049 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1050 return _LttngLiveViewerAttachToTracingSessionReply(
1051 status, ts_state.stream_infos
1052 )
1053
1054 def _handle_detach_from_tracing_session_command(self, cmd):
1055 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1056 logging.info(fmt.format(cmd.tracing_session_id))
1057 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1058 info = ts_state.tracing_session_descriptor.info
1059
1060 if not ts_state.is_attached:
1061 raise UnexpectedInput(
1062 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1063 info.name
1064 )
1065 )
1066
1067 ts_state.is_attached = False
1068 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1069 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1070
1071 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1072 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1073 logging.info(fmt.format(cmd.stream_id))
1074 stream_state = self._get_stream_state(cmd.stream_id)
1075
1076 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1077 raise UnexpectedInput(
1078 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1079 )
1080
1081 if stream_state.cur_index_entry is None:
1082 # The viewer is done reading this stream
1083 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1084
1085 # Dummy data stream index entry to use with the `HUP` status
1086 # (the reply needs one, but the viewer ignores it)
1087 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1088
1089 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1090 status, index_entry, False, False
1091 )
1092
1093 # The viewer only checks the `has_new_metadata` flag if the
1094 # reply's status is `OK`, so we need to provide an index here
1095 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1096 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1097 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1098 status, stream_state.cur_index_entry, has_new_metadata, False
1099 )
1100 stream_state.goto_next_index_entry()
1101 return reply
1102
1103 def _handle_get_data_stream_packet_data_command(self, cmd):
1104 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1105 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1106 stream_state = self._get_stream_state(cmd.stream_id)
1107 data_response_length = cmd.req_length
1108
1109 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1110 raise UnexpectedInput(
1111 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1112 )
1113
1114 if stream_state.tracing_session_state.has_new_metadata:
1115 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1116 return _LttngLiveViewerGetDataStreamPacketDataReply(
1117 status, bytes(), True, False
1118 )
1119
1120 if self._max_query_data_response_size:
1121 # Enforce a server side limit on the query requested length.
1122 # To ensure that the transaction terminate take the minimum of both
1123 # value.
1124 data_response_length = min(
1125 cmd.req_length, self._max_query_data_response_size
1126 )
1127 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1128 logging.info(fmt.format(cmd.req_length, data_response_length))
1129
1130 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1131 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1132 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1133
1134 def _handle_get_metadata_stream_data_command(self, cmd):
1135 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1136 logging.info(fmt.format(cmd.stream_id))
1137 stream_state = self._get_stream_state(cmd.stream_id)
1138
1139 if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
1140 raise UnexpectedInput(
1141 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
1142 )
1143
1144 if stream_state.is_sent:
1145 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1146 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1147
1148 stream_state.is_sent = True
1149 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
1150 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1151 status, stream_state.metadata_stream.data
1152 )
1153
1154 def _handle_get_new_stream_infos_command(self, cmd):
1155 fmt = 'Handling "get new stream infos" command: ts-id={}'
1156 logging.info(fmt.format(cmd.tracing_session_id))
1157
1158 # As of this version, all the tracing session's stream infos are
1159 # always given to the viewer when sending the "attach to tracing
1160 # session" reply, so there's nothing new here. Return the `HUP`
1161 # status as, if we're handling this command, the viewer consumed
1162 # all the existing data streams.
1163 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1164 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1165
1166 def _handle_get_tracing_session_infos_command(self, cmd):
1167 logging.info('Handling "get tracing session infos" command.')
1168 infos = [
1169 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1170 ]
1171 infos.sort(key=lambda info: info.name)
1172 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1173
1174 def _handle_create_viewer_session_command(self, cmd):
1175 logging.info('Handling "create viewer session" command.')
1176 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1177
1178 # This does nothing here. In the LTTng relay daemon, it
1179 # allocates the viewer session's state.
1180 return _LttngLiveViewerCreateViewerSessionReply(status)
1181
1182
1183# An LTTng live TCP server.
1184#
1185# On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1186# the decimal TCP port number to a temporary port file. It renames the
1187# temporary port file to `port_filename`.
1188#
1189# `tracing_session_descriptors` is a list of tracing session descriptors
1190# (`LttngTracingSessionDescriptor`) to serve.
1191#
1192# This server accepts a single viewer (client).
1193#
1194# When the viewer closes the connection, the server's constructor
1195# returns.
1196class LttngLiveServer:
1197 def __init__(
1198 self, port_filename, tracing_session_descriptors, max_query_data_response_size
1199 ):
1200 logging.info('Server configuration:')
1201
1202 logging.info(' Port file name: `{}`'.format(port_filename))
1203
1204 if max_query_data_response_size is not None:
1205 logging.info(
1206 ' Maximum response data query size: `{}`'.format(
1207 max_query_data_response_size
1208 )
1209 )
1210
1211 for ts_descr in tracing_session_descriptors:
1212 info = ts_descr.info
1213 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1214 logging.info(
1215 fmt.format(
1216 info.name,
1217 info.tracing_session_id,
1218 info.hostname,
1219 info.live_timer_freq,
1220 info.client_count,
1221 info.stream_count,
1222 )
1223 )
1224
1225 for trace in ts_descr.traces:
1226 logging.info(' Trace: path="{}"'.format(trace.path))
1227
1228 self._ts_descriptors = tracing_session_descriptors
1229 self._max_query_data_response_size = max_query_data_response_size
1230 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1231 self._codec = _LttngLiveViewerProtocolCodec()
1232
1233 # Port 0: OS assigns an unused port
1234 serv_addr = ('localhost', 0)
1235 self._sock.bind(serv_addr)
1236 self._write_port_to_file(port_filename)
1237
1238 try:
1239 self._listen()
1240 finally:
1241 self._sock.close()
1242 logging.info('Closed connection and socket.')
1243
1244 @property
1245 def _server_port(self):
1246 return self._sock.getsockname()[1]
1247
1248 def _recv_command(self):
1249 data = bytes()
1250
1251 while True:
1252 logging.info('Waiting for viewer command.')
1253 buf = self._conn.recv(128)
1254
1255 if not buf:
1256 logging.info('Client closed connection.')
1257
1258 if data:
1259 raise UnexpectedInput(
1260 'Client closed connection after having sent {} command bytes.'.format(
1261 len(data)
1262 )
1263 )
1264
1265 return
1266
1267 logging.info('Received data from viewer: length={}'.format(len(buf)))
1268
1269 data += buf
1270
1271 try:
1272 cmd = self._codec.decode(data)
1273 except struct.error as exc:
1274 raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc
1275
1276 if cmd is not None:
1277 logging.info(
1278 'Received command from viewer: cmd-cls-name={}'.format(
1279 cmd.__class__.__name__
1280 )
1281 )
1282 return cmd
1283
1284 def _send_reply(self, reply):
1285 data = self._codec.encode(reply)
1286 logging.info(
1287 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
1288 reply.__class__.__name__, len(data)
1289 )
1290 )
1291 self._conn.sendall(data)
1292
1293 def _handle_connection(self):
1294 # First command must be "connect"
1295 cmd = self._recv_command()
1296
1297 if type(cmd) is not _LttngLiveViewerConnectCommand:
1298 raise UnexpectedInput(
1299 'First command is not "connect": cmd-cls-name={}'.format(
1300 cmd.__class__.__name__
1301 )
1302 )
1303
1304 # Create viewer session (arbitrary ID 23)
1305 logging.info(
1306 'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
1307 )
1308 viewer_session = _LttngLiveViewerSession(
1309 23, self._ts_descriptors, self._max_query_data_response_size
1310 )
1311
1312 # Send "connect" reply
1313 self._send_reply(
1314 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1315 )
1316
1317 # Make the viewer session handle the remaining commands
1318 while True:
1319 cmd = self._recv_command()
1320
1321 if cmd is None:
1322 # Connection closed (at an expected location within the
1323 # conversation)
1324 return
1325
1326 self._send_reply(viewer_session.handle_command(cmd))
1327
1328 def _listen(self):
1329 logging.info('Listening: port={}'.format(self._server_port))
1330 # Backlog must be present for Python version < 3.5.
1331 # 128 is an arbitrary number since we expect only 1 connection anyway.
1332 self._sock.listen(128)
1333 self._conn, viewer_addr = self._sock.accept()
1334 logging.info(
1335 'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
1336 )
1337
1338 try:
1339 self._handle_connection()
1340 finally:
1341 self._conn.close()
1342
1343 def _write_port_to_file(self, port_filename):
1344 # Write the port number to a temporary file.
1345 with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_port_file:
1346 print(self._server_port, end='', file=tmp_port_file)
1347
1348 # Rename temporary file to real file.
1349 #
1350 # For unknown reasons, on Windows, moving the port file from its
1351 # temporary location to its final location (where the user of
1352 # the server expects it to appear) may raise a `PermissionError`
1353 # exception.
1354 #
1355 # We suppose it's possible that something in the Windows kernel
1356 # hasn't completely finished using the file when we try to move
1357 # it.
1358 #
1359 # Use a wait-and-retry scheme as a (bad) workaround.
1360 num_attempts = 5
1361 retry_delay_s = 1
1362
1363 for attempt in reversed(range(num_attempts)):
1364 try:
1365 os.replace(tmp_port_file.name, port_filename)
1366 logging.info(
1367 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1368 tmp_port_file.name, port_filename
1369 )
1370 )
1371 return
1372 except PermissionError:
1373 logging.info(
1374 'Permission error while attempting to rename port file; retrying in {} second: src-path="{}", dst-path="{}"'.format(
1375 retry_delay_s, tmp_port_file.name, port_filename
1376 )
1377 )
1378
1379 if attempt == 0:
1380 raise
1381
1382 time.sleep(retry_delay_s)
1383
1384
1385# A tracing session descriptor.
1386#
1387# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1388# objects).
1389class LttngTracingSessionDescriptor:
1390 def __init__(
1391 self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1392 ):
1393 for trace in traces:
1394 if name not in trace.path:
1395 fmt = 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
1396 raise ValueError(fmt.format(name, trace.path))
1397
1398 self._traces = traces
1399 stream_count = sum([len(t) + 1 for t in traces])
1400 self._info = _LttngLiveViewerTracingSessionInfo(
1401 tracing_session_id,
1402 live_timer_freq,
1403 client_count,
1404 stream_count,
1405 hostname,
1406 name,
1407 )
1408
1409 @property
1410 def traces(self):
1411 return self._traces
1412
1413 @property
1414 def info(self):
1415 return self._info
1416
1417
1418def _tracing_session_descriptors_from_arg(string):
1419 # Format is:
1420 # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
1421 parts = string.split(',')
1422 name = parts[0]
1423 tracing_session_id = int(parts[1])
1424 hostname = parts[2]
1425 live_timer_freq = int(parts[3])
1426 client_count = int(parts[4])
1427 traces = [LttngTrace(path) for path in parts[5:]]
1428 return LttngTracingSessionDescriptor(
1429 name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1430 )
1431
1432
1433def _loglevel_parser(string):
1434 loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
1435 if string not in loglevels:
1436 msg = "{} is not a valid loglevel".format(string)
1437 raise argparse.ArgumentTypeError(msg)
1438 return loglevels[string]
1439
1440
1441if __name__ == '__main__':
1442 logging.basicConfig(format='# %(asctime)-25s%(message)s')
1443 parser = argparse.ArgumentParser(
1444 description='LTTng-live protocol mocker', add_help=False
1445 )
1446 parser.add_argument(
1447 '--log-level',
1448 default='warning',
1449 choices=['info', 'warning'],
1450 help='The loglevel to be used.',
1451 )
1452
1453 loglevel_namespace, remaining_args = parser.parse_known_args()
1454 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1455
1456 parser.add_argument(
1457 '--port-filename',
1458 help='The final port file. This file is present when the server is ready to receive connection.',
1459 required=True,
1460 )
1461 parser.add_argument(
1462 '--max-query-data-response-size',
1463 type=int,
1464 help='The maximum size of control data response in bytes',
1465 )
1466 parser.add_argument(
1467 'sessions',
1468 nargs="+",
1469 metavar="SESSION",
1470 type=_tracing_session_descriptors_from_arg,
1471 help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
1472 )
1473 parser.add_argument(
1474 '-h',
1475 '--help',
1476 action='help',
1477 default=argparse.SUPPRESS,
1478 help='Show this help message and exit.',
1479 )
1480
1481 args = parser.parse_args(args=remaining_args)
1482 try:
1483 LttngLiveServer(
1484 args.port_filename, args.sessions, args.max_query_data_response_size
1485 )
1486 except UnexpectedInput as exc:
1487 logging.error(str(exc))
1488 print(exc, file=sys.stderr)
1489 sys.exit(1)
This page took 0.026636 seconds and 4 git commands to generate.