tests/python: add local copy of typing module
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
1 # SPDX-License-Identifier: MIT
2 #
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
4 #
5
6 import os
7 import re
8 import sys
9 import json
10 import socket
11 import struct
12 import logging
13 import os.path
14 import argparse
15 import tempfile
16 import collections.abc
17 from collections import namedtuple
18
19
20 class UnexpectedInput(RuntimeError):
21 pass
22
23
24 # An entry within the index of an LTTng data stream.
25 class _LttngDataStreamIndexEntry:
26 def __init__(
27 self,
28 offset_bytes,
29 total_size_bits,
30 content_size_bits,
31 timestamp_begin,
32 timestamp_end,
33 events_discarded,
34 stream_class_id,
35 ):
36 self._offset_bytes = offset_bytes
37 self._total_size_bits = total_size_bits
38 self._content_size_bits = content_size_bits
39 self._timestamp_begin = timestamp_begin
40 self._timestamp_end = timestamp_end
41 self._events_discarded = events_discarded
42 self._stream_class_id = stream_class_id
43
44 @property
45 def offset_bytes(self):
46 return self._offset_bytes
47
48 @property
49 def total_size_bits(self):
50 return self._total_size_bits
51
52 @property
53 def total_size_bytes(self):
54 return self._total_size_bits // 8
55
56 @property
57 def content_size_bits(self):
58 return self._content_size_bits
59
60 @property
61 def content_size_bytes(self):
62 return self._content_size_bits // 8
63
64 @property
65 def timestamp_begin(self):
66 return self._timestamp_begin
67
68 @property
69 def timestamp_end(self):
70 return self._timestamp_end
71
72 @property
73 def events_discarded(self):
74 return self._events_discarded
75
76 @property
77 def stream_class_id(self):
78 return self._stream_class_id
79
80
81 # An entry within the index of an LTTng data stream. While a stream beacon entry
82 # is conceptually unrelated to an index, it is sent as a reply to a
83 # LttngLiveViewerGetNextDataStreamIndexEntryCommand
84 class _LttngDataStreamBeaconEntry:
85 def __init__(self, stream_class_id, timestamp):
86 self._stream_class_id = stream_class_id
87 self._timestamp = timestamp
88
89 @property
90 def timestamp(self):
91 return self._timestamp
92
93 @property
94 def stream_class_id(self):
95 return self._stream_class_id
96
97
98 class _LttngLiveViewerCommand:
99 def __init__(self, version):
100 self._version = version
101
102 @property
103 def version(self):
104 return self._version
105
106
107 class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
108 def __init__(self, version, viewer_session_id, major, minor):
109 super().__init__(version)
110 self._viewer_session_id = viewer_session_id
111 self._major = major
112 self._minor = minor
113
114 @property
115 def viewer_session_id(self):
116 return self._viewer_session_id
117
118 @property
119 def major(self):
120 return self._major
121
122 @property
123 def minor(self):
124 return self._minor
125
126
127 class _LttngLiveViewerConnectReply:
128 def __init__(self, viewer_session_id, major, minor):
129 self._viewer_session_id = viewer_session_id
130 self._major = major
131 self._minor = minor
132
133 @property
134 def viewer_session_id(self):
135 return self._viewer_session_id
136
137 @property
138 def major(self):
139 return self._major
140
141 @property
142 def minor(self):
143 return self._minor
144
145
146 class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
147 pass
148
149
150 class _LttngLiveViewerTracingSessionInfo:
151 def __init__(
152 self,
153 tracing_session_id,
154 live_timer_freq,
155 client_count,
156 stream_count,
157 hostname,
158 name,
159 ):
160 self._tracing_session_id = tracing_session_id
161 self._live_timer_freq = live_timer_freq
162 self._client_count = client_count
163 self._stream_count = stream_count
164 self._hostname = hostname
165 self._name = name
166
167 @property
168 def tracing_session_id(self):
169 return self._tracing_session_id
170
171 @property
172 def live_timer_freq(self):
173 return self._live_timer_freq
174
175 @property
176 def client_count(self):
177 return self._client_count
178
179 @property
180 def stream_count(self):
181 return self._stream_count
182
183 @property
184 def hostname(self):
185 return self._hostname
186
187 @property
188 def name(self):
189 return self._name
190
191
192 class _LttngLiveViewerGetTracingSessionInfosReply:
193 def __init__(self, tracing_session_infos):
194 self._tracing_session_infos = tracing_session_infos
195
196 @property
197 def tracing_session_infos(self):
198 return self._tracing_session_infos
199
200
201 class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
202 class SeekType:
203 BEGINNING = 1
204 LAST = 2
205
206 def __init__(self, version, tracing_session_id, offset, seek_type):
207 super().__init__(version)
208 self._tracing_session_id = tracing_session_id
209 self._offset = offset
210 self._seek_type = seek_type
211
212 @property
213 def tracing_session_id(self):
214 return self._tracing_session_id
215
216 @property
217 def offset(self):
218 return self._offset
219
220 @property
221 def seek_type(self):
222 return self._seek_type
223
224
225 class _LttngLiveViewerStreamInfo:
226 def __init__(self, id, trace_id, is_metadata, path, channel_name):
227 self._id = id
228 self._trace_id = trace_id
229 self._is_metadata = is_metadata
230 self._path = path
231 self._channel_name = channel_name
232
233 @property
234 def id(self):
235 return self._id
236
237 @property
238 def trace_id(self):
239 return self._trace_id
240
241 @property
242 def is_metadata(self):
243 return self._is_metadata
244
245 @property
246 def path(self):
247 return self._path
248
249 @property
250 def channel_name(self):
251 return self._channel_name
252
253
254 class _LttngLiveViewerAttachToTracingSessionReply:
255 class Status:
256 OK = 1
257 ALREADY = 2
258 UNKNOWN = 3
259 NOT_LIVE = 4
260 SEEK_ERROR = 5
261 NO_SESSION = 6
262
263 def __init__(self, status, stream_infos):
264 self._status = status
265 self._stream_infos = stream_infos
266
267 @property
268 def status(self):
269 return self._status
270
271 @property
272 def stream_infos(self):
273 return self._stream_infos
274
275
276 class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
277 def __init__(self, version, stream_id):
278 super().__init__(version)
279 self._stream_id = stream_id
280
281 @property
282 def stream_id(self):
283 return self._stream_id
284
285
286 class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
287 class Status:
288 OK = 1
289 RETRY = 2
290 HUP = 3
291 ERROR = 4
292 INACTIVE = 5
293 EOF = 6
294
295 def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
296 self._status = status
297 self._index_entry = index_entry
298 self._has_new_metadata = has_new_metadata
299 self._has_new_data_stream = has_new_data_stream
300
301 @property
302 def status(self):
303 return self._status
304
305 @property
306 def index_entry(self):
307 return self._index_entry
308
309 @property
310 def has_new_metadata(self):
311 return self._has_new_metadata
312
313 @property
314 def has_new_data_stream(self):
315 return self._has_new_data_stream
316
317
318 class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
319 def __init__(self, version, stream_id, offset, req_length):
320 super().__init__(version)
321 self._stream_id = stream_id
322 self._offset = offset
323 self._req_length = req_length
324
325 @property
326 def stream_id(self):
327 return self._stream_id
328
329 @property
330 def offset(self):
331 return self._offset
332
333 @property
334 def req_length(self):
335 return self._req_length
336
337
338 class _LttngLiveViewerGetDataStreamPacketDataReply:
339 class Status:
340 OK = 1
341 RETRY = 2
342 ERROR = 3
343 EOF = 4
344
345 def __init__(self, status, data, has_new_metadata, has_new_data_stream):
346 self._status = status
347 self._data = data
348 self._has_new_metadata = has_new_metadata
349 self._has_new_data_stream = has_new_data_stream
350
351 @property
352 def status(self):
353 return self._status
354
355 @property
356 def data(self):
357 return self._data
358
359 @property
360 def has_new_metadata(self):
361 return self._has_new_metadata
362
363 @property
364 def has_new_data_stream(self):
365 return self._has_new_data_stream
366
367
368 class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
369 def __init__(self, version, stream_id):
370 super().__init__(version)
371 self._stream_id = stream_id
372
373 @property
374 def stream_id(self):
375 return self._stream_id
376
377
378 class _LttngLiveViewerGetMetadataStreamDataContentReply:
379 class Status:
380 OK = 1
381 NO_NEW = 2
382 ERROR = 3
383
384 def __init__(self, status, data):
385 self._status = status
386 self._data = data
387
388 @property
389 def status(self):
390 return self._status
391
392 @property
393 def data(self):
394 return self._data
395
396
397 class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
398 def __init__(self, version, tracing_session_id):
399 super().__init__(version)
400 self._tracing_session_id = tracing_session_id
401
402 @property
403 def tracing_session_id(self):
404 return self._tracing_session_id
405
406
407 class _LttngLiveViewerGetNewStreamInfosReply:
408 class Status:
409 OK = 1
410 NO_NEW = 2
411 ERROR = 3
412 HUP = 4
413
414 def __init__(self, status, stream_infos):
415 self._status = status
416 self._stream_infos = stream_infos
417
418 @property
419 def status(self):
420 return self._status
421
422 @property
423 def stream_infos(self):
424 return self._stream_infos
425
426
427 class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
428 pass
429
430
431 class _LttngLiveViewerCreateViewerSessionReply:
432 class Status:
433 OK = 1
434 ERROR = 2
435
436 def __init__(self, status):
437 self._status = status
438
439 @property
440 def status(self):
441 return self._status
442
443
444 class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
445 def __init__(self, version, tracing_session_id):
446 super().__init__(version)
447 self._tracing_session_id = tracing_session_id
448
449 @property
450 def tracing_session_id(self):
451 return self._tracing_session_id
452
453
454 class _LttngLiveViewerDetachFromTracingSessionReply:
455 class Status:
456 OK = 1
457 UNKNOWN = 2
458 ERROR = 3
459
460 def __init__(self, status):
461 self._status = status
462
463 @property
464 def status(self):
465 return self._status
466
467
468 # An LTTng live protocol codec can convert bytes to command objects and
469 # reply objects to bytes.
470 class _LttngLiveViewerProtocolCodec:
471 _COMMAND_HEADER_STRUCT_FMT = "QII"
472 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
473
474 def __init__(self):
475 pass
476
477 def _unpack(self, fmt, data, offset=0):
478 fmt = "!" + fmt
479 return struct.unpack_from(fmt, data, offset)
480
481 def _unpack_payload(self, fmt, data):
482 return self._unpack(
483 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
484 )
485
486 def decode(self, data):
487 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
488 # Not enough data to read the command header
489 return
490
491 payload_size, cmd_type, version = self._unpack(
492 self._COMMAND_HEADER_STRUCT_FMT, data
493 )
494 logging.info(
495 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
496 payload_size, cmd_type, version
497 )
498 )
499
500 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
501 # Not enough data to read the whole command
502 return
503
504 if cmd_type == 1:
505 viewer_session_id, major, minor, conn_type = self._unpack_payload(
506 "QIII", data
507 )
508 return _LttngLiveViewerConnectCommand(
509 version, viewer_session_id, major, minor
510 )
511 elif cmd_type == 2:
512 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
513 elif cmd_type == 3:
514 tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
515 return _LttngLiveViewerAttachToTracingSessionCommand(
516 version, tracing_session_id, offset, seek_type
517 )
518 elif cmd_type == 4:
519 (stream_id,) = self._unpack_payload("Q", data)
520 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
521 version, stream_id
522 )
523 elif cmd_type == 5:
524 stream_id, offset, req_length = self._unpack_payload("QQI", data)
525 return _LttngLiveViewerGetDataStreamPacketDataCommand(
526 version, stream_id, offset, req_length
527 )
528 elif cmd_type == 6:
529 (stream_id,) = self._unpack_payload("Q", data)
530 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
531 elif cmd_type == 7:
532 (tracing_session_id,) = self._unpack_payload("Q", data)
533 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
534 elif cmd_type == 8:
535 return _LttngLiveViewerCreateViewerSessionCommand(version)
536 elif cmd_type == 9:
537 (tracing_session_id,) = self._unpack_payload("Q", data)
538 return _LttngLiveViewerDetachFromTracingSessionCommand(
539 version, tracing_session_id
540 )
541 else:
542 raise UnexpectedInput("Unknown command type {}".format(cmd_type))
543
544 def _pack(self, fmt, *args):
545 # Force network byte order
546 return struct.pack("!" + fmt, *args)
547
548 def _encode_zero_padded_str(self, string, length):
549 data = string.encode()
550 return data.ljust(length, b"\x00")
551
552 def _encode_stream_info(self, info):
553 data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
554 data += self._encode_zero_padded_str(info.path, 4096)
555 data += self._encode_zero_padded_str(info.channel_name, 255)
556 return data
557
558 def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
559 flags = 0
560
561 if has_new_metadata:
562 flags |= 1
563
564 if has_new_data_streams:
565 flags |= 2
566
567 return flags
568
569 def encode(self, reply):
570 if type(reply) is _LttngLiveViewerConnectReply:
571 data = self._pack(
572 "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
573 )
574 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
575 data = self._pack("I", len(reply.tracing_session_infos))
576
577 for info in reply.tracing_session_infos:
578 data += self._pack(
579 "QIII",
580 info.tracing_session_id,
581 info.live_timer_freq,
582 info.client_count,
583 info.stream_count,
584 )
585 data += self._encode_zero_padded_str(info.hostname, 64)
586 data += self._encode_zero_padded_str(info.name, 255)
587 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
588 data = self._pack("II", reply.status, len(reply.stream_infos))
589
590 for info in reply.stream_infos:
591 data += self._encode_stream_info(info)
592 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
593 index_format = "QQQQQQQII"
594 entry = reply.index_entry
595 flags = self._get_has_new_stuff_flags(
596 reply.has_new_metadata, reply.has_new_data_stream
597 )
598
599 if type(entry) is _LttngDataStreamIndexEntry:
600 data = self._pack(
601 index_format,
602 entry.offset_bytes,
603 entry.total_size_bits,
604 entry.content_size_bits,
605 entry.timestamp_begin,
606 entry.timestamp_end,
607 entry.events_discarded,
608 entry.stream_class_id,
609 reply.status,
610 flags,
611 )
612 else:
613 assert type(entry) is _LttngDataStreamBeaconEntry
614 data = self._pack(
615 index_format,
616 0,
617 0,
618 0,
619 0,
620 entry.timestamp,
621 0,
622 entry.stream_class_id,
623 reply.status,
624 flags,
625 )
626 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
627 flags = self._get_has_new_stuff_flags(
628 reply.has_new_metadata, reply.has_new_data_stream
629 )
630 data = self._pack("III", reply.status, len(reply.data), flags)
631 data += reply.data
632 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
633 data = self._pack("QI", len(reply.data), reply.status)
634 data += reply.data
635 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
636 data = self._pack("II", reply.status, len(reply.stream_infos))
637
638 for info in reply.stream_infos:
639 data += self._encode_stream_info(info)
640 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
641 data = self._pack("I", reply.status)
642 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
643 data = self._pack("I", reply.status)
644 else:
645 raise ValueError(
646 "Unknown reply object with class `{}`".format(reply.__class__.__name__)
647 )
648
649 return data
650
651
652 def _get_entry_timestamp_begin(entry):
653 if type(entry) is _LttngDataStreamBeaconEntry:
654 return entry.timestamp
655 else:
656 assert type(entry) is _LttngDataStreamIndexEntry
657 return entry.timestamp_begin
658
659
660 # The index of an LTTng data stream, a sequence of index entries.
661 class _LttngDataStreamIndex(collections.abc.Sequence):
662 def __init__(self, path, beacons):
663 self._path = path
664 self._build()
665
666 if beacons:
667 stream_class_id = self._entries[0].stream_class_id
668 beacons = [
669 _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
670 ]
671 self._add_beacons(beacons)
672
673 logging.info(
674 'Built data stream index entries: path="{}", count={}'.format(
675 path, len(self._entries)
676 )
677 )
678
679 def _build(self):
680 self._entries = []
681 assert os.path.isfile(self._path)
682
683 with open(self._path, "rb") as f:
684 # Read header first
685 fmt = ">IIII"
686 size = struct.calcsize(fmt)
687 data = f.read(size)
688 assert len(data) == size
689 magic, index_major, index_minor, index_entry_length = struct.unpack(
690 fmt, data
691 )
692 assert magic == 0xC1F1DCC1
693
694 # Read index entries
695 fmt = ">QQQQQQQ"
696 size = struct.calcsize(fmt)
697
698 while True:
699 logging.debug(
700 'Decoding data stream index entry: path="{}", offset={}'.format(
701 self._path, f.tell()
702 )
703 )
704 data = f.read(size)
705
706 if not data:
707 # Done
708 break
709
710 assert len(data) == size
711 (
712 offset_bytes,
713 total_size_bits,
714 content_size_bits,
715 timestamp_begin,
716 timestamp_end,
717 events_discarded,
718 stream_class_id,
719 ) = struct.unpack(fmt, data)
720
721 self._entries.append(
722 _LttngDataStreamIndexEntry(
723 offset_bytes,
724 total_size_bits,
725 content_size_bits,
726 timestamp_begin,
727 timestamp_end,
728 events_discarded,
729 stream_class_id,
730 )
731 )
732
733 # Skip anything else before the next entry
734 f.seek(index_entry_length - size, os.SEEK_CUR)
735
736 def _add_beacons(self, beacons):
737 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
738 def sort_key(entry):
739 if type(entry) is _LttngDataStreamBeaconEntry:
740 return entry.timestamp
741 else:
742 return entry.timestamp_end
743
744 self._entries += beacons
745 self._entries.sort(key=sort_key)
746
747 def __getitem__(self, index):
748 return self._entries[index]
749
750 def __len__(self):
751 return len(self._entries)
752
753 @property
754 def path(self):
755 return self._path
756
757
758 # An LTTng data stream.
759 class _LttngDataStream:
760 def __init__(self, path, beacons):
761 self._path = path
762 filename = os.path.basename(path)
763 match = re.match(r"(.*)_\d+", filename)
764 self._channel_name = match.group(1)
765 trace_dir = os.path.dirname(path)
766 index_path = os.path.join(trace_dir, "index", filename + ".idx")
767 self._index = _LttngDataStreamIndex(index_path, beacons)
768 assert os.path.isfile(path)
769 self._file = open(path, "rb")
770 logging.info(
771 'Built data stream: path="{}", channel-name="{}"'.format(
772 path, self._channel_name
773 )
774 )
775
776 @property
777 def path(self):
778 return self._path
779
780 @property
781 def channel_name(self):
782 return self._channel_name
783
784 @property
785 def index(self):
786 return self._index
787
788 def get_data(self, offset_bytes, len_bytes):
789 self._file.seek(offset_bytes)
790 return self._file.read(len_bytes)
791
792
793 class _LttngMetadataStreamSection:
794 def __init__(self, timestamp, data):
795 self._timestamp = timestamp
796 if data is None:
797 self._data = bytes()
798 else:
799 self._data = data
800 logging.info(
801 "Built metadata stream section: ts={}, data-len={}".format(
802 self._timestamp, len(self._data)
803 )
804 )
805
806 @property
807 def timestamp(self):
808 return self._timestamp
809
810 @property
811 def data(self):
812 return self._data
813
814
815 # An LTTng metadata stream.
816 class _LttngMetadataStream:
817 def __init__(self, metadata_file_path, config_sections):
818 self._path = metadata_file_path
819 self._sections = config_sections
820 logging.info(
821 "Built metadata stream: path={}, section-len={}".format(
822 self._path, len(self._sections)
823 )
824 )
825
826 @property
827 def path(self):
828 return self._path
829
830 @property
831 def sections(self):
832 return self._sections
833
834
835 LttngMetadataConfigSection = namedtuple(
836 "LttngMetadataConfigSection", ["line", "timestamp", "is_empty"]
837 )
838
839
840 def _parse_metadata_sections_config(config_sections):
841 assert config_sections is not None
842 config_metadata_sections = []
843 append_empty_section = False
844 last_timestamp = 0
845 last_line = 0
846
847 for config_section in config_sections:
848 if config_section == "empty":
849 # Found a empty section marker. Actually append the section at the
850 # timestamp of the next concrete section.
851 append_empty_section = True
852 else:
853 assert type(config_section) is dict
854 line = config_section.get("line")
855 ts = config_section.get("timestamp")
856
857 # Sections' timestamps and lines must both be increasing.
858 assert ts > last_timestamp
859 last_timestamp = ts
860 assert line > last_line
861 last_line = line
862
863 if append_empty_section:
864 config_metadata_sections.append(
865 LttngMetadataConfigSection(line, ts, True)
866 )
867 append_empty_section = False
868
869 config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
870
871 return config_metadata_sections
872
873
874 def _split_metadata_sections(metadata_file_path, raw_config_sections):
875 assert isinstance(raw_config_sections, collections.abc.Sequence)
876
877 parsed_sections = _parse_metadata_sections_config(raw_config_sections)
878
879 sections = []
880 with open(metadata_file_path, "r") as metadata_file:
881 metadata_lines = [line for line in metadata_file]
882
883 config_metadata_sections_idx = 0
884 curr_metadata_section = bytearray()
885
886 for idx, line_content in enumerate(metadata_lines):
887 # Add one to the index to convert from the zero-indexing of the
888 # enumerate() function to the one-indexing used by humans when
889 # viewing a text file.
890 curr_line_number = idx + 1
891
892 # If there are no more sections, simply append the line.
893 if config_metadata_sections_idx + 1 >= len(parsed_sections):
894 curr_metadata_section += bytearray(line_content, "utf8")
895 continue
896
897 next_section_line_number = parsed_sections[
898 config_metadata_sections_idx + 1
899 ].line
900
901 # If the next section begins at the current line, create a
902 # section with the metadata we gathered so far.
903 if curr_line_number >= next_section_line_number:
904 # Flushing the metadata of the current section.
905 sections.append(
906 _LttngMetadataStreamSection(
907 parsed_sections[config_metadata_sections_idx].timestamp,
908 bytes(curr_metadata_section),
909 )
910 )
911
912 # Move to the next section.
913 config_metadata_sections_idx += 1
914
915 # Clear old content and append current line for the next section.
916 curr_metadata_section.clear()
917 curr_metadata_section += bytearray(line_content, "utf8")
918
919 # Append any empty sections.
920 while parsed_sections[config_metadata_sections_idx].is_empty:
921 sections.append(
922 _LttngMetadataStreamSection(
923 parsed_sections[config_metadata_sections_idx].timestamp, None
924 )
925 )
926 config_metadata_sections_idx += 1
927 else:
928 # Append line_content to the current metadata section.
929 curr_metadata_section += bytearray(line_content, "utf8")
930
931 # We iterated over all the lines of the metadata file. Close the current section.
932 sections.append(
933 _LttngMetadataStreamSection(
934 parsed_sections[config_metadata_sections_idx].timestamp,
935 bytes(curr_metadata_section),
936 )
937 )
938
939 return sections
940
941
942 # An LTTng trace, a sequence of LTTng data streams.
943 class LttngTrace(collections.abc.Sequence):
944 def __init__(self, trace_dir, metadata_sections, beacons):
945 assert os.path.isdir(trace_dir)
946 self._path = trace_dir
947 self._create_metadata_stream(trace_dir, metadata_sections)
948 self._create_data_streams(trace_dir, beacons)
949 logging.info('Built trace: path="{}"'.format(trace_dir))
950
951 def _create_data_streams(self, trace_dir, beacons):
952 data_stream_paths = []
953
954 for filename in os.listdir(trace_dir):
955 path = os.path.join(trace_dir, filename)
956
957 if not os.path.isfile(path):
958 continue
959
960 if filename.startswith("."):
961 continue
962
963 if filename == "metadata":
964 continue
965
966 data_stream_paths.append(path)
967
968 data_stream_paths.sort()
969 self._data_streams = []
970
971 for data_stream_path in data_stream_paths:
972 stream_name = os.path.basename(data_stream_path)
973 this_stream_beacons = None
974
975 if beacons is not None and stream_name in beacons:
976 this_stream_beacons = beacons[stream_name]
977
978 self._data_streams.append(
979 _LttngDataStream(data_stream_path, this_stream_beacons)
980 )
981
982 def _create_metadata_stream(self, trace_dir, config_metadata_sections):
983 metadata_path = os.path.join(trace_dir, "metadata")
984 metadata_sections = []
985
986 if config_metadata_sections is None:
987 with open(metadata_path, "rb") as metadata_file:
988 metadata_sections.append(
989 _LttngMetadataStreamSection(0, metadata_file.read())
990 )
991 else:
992 metadata_sections = _split_metadata_sections(
993 metadata_path, config_metadata_sections
994 )
995
996 self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
997
998 @property
999 def path(self):
1000 return self._path
1001
1002 @property
1003 def metadata_stream(self):
1004 return self._metadata_stream
1005
1006 def __getitem__(self, index):
1007 return self._data_streams[index]
1008
1009 def __len__(self):
1010 return len(self._data_streams)
1011
1012
1013 # The state of a single data stream.
1014 class _LttngLiveViewerSessionDataStreamState:
1015 def __init__(self, ts_state, info, data_stream, metadata_stream_id):
1016 self._ts_state = ts_state
1017 self._info = info
1018 self._data_stream = data_stream
1019 self._metadata_stream_id = metadata_stream_id
1020 self._cur_index_entry_index = 0
1021 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1022 logging.info(
1023 fmt.format(
1024 info.id,
1025 ts_state.tracing_session_descriptor.info.tracing_session_id,
1026 ts_state.tracing_session_descriptor.info.name,
1027 data_stream.path,
1028 )
1029 )
1030
1031 @property
1032 def tracing_session_state(self):
1033 return self._ts_state
1034
1035 @property
1036 def info(self):
1037 return self._info
1038
1039 @property
1040 def data_stream(self):
1041 return self._data_stream
1042
1043 @property
1044 def cur_index_entry(self):
1045 if self._cur_index_entry_index == len(self._data_stream.index):
1046 return
1047
1048 return self._data_stream.index[self._cur_index_entry_index]
1049
1050 def goto_next_index_entry(self):
1051 self._cur_index_entry_index += 1
1052
1053
1054 # The state of a single metadata stream.
1055 class _LttngLiveViewerSessionMetadataStreamState:
1056 def __init__(self, ts_state, info, metadata_stream):
1057 self._ts_state = ts_state
1058 self._info = info
1059 self._metadata_stream = metadata_stream
1060 self._cur_metadata_stream_section_index = 0
1061 if len(metadata_stream.sections) > 1:
1062 self._next_metadata_stream_section_timestamp = metadata_stream.sections[
1063 1
1064 ].timestamp
1065 else:
1066 self._next_metadata_stream_section_timestamp = None
1067
1068 self._is_sent = False
1069 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1070 logging.info(
1071 fmt.format(
1072 info.id,
1073 ts_state.tracing_session_descriptor.info.tracing_session_id,
1074 ts_state.tracing_session_descriptor.info.name,
1075 metadata_stream.path,
1076 )
1077 )
1078
1079 @property
1080 def trace_session_state(self):
1081 return self._trace_session_state
1082
1083 @property
1084 def info(self):
1085 return self._info
1086
1087 @property
1088 def metadata_stream(self):
1089 return self._metadata_stream
1090
1091 @property
1092 def is_sent(self):
1093 return self._is_sent
1094
1095 @is_sent.setter
1096 def is_sent(self, value):
1097 self._is_sent = value
1098
1099 @property
1100 def cur_section(self):
1101 fmt = "Get current metadata section: section-idx={}"
1102 logging.info(fmt.format(self._cur_metadata_stream_section_index))
1103 if self._cur_metadata_stream_section_index == len(
1104 self._metadata_stream.sections
1105 ):
1106 return
1107
1108 return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
1109
1110 def goto_next_section(self):
1111 self._cur_metadata_stream_section_index += 1
1112 if self.cur_section:
1113 self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
1114 else:
1115 self._next_metadata_stream_section_timestamp = None
1116
1117 @property
1118 def next_section_timestamp(self):
1119 return self._next_metadata_stream_section_timestamp
1120
1121
1122 # A tracing session descriptor.
1123 #
1124 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1125 # objects).
1126 class LttngTracingSessionDescriptor:
1127 def __init__(
1128 self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1129 ):
1130 for trace in traces:
1131 if name not in trace.path:
1132 fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1133 raise ValueError(fmt.format(name, trace.path))
1134
1135 self._traces = traces
1136 stream_count = sum([len(t) + 1 for t in traces])
1137 self._info = _LttngLiveViewerTracingSessionInfo(
1138 tracing_session_id,
1139 live_timer_freq,
1140 client_count,
1141 stream_count,
1142 hostname,
1143 name,
1144 )
1145
1146 @property
1147 def traces(self):
1148 return self._traces
1149
1150 @property
1151 def info(self):
1152 return self._info
1153
1154
1155 # The state of a tracing session.
1156 class _LttngLiveViewerSessionTracingSessionState:
1157 def __init__(self, tc_descr, base_stream_id):
1158 self._tc_descr = tc_descr
1159 self._stream_infos = []
1160 self._ds_states = {}
1161 self._ms_states = {}
1162 stream_id = base_stream_id
1163
1164 for trace in tc_descr.traces:
1165 trace_id = stream_id * 1000
1166
1167 # Metadata stream -> stream info and metadata stream state
1168 info = _LttngLiveViewerStreamInfo(
1169 stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
1170 )
1171 self._stream_infos.append(info)
1172 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
1173 self, info, trace.metadata_stream
1174 )
1175 metadata_stream_id = stream_id
1176 stream_id += 1
1177
1178 # Data streams -> stream infos and data stream states
1179 for data_stream in trace:
1180 info = _LttngLiveViewerStreamInfo(
1181 stream_id,
1182 trace_id,
1183 False,
1184 data_stream.path,
1185 data_stream.channel_name,
1186 )
1187 self._stream_infos.append(info)
1188 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
1189 self, info, data_stream, metadata_stream_id
1190 )
1191 stream_id += 1
1192
1193 self._is_attached = False
1194 fmt = 'Built tracing session state: id={}, name="{}"'
1195 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
1196
1197 @property
1198 def tracing_session_descriptor(self):
1199 return self._tc_descr
1200
1201 @property
1202 def data_stream_states(self):
1203 return self._ds_states
1204
1205 @property
1206 def metadata_stream_states(self):
1207 return self._ms_states
1208
1209 @property
1210 def stream_infos(self):
1211 return self._stream_infos
1212
1213 @property
1214 def has_new_metadata(self):
1215 return any([not ms.is_sent for ms in self._ms_states.values()])
1216
1217 @property
1218 def is_attached(self):
1219 return self._is_attached
1220
1221 @is_attached.setter
1222 def is_attached(self, value):
1223 self._is_attached = value
1224
1225
1226 def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
1227 if metadata_stream_state.next_section_timestamp is None:
1228 return False
1229
1230 if latest_timestamp >= metadata_stream_state.next_section_timestamp:
1231 return True
1232 else:
1233 return False
1234
1235
1236 # An LTTng live viewer session manages a view on tracing sessions
1237 # and replies to commands accordingly.
1238 class _LttngLiveViewerSession:
1239 def __init__(
1240 self,
1241 viewer_session_id,
1242 tracing_session_descriptors,
1243 max_query_data_response_size,
1244 ):
1245 self._viewer_session_id = viewer_session_id
1246 self._ts_states = {}
1247 self._stream_states = {}
1248 self._max_query_data_response_size = max_query_data_response_size
1249 total_stream_infos = 0
1250
1251 for ts_descr in tracing_session_descriptors:
1252 ts_state = _LttngLiveViewerSessionTracingSessionState(
1253 ts_descr, total_stream_infos
1254 )
1255 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1256 self._ts_states[ts_id] = ts_state
1257 total_stream_infos += len(ts_state.stream_infos)
1258
1259 # Update session's stream states to have the new states
1260 self._stream_states.update(ts_state.data_stream_states)
1261 self._stream_states.update(ts_state.metadata_stream_states)
1262
1263 self._command_handlers = {
1264 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1265 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1266 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1267 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1268 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1269 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1270 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1271 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
1272 }
1273
1274 @property
1275 def viewer_session_id(self):
1276 return self._viewer_session_id
1277
1278 def _get_tracing_session_state(self, tracing_session_id):
1279 if tracing_session_id not in self._ts_states:
1280 raise UnexpectedInput(
1281 "Unknown tracing session ID {}".format(tracing_session_id)
1282 )
1283
1284 return self._ts_states[tracing_session_id]
1285
1286 def _get_stream_state(self, stream_id):
1287 if stream_id not in self._stream_states:
1288 UnexpectedInput("Unknown stream ID {}".format(stream_id))
1289
1290 return self._stream_states[stream_id]
1291
1292 def handle_command(self, cmd):
1293 logging.info(
1294 "Handling command in viewer session: cmd-cls-name={}".format(
1295 cmd.__class__.__name__
1296 )
1297 )
1298 cmd_type = type(cmd)
1299
1300 if cmd_type not in self._command_handlers:
1301 raise UnexpectedInput(
1302 "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
1303 )
1304
1305 return self._command_handlers[cmd_type](cmd)
1306
1307 def _handle_attach_to_tracing_session_command(self, cmd):
1308 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1309 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1310 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1311 info = ts_state.tracing_session_descriptor.info
1312
1313 if ts_state.is_attached:
1314 raise UnexpectedInput(
1315 "Cannot attach to tracing session `{}`: viewer is already attached".format(
1316 info.name
1317 )
1318 )
1319
1320 ts_state.is_attached = True
1321 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1322 return _LttngLiveViewerAttachToTracingSessionReply(
1323 status, ts_state.stream_infos
1324 )
1325
1326 def _handle_detach_from_tracing_session_command(self, cmd):
1327 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1328 logging.info(fmt.format(cmd.tracing_session_id))
1329 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1330 info = ts_state.tracing_session_descriptor.info
1331
1332 if not ts_state.is_attached:
1333 raise UnexpectedInput(
1334 "Cannot detach to tracing session `{}`: viewer is not attached".format(
1335 info.name
1336 )
1337 )
1338
1339 ts_state.is_attached = False
1340 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1341 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1342
1343 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1344 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1345 logging.info(fmt.format(cmd.stream_id))
1346 stream_state = self._get_stream_state(cmd.stream_id)
1347 metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
1348
1349 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1350 raise UnexpectedInput(
1351 "Stream with ID {} is not a data stream".format(cmd.stream_id)
1352 )
1353
1354 if stream_state.cur_index_entry is None:
1355 # The viewer is done reading this stream
1356 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1357
1358 # Dummy data stream index entry to use with the `HUP` status
1359 # (the reply needs one, but the viewer ignores it)
1360 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1361
1362 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1363 status, index_entry, False, False
1364 )
1365
1366 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1367
1368 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
1369 metadata_stream_state.is_sent = False
1370 metadata_stream_state.goto_next_section()
1371
1372 # The viewer only checks the `has_new_metadata` flag if the
1373 # reply's status is `OK`, so we need to provide an index here
1374 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1375 if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
1376 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1377 else:
1378 assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
1379 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1380
1381 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1382 status, stream_state.cur_index_entry, has_new_metadata, False
1383 )
1384 stream_state.goto_next_index_entry()
1385 return reply
1386
1387 def _handle_get_data_stream_packet_data_command(self, cmd):
1388 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1389 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1390 stream_state = self._get_stream_state(cmd.stream_id)
1391 data_response_length = cmd.req_length
1392
1393 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1394 raise UnexpectedInput(
1395 "Stream with ID {} is not a data stream".format(cmd.stream_id)
1396 )
1397
1398 if stream_state.tracing_session_state.has_new_metadata:
1399 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1400 return _LttngLiveViewerGetDataStreamPacketDataReply(
1401 status, bytes(), True, False
1402 )
1403
1404 if self._max_query_data_response_size:
1405 # Enforce a server side limit on the query requested length.
1406 # To ensure that the transaction terminate take the minimum of both
1407 # value.
1408 data_response_length = min(
1409 cmd.req_length, self._max_query_data_response_size
1410 )
1411 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1412 logging.info(fmt.format(cmd.req_length, data_response_length))
1413
1414 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1415 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1416 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1417
1418 def _handle_get_metadata_stream_data_command(self, cmd):
1419 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1420 logging.info(fmt.format(cmd.stream_id))
1421 metadata_stream_state = self._get_stream_state(cmd.stream_id)
1422
1423 if (
1424 type(metadata_stream_state)
1425 is not _LttngLiveViewerSessionMetadataStreamState
1426 ):
1427 raise UnexpectedInput(
1428 "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
1429 )
1430
1431 if metadata_stream_state.is_sent:
1432 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1433 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1434
1435 metadata_stream_state.is_sent = True
1436 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
1437 metadata_section = metadata_stream_state.cur_section
1438
1439 # If we are sending an empty section, ready the next one right away.
1440 if len(metadata_section.data) == 0:
1441 metadata_stream_state.is_sent = False
1442 metadata_stream_state.goto_next_section()
1443
1444 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1445 logging.info(fmt.format(len(metadata_section.data)))
1446 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1447 status, metadata_section.data
1448 )
1449
1450 def _handle_get_new_stream_infos_command(self, cmd):
1451 fmt = 'Handling "get new stream infos" command: ts-id={}'
1452 logging.info(fmt.format(cmd.tracing_session_id))
1453
1454 # As of this version, all the tracing session's stream infos are
1455 # always given to the viewer when sending the "attach to tracing
1456 # session" reply, so there's nothing new here. Return the `HUP`
1457 # status as, if we're handling this command, the viewer consumed
1458 # all the existing data streams.
1459 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1460 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1461
1462 def _handle_get_tracing_session_infos_command(self, cmd):
1463 logging.info('Handling "get tracing session infos" command.')
1464 infos = [
1465 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1466 ]
1467 infos.sort(key=lambda info: info.name)
1468 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1469
1470 def _handle_create_viewer_session_command(self, cmd):
1471 logging.info('Handling "create viewer session" command.')
1472 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1473
1474 # This does nothing here. In the LTTng relay daemon, it
1475 # allocates the viewer session's state.
1476 return _LttngLiveViewerCreateViewerSessionReply(status)
1477
1478
1479 # An LTTng live TCP server.
1480 #
1481 # On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
1482 # on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
1483 # to a temporary port file. It renames the temporary port file to
1484 # `port_filename`.
1485 #
1486 # `tracing_session_descriptors` is a list of tracing session descriptors
1487 # (`LttngTracingSessionDescriptor`) to serve.
1488 #
1489 # This server accepts a single viewer (client).
1490 #
1491 # When the viewer closes the connection, the server's constructor
1492 # returns.
1493 class LttngLiveServer:
1494 def __init__(
1495 self,
1496 port,
1497 port_filename,
1498 tracing_session_descriptors,
1499 max_query_data_response_size,
1500 ):
1501 logging.info("Server configuration:")
1502
1503 logging.info(" Port file name: `{}`".format(port_filename))
1504
1505 if max_query_data_response_size is not None:
1506 logging.info(
1507 " Maximum response data query size: `{}`".format(
1508 max_query_data_response_size
1509 )
1510 )
1511
1512 for ts_descr in tracing_session_descriptors:
1513 info = ts_descr.info
1514 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1515 logging.info(
1516 fmt.format(
1517 info.name,
1518 info.tracing_session_id,
1519 info.hostname,
1520 info.live_timer_freq,
1521 info.client_count,
1522 info.stream_count,
1523 )
1524 )
1525
1526 for trace in ts_descr.traces:
1527 logging.info(' Trace: path="{}"'.format(trace.path))
1528
1529 self._ts_descriptors = tracing_session_descriptors
1530 self._max_query_data_response_size = max_query_data_response_size
1531 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1532 self._codec = _LttngLiveViewerProtocolCodec()
1533
1534 # Port 0: OS assigns an unused port
1535 serv_addr = ("localhost", port if port is not None else 0)
1536 self._sock.bind(serv_addr)
1537 self._write_port_to_file(port_filename)
1538
1539 try:
1540 self._listen()
1541 finally:
1542 self._sock.close()
1543 logging.info("Closed connection and socket.")
1544
1545 @property
1546 def _server_port(self):
1547 return self._sock.getsockname()[1]
1548
1549 def _recv_command(self):
1550 data = bytes()
1551
1552 while True:
1553 logging.info("Waiting for viewer command.")
1554 buf = self._conn.recv(128)
1555
1556 if not buf:
1557 logging.info("Client closed connection.")
1558
1559 if data:
1560 raise UnexpectedInput(
1561 "Client closed connection after having sent {} command bytes.".format(
1562 len(data)
1563 )
1564 )
1565
1566 return
1567
1568 logging.info("Received data from viewer: length={}".format(len(buf)))
1569
1570 data += buf
1571
1572 try:
1573 cmd = self._codec.decode(data)
1574 except struct.error as exc:
1575 raise UnexpectedInput("Malformed command: {}".format(exc)) from exc
1576
1577 if cmd is not None:
1578 logging.info(
1579 "Received command from viewer: cmd-cls-name={}".format(
1580 cmd.__class__.__name__
1581 )
1582 )
1583 return cmd
1584
1585 def _send_reply(self, reply):
1586 data = self._codec.encode(reply)
1587 logging.info(
1588 "Sending reply to viewer: reply-cls-name={}, length={}".format(
1589 reply.__class__.__name__, len(data)
1590 )
1591 )
1592 self._conn.sendall(data)
1593
1594 def _handle_connection(self):
1595 # First command must be "connect"
1596 cmd = self._recv_command()
1597
1598 if type(cmd) is not _LttngLiveViewerConnectCommand:
1599 raise UnexpectedInput(
1600 'First command is not "connect": cmd-cls-name={}'.format(
1601 cmd.__class__.__name__
1602 )
1603 )
1604
1605 # Create viewer session (arbitrary ID 23)
1606 logging.info(
1607 "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
1608 )
1609 viewer_session = _LttngLiveViewerSession(
1610 23, self._ts_descriptors, self._max_query_data_response_size
1611 )
1612
1613 # Send "connect" reply
1614 self._send_reply(
1615 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1616 )
1617
1618 # Make the viewer session handle the remaining commands
1619 while True:
1620 cmd = self._recv_command()
1621
1622 if cmd is None:
1623 # Connection closed (at an expected location within the
1624 # conversation)
1625 return
1626
1627 self._send_reply(viewer_session.handle_command(cmd))
1628
1629 def _listen(self):
1630 logging.info("Listening: port={}".format(self._server_port))
1631 # Backlog must be present for Python version < 3.5.
1632 # 128 is an arbitrary number since we expect only 1 connection anyway.
1633 self._sock.listen(128)
1634 self._conn, viewer_addr = self._sock.accept()
1635 logging.info(
1636 "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
1637 )
1638
1639 try:
1640 self._handle_connection()
1641 finally:
1642 self._conn.close()
1643
1644 def _write_port_to_file(self, port_filename):
1645 # Write the port number to a temporary file.
1646 with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
1647 print(self._server_port, end="", file=tmp_port_file)
1648
1649 # Rename temporary file to real file
1650 os.replace(tmp_port_file.name, port_filename)
1651 logging.info(
1652 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1653 tmp_port_file.name, port_filename
1654 )
1655 )
1656
1657
1658 def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
1659 # File format is:
1660 #
1661 # [
1662 # {
1663 # "name": "my-session",
1664 # "id": 17,
1665 # "hostname": "myhost",
1666 # "live-timer-freq": 1000000,
1667 # "client-count": 23,
1668 # "traces": [
1669 # {
1670 # "path": "lol"
1671 # },
1672 # {
1673 # "path": "meow/mix",
1674 # "beacons": {
1675 # "my_stream": [ 5235787, 728375283 ]
1676 # },
1677 # "metadata-sections": [
1678 # {
1679 # "line": 1,
1680 # "timestamp": 0
1681 # }
1682 # ]
1683 # }
1684 # ]
1685 # }
1686 # ]
1687 with open(sessions_filename, "r") as sessions_file:
1688 params = json.load(sessions_file)
1689
1690 sessions = []
1691
1692 for session in params:
1693 name = session["name"]
1694 tracing_session_id = session["id"]
1695 hostname = session["hostname"]
1696 live_timer_freq = session["live-timer-freq"]
1697 client_count = session["client-count"]
1698 traces = []
1699
1700 for trace in session["traces"]:
1701 metadata_sections = trace.get("metadata-sections")
1702 beacons = trace.get("beacons")
1703 path = trace["path"]
1704
1705 if not os.path.isabs(path):
1706 path = os.path.join(trace_path_prefix, path)
1707
1708 traces.append(LttngTrace(path, metadata_sections, beacons))
1709
1710 sessions.append(
1711 LttngTracingSessionDescriptor(
1712 name,
1713 tracing_session_id,
1714 hostname,
1715 live_timer_freq,
1716 client_count,
1717 traces,
1718 )
1719 )
1720
1721 return sessions
1722
1723
1724 def _loglevel_parser(string):
1725 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
1726 if string not in loglevels:
1727 msg = "{} is not a valid loglevel".format(string)
1728 raise argparse.ArgumentTypeError(msg)
1729 return loglevels[string]
1730
1731
1732 if __name__ == "__main__":
1733 logging.basicConfig(format="# %(asctime)-25s%(message)s")
1734 parser = argparse.ArgumentParser(
1735 description="LTTng-live protocol mocker", add_help=False
1736 )
1737 parser.add_argument(
1738 "--log-level",
1739 default="warning",
1740 choices=["info", "warning"],
1741 help="The loglevel to be used.",
1742 )
1743
1744 loglevel_namespace, remaining_args = parser.parse_known_args()
1745 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1746
1747 parser.add_argument(
1748 "--port",
1749 help="The port to bind to. If missing, use an OS-assigned port..",
1750 type=int,
1751 )
1752 parser.add_argument(
1753 "--port-filename",
1754 help="The final port file. This file is present when the server is ready to receive connection.",
1755 required=True,
1756 )
1757 parser.add_argument(
1758 "--max-query-data-response-size",
1759 type=int,
1760 help="The maximum size of control data response in bytes",
1761 )
1762 parser.add_argument(
1763 "--trace-path-prefix",
1764 type=str,
1765 help="Prefix to prepend to the trace paths of session configurations",
1766 )
1767 parser.add_argument(
1768 "--sessions-filename",
1769 type=str,
1770 help="Path to a session configuration file",
1771 )
1772 parser.add_argument(
1773 "-h",
1774 "--help",
1775 action="help",
1776 default=argparse.SUPPRESS,
1777 help="Show this help message and exit.",
1778 )
1779
1780 args = parser.parse_args(args=remaining_args)
1781 try:
1782 sessions = _session_descriptors_from_path(
1783 args.sessions_filename,
1784 args.trace_path_prefix,
1785 )
1786 LttngLiveServer(
1787 args.port, args.port_filename, sessions, args.max_query_data_response_size
1788 )
1789 except UnexpectedInput as exc:
1790 logging.error(str(exc))
1791 print(exc, file=sys.stderr)
1792 sys.exit(1)
This page took 0.087164 seconds and 5 git commands to generate.