tests/python: add local copy of typing module
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
... / ...
CommitLineData
1# SPDX-License-Identifier: MIT
2#
3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
4#
5
6import os
7import re
8import sys
9import json
10import socket
11import struct
12import logging
13import os.path
14import argparse
15import tempfile
16import collections.abc
17from collections import namedtuple
18
19
class UnexpectedInput(RuntimeError):
    """Error raised on unexpected input, e.g. an unknown command type."""
22
23
24# An entry within the index of an LTTng data stream.
class _LttngDataStreamIndexEntry:
    """Read-only record for one entry of an LTTng data stream index.

    Each constructor argument is stored as is and exposed through a
    property of the same name. The two `*_size_bytes` properties are
    derived from their `*_size_bits` counterparts.
    """

    def __init__(
        self,
        offset_bytes,
        total_size_bits,
        content_size_bits,
        timestamp_begin,
        timestamp_end,
        events_discarded,
        stream_class_id,
    ):
        # Location and sizes
        self._offset_bytes = offset_bytes
        self._total_size_bits = total_size_bits
        self._content_size_bits = content_size_bits

        # Time range
        self._timestamp_begin = timestamp_begin
        self._timestamp_end = timestamp_end

        # Remaining index fields
        self._events_discarded = events_discarded
        self._stream_class_id = stream_class_id

    @property
    def offset_bytes(self):
        """Offset given at construction time, in bytes."""
        return self._offset_bytes

    @property
    def total_size_bits(self):
        """Total size given at construction time, in bits."""
        return self._total_size_bits

    @property
    def total_size_bytes(self):
        """Total size in whole bytes (bit count floor-divided by 8)."""
        return self.total_size_bits // 8

    @property
    def content_size_bits(self):
        """Content size given at construction time, in bits."""
        return self._content_size_bits

    @property
    def content_size_bytes(self):
        """Content size in whole bytes (bit count floor-divided by 8)."""
        return self.content_size_bits // 8

    @property
    def timestamp_begin(self):
        """Beginning timestamp given at construction time."""
        return self._timestamp_begin

    @property
    def timestamp_end(self):
        """End timestamp given at construction time."""
        return self._timestamp_end

    @property
    def events_discarded(self):
        """Discarded event count given at construction time."""
        return self._events_discarded

    @property
    def stream_class_id(self):
        """Stream class ID given at construction time."""
        return self._stream_class_id
79
80
81# An entry within the index of an LTTng data stream. While a stream beacon entry
82# is conceptually unrelated to an index, it is sent as a reply to a
83# LttngLiveViewerGetNextDataStreamIndexEntryCommand
class _LttngDataStreamBeaconEntry:
    """Read-only record pairing a stream class ID with a timestamp."""

    def __init__(self, stream_class_id, timestamp):
        self._timestamp = timestamp
        self._stream_class_id = stream_class_id

    @property
    def stream_class_id(self):
        """Stream class ID given at construction time."""
        return self._stream_class_id

    @property
    def timestamp(self):
        """Timestamp given at construction time."""
        return self._timestamp
96
97
class _LttngLiveViewerCommand:
    """Base class of the viewer command objects; only holds a version."""

    def __init__(self, version):
        self._version = version

    @property
    def version(self):
        """Version value given at construction time."""
        return self._version
105
106
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    """Viewer command holding a viewer session ID and a major/minor pair."""

    def __init__(self, version, viewer_session_id, major, minor):
        super().__init__(version)
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    @property
    def viewer_session_id(self):
        """Viewer session ID given at construction time."""
        return self._viewer_session_id

    @property
    def major(self):
        """Major number given at construction time."""
        return self._major

    @property
    def minor(self):
        """Minor number given at construction time."""
        return self._minor
125
126
class _LttngLiveViewerConnectReply:
    """Reply object holding a viewer session ID and a major/minor pair."""

    def __init__(self, viewer_session_id, major, minor):
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    @property
    def viewer_session_id(self):
        """Viewer session ID given at construction time."""
        return self._viewer_session_id

    @property
    def major(self):
        """Major number given at construction time."""
        return self._major

    @property
    def minor(self):
        """Minor number given at construction time."""
        return self._minor
144
145
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    """Viewer command without any data besides the inherited version."""
148
149
class _LttngLiveViewerTracingSessionInfo:
    """Read-only record of the properties describing a tracing session.

    Each constructor argument is stored as is and exposed through a
    property of the same name.
    """

    def __init__(
        self,
        tracing_session_id,
        live_timer_freq,
        client_count,
        stream_count,
        hostname,
        name,
    ):
        # Identification
        self._tracing_session_id = tracing_session_id
        self._hostname = hostname
        self._name = name

        # Numeric properties
        self._live_timer_freq = live_timer_freq
        self._client_count = client_count
        self._stream_count = stream_count

    @property
    def tracing_session_id(self):
        """Tracing session ID given at construction time."""
        return self._tracing_session_id

    @property
    def live_timer_freq(self):
        """Live timer frequency given at construction time."""
        return self._live_timer_freq

    @property
    def client_count(self):
        """Client count given at construction time."""
        return self._client_count

    @property
    def stream_count(self):
        """Stream count given at construction time."""
        return self._stream_count

    @property
    def hostname(self):
        """Hostname given at construction time."""
        return self._hostname

    @property
    def name(self):
        """Tracing session name given at construction time."""
        return self._name
190
191
class _LttngLiveViewerGetTracingSessionInfosReply:
    """Reply object wrapping a collection of tracing session infos."""

    def __init__(self, tracing_session_infos):
        self._tracing_session_infos = tracing_session_infos

    @property
    def tracing_session_infos(self):
        """Tracing session infos object given at construction time."""
        return self._tracing_session_infos
199
200
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    """Viewer command holding a tracing session ID, offset and seek type."""

    class SeekType:
        """Numeric seek type constants."""

        BEGINNING = 1
        LAST = 2

    def __init__(self, version, tracing_session_id, offset, seek_type):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id
        self._offset, self._seek_type = offset, seek_type

    @property
    def tracing_session_id(self):
        """Tracing session ID given at construction time."""
        return self._tracing_session_id

    @property
    def offset(self):
        """Offset given at construction time."""
        return self._offset

    @property
    def seek_type(self):
        """Seek type given at construction time."""
        return self._seek_type
223
224
class _LttngLiveViewerStreamInfo:
    """Read-only record of the properties describing a single stream.

    Each constructor argument is stored as is and exposed through a
    property of the same name.
    """

    def __init__(self, id, trace_id, is_metadata, path, channel_name):
        # Identification
        self._id, self._trace_id = id, trace_id

        # Stream kind, location and channel
        self._is_metadata = is_metadata
        self._path = path
        self._channel_name = channel_name

    @property
    def id(self):
        """Stream ID given at construction time."""
        return self._id

    @property
    def trace_id(self):
        """Trace ID given at construction time."""
        return self._trace_id

    @property
    def is_metadata(self):
        """Metadata flag given at construction time."""
        return self._is_metadata

    @property
    def path(self):
        """Path given at construction time."""
        return self._path

    @property
    def channel_name(self):
        """Channel name given at construction time."""
        return self._channel_name
252
253
class _LttngLiveViewerAttachToTracingSessionReply:
    """Reply object holding a status and a collection of stream infos."""

    class Status:
        """Numeric status constants of this reply."""

        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(self, status, stream_infos):
        self._status, self._stream_infos = status, stream_infos

    @property
    def status(self):
        """Status given at construction time."""
        return self._status

    @property
    def stream_infos(self):
        """Stream infos object given at construction time."""
        return self._stream_infos
274
275
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    """Viewer command holding the ID of the stream it targets."""

    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self):
        """Stream ID given at construction time."""
        return self._stream_id
284
285
class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
    """Reply object holding a status, an index entry and two flags."""

    class Status:
        """Numeric status constants of this reply."""

        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
        self._status, self._index_entry = status, index_entry

        # "Has new stuff" flags
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self):
        """Status given at construction time."""
        return self._status

    @property
    def index_entry(self):
        """Index entry object given at construction time."""
        return self._index_entry

    @property
    def has_new_metadata(self):
        """New metadata flag given at construction time."""
        return self._has_new_metadata

    @property
    def has_new_data_stream(self):
        """New data stream flag given at construction time."""
        return self._has_new_data_stream
316
317
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    """Viewer command holding a stream ID, an offset and a length."""

    def __init__(self, version, stream_id, offset, req_length):
        super().__init__(version)
        self._stream_id = stream_id
        self._offset, self._req_length = offset, req_length

    @property
    def stream_id(self):
        """Stream ID given at construction time."""
        return self._stream_id

    @property
    def offset(self):
        """Offset given at construction time."""
        return self._offset

    @property
    def req_length(self):
        """Requested length given at construction time."""
        return self._req_length
336
337
class _LttngLiveViewerGetDataStreamPacketDataReply:
    """Reply object holding a status, data and two flags."""

    class Status:
        """Numeric status constants of this reply."""

        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(self, status, data, has_new_metadata, has_new_data_stream):
        self._status, self._data = status, data

        # "Has new stuff" flags
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self):
        """Status given at construction time."""
        return self._status

    @property
    def data(self):
        """Data object given at construction time."""
        return self._data

    @property
    def has_new_metadata(self):
        """New metadata flag given at construction time."""
        return self._has_new_metadata

    @property
    def has_new_data_stream(self):
        """New data stream flag given at construction time."""
        return self._has_new_data_stream
366
367
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    """Viewer command holding the ID of the stream it targets."""

    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self):
        """Stream ID given at construction time."""
        return self._stream_id
376
377
class _LttngLiveViewerGetMetadataStreamDataContentReply:
    """Reply object holding a status and data."""

    class Status:
        """Numeric status constants of this reply."""

        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status, data):
        self._status, self._data = status, data

    @property
    def status(self):
        """Status given at construction time."""
        return self._status

    @property
    def data(self):
        """Data object given at construction time."""
        return self._data
395
396
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    """Viewer command holding the ID of the tracing session it targets."""

    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        """Tracing session ID given at construction time."""
        return self._tracing_session_id
405
406
class _LttngLiveViewerGetNewStreamInfosReply:
    """Reply object holding a status and a collection of stream infos."""

    class Status:
        """Numeric status constants of this reply."""

        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(self, status, stream_infos):
        self._status, self._stream_infos = status, stream_infos

    @property
    def status(self):
        """Status given at construction time."""
        return self._status

    @property
    def stream_infos(self):
        """Stream infos object given at construction time."""
        return self._stream_infos
425
426
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    """Viewer command without any data besides the inherited version."""
429
430
class _LttngLiveViewerCreateViewerSessionReply:
    """Reply object holding only a status."""

    class Status:
        """Numeric status constants of this reply."""

        OK = 1
        ERROR = 2

    def __init__(self, status):
        self._status = status

    @property
    def status(self):
        """Status given at construction time."""
        return self._status
442
443
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    """Viewer command holding the ID of the tracing session it targets."""

    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        """Tracing session ID given at construction time."""
        return self._tracing_session_id
452
453
class _LttngLiveViewerDetachFromTracingSessionReply:
    """Reply object holding only a status."""

    class Status:
        """Numeric status constants of this reply."""

        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status):
        self._status = status

    @property
    def status(self):
        """Status given at construction time."""
        return self._status
466
467
468# An LTTng live protocol codec can convert bytes to command objects and
469# reply objects to bytes.
class _LttngLiveViewerProtocolCodec:
    """Converter between LTTng live protocol bytes and Python objects.

    decode() turns received bytes into a command object, and encode()
    turns a reply object into the bytes to send. Every integer is read
    and written in network byte order.
    """

    _COMMAND_HEADER_STRUCT_FMT = "QII"
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)

    def __init__(self):
        pass

    def _unpack(self, fmt, data, offset=0):
        # Force network byte order
        return struct.unpack_from("!" + fmt, data, offset)

    def _unpack_payload(self, fmt, data):
        # The payload immediately follows the command header.
        header_size = _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
        return self._unpack(fmt, data, header_size)

    def _decode_connect_command(self, version, data):
        # The fourth payload field is decoded, but not used.
        viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
        return _LttngLiveViewerConnectCommand(version, viewer_session_id, major, minor)

    def _decode_get_tracing_session_infos_command(self, version, data):
        return _LttngLiveViewerGetTracingSessionInfosCommand(version)

    def _decode_attach_to_tracing_session_command(self, version, data):
        tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
        return _LttngLiveViewerAttachToTracingSessionCommand(
            version, tracing_session_id, offset, seek_type
        )

    def _decode_get_next_data_stream_index_entry_command(self, version, data):
        (stream_id,) = self._unpack_payload("Q", data)
        return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(version, stream_id)

    def _decode_get_data_stream_packet_data_command(self, version, data):
        stream_id, offset, req_length = self._unpack_payload("QQI", data)
        return _LttngLiveViewerGetDataStreamPacketDataCommand(
            version, stream_id, offset, req_length
        )

    def _decode_get_metadata_stream_data_command(self, version, data):
        (stream_id,) = self._unpack_payload("Q", data)
        return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)

    def _decode_get_new_stream_infos_command(self, version, data):
        (tracing_session_id,) = self._unpack_payload("Q", data)
        return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)

    def _decode_create_viewer_session_command(self, version, data):
        return _LttngLiveViewerCreateViewerSessionCommand(version)

    def _decode_detach_from_tracing_session_command(self, version, data):
        (tracing_session_id,) = self._unpack_payload("Q", data)
        return _LttngLiveViewerDetachFromTracingSessionCommand(
            version, tracing_session_id
        )

    def decode(self, data):
        """Decode the command found at the beginning of `data`.

        Returns the corresponding command object, or `None` when `data`
        does not hold a complete command yet (header plus announced
        payload size). Raises `UnexpectedInput` when the command type is
        unknown.
        """
        header_size = self._COMMAND_HEADER_SIZE_BYTES

        if len(data) < header_size:
            # Not enough data to read the command header
            return None

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
                payload_size, cmd_type, version
            )
        )

        if len(data) < header_size + payload_size:
            # Not enough data to read the whole command
            return None

        # Command type -> payload decoding method
        decoders = {
            1: self._decode_connect_command,
            2: self._decode_get_tracing_session_infos_command,
            3: self._decode_attach_to_tracing_session_command,
            4: self._decode_get_next_data_stream_index_entry_command,
            5: self._decode_get_data_stream_packet_data_command,
            6: self._decode_get_metadata_stream_data_command,
            7: self._decode_get_new_stream_infos_command,
            8: self._decode_create_viewer_session_command,
            9: self._decode_detach_from_tracing_session_command,
        }
        decoder = decoders.get(cmd_type)

        if decoder is None:
            raise UnexpectedInput("Unknown command type {}".format(cmd_type))

        return decoder(version, data)

    def _pack(self, fmt, *args):
        # Force network byte order
        return struct.pack("!" + fmt, *args)

    def _encode_zero_padded_str(self, string, length):
        # Pads with null bytes up to `length` bytes; a longer encoded
        # string is returned as is (never truncated).
        return string.encode().ljust(length, b"\x00")

    def _encode_stream_info(self, info):
        return b"".join(
            [
                self._pack("QQI", info.id, info.trace_id, int(info.is_metadata)),
                self._encode_zero_padded_str(info.path, 4096),
                self._encode_zero_padded_str(info.channel_name, 255),
            ]
        )

    def _encode_stream_infos_reply(self, reply):
        # Status, stream info count, and then each encoded stream info.
        parts = [self._pack("II", reply.status, len(reply.stream_infos))]
        parts.extend(self._encode_stream_info(info) for info in reply.stream_infos)
        return b"".join(parts)

    def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
        # Bit 0: new metadata; bit 1: new data streams.
        metadata_flag = 1 if has_new_metadata else 0
        data_streams_flag = 2 if has_new_data_streams else 0
        return metadata_flag | data_streams_flag

    def encode(self, reply):
        """Encode the reply object `reply` and return the resulting bytes.

        Raises `ValueError` when the exact type of `reply` is not one of
        the known reply classes.
        """
        reply_type = type(reply)

        if reply_type is _LttngLiveViewerConnectReply:
            # The trailing constant fills the fourth field of the reply
            # (presumably the connection type, mirroring the fourth
            # field of the connect command payload).
            return self._pack(
                "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
            )

        if reply_type is _LttngLiveViewerGetTracingSessionInfosReply:
            parts = [self._pack("I", len(reply.tracing_session_infos))]

            for info in reply.tracing_session_infos:
                parts.append(
                    self._pack(
                        "QIII",
                        info.tracing_session_id,
                        info.live_timer_freq,
                        info.client_count,
                        info.stream_count,
                    )
                )
                parts.append(self._encode_zero_padded_str(info.hostname, 64))
                parts.append(self._encode_zero_padded_str(info.name, 255))

            return b"".join(parts)

        if reply_type is _LttngLiveViewerAttachToTracingSessionReply:
            return self._encode_stream_infos_reply(reply)

        if reply_type is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )

            if type(entry) is _LttngDataStreamIndexEntry:
                entry_fields = (
                    entry.offset_bytes,
                    entry.total_size_bits,
                    entry.content_size_bits,
                    entry.timestamp_begin,
                    entry.timestamp_end,
                    entry.events_discarded,
                    entry.stream_class_id,
                )
            else:
                # A beacon only sets the end timestamp and the stream
                # class ID: all the other index fields are zero.
                assert type(entry) is _LttngDataStreamBeaconEntry
                entry_fields = (0, 0, 0, 0, entry.timestamp, 0, entry.stream_class_id)

            return self._pack("QQQQQQQII", *(entry_fields + (reply.status, flags)))

        if reply_type is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            return self._pack("III", reply.status, len(reply.data), flags) + reply.data

        if reply_type is _LttngLiveViewerGetMetadataStreamDataContentReply:
            return self._pack("QI", len(reply.data), reply.status) + reply.data

        if reply_type is _LttngLiveViewerGetNewStreamInfosReply:
            return self._encode_stream_infos_reply(reply)

        if reply_type is _LttngLiveViewerCreateViewerSessionReply:
            return self._pack("I", reply.status)

        if reply_type is _LttngLiveViewerDetachFromTracingSessionReply:
            return self._pack("I", reply.status)

        raise ValueError(
            "Unknown reply object with class `{}`".format(reply.__class__.__name__)
        )
650
651
def _get_entry_timestamp_begin(entry):
    """Return the timestamp at which `entry` begins.

    This is the single timestamp of a beacon entry, or the beginning
    timestamp of an index entry.
    """
    if type(entry) is not _LttngDataStreamBeaconEntry:
        assert type(entry) is _LttngDataStreamIndexEntry
        return entry.timestamp_begin

    return entry.timestamp
658
659
660# The index of an LTTng data stream, a sequence of index entries.
class _LttngDataStreamIndex(collections.abc.Sequence):
    """Index of an LTTng data stream, a sequence of index entries.

    The entries are decoded from the index file located at `path`. When
    `beacons` is not empty, one beacon entry per timestamp of `beacons`
    is mixed in with the index entries.
    """

    def __init__(self, path, beacons):
        self._path = path
        self._build()

        if beacons:
            # All the beacons borrow the stream class ID of the first
            # index entry.
            first_stream_class_id = self._entries[0].stream_class_id
            self._add_beacons(
                [
                    _LttngDataStreamBeaconEntry(first_stream_class_id, ts)
                    for ts in beacons
                ]
            )

        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    def _build(self):
        # Decodes the whole index file into `self._entries`.
        self._entries = []
        assert os.path.isfile(self._path)

        # Both layouts are big-endian
        header_struct = struct.Struct(">IIII")
        entry_struct = struct.Struct(">QQQQQQQ")

        with open(self._path, "rb") as index_file:
            # Read header first
            header = index_file.read(header_struct.size)
            assert len(header) == header_struct.size
            magic, _, _, index_entry_length = header_struct.unpack(header)
            assert magic == 0xC1F1DCC1

            # Anything which follows the decoded fields within an entry
            # is skipped.
            skip_size = index_entry_length - entry_struct.size

            # Read index entries
            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, index_file.tell()
                    )
                )
                raw_entry = index_file.read(entry_struct.size)

                if not raw_entry:
                    # Done
                    break

                assert len(raw_entry) == entry_struct.size

                # The unpacked fields are in the order of the entry
                # constructor's parameters.
                self._entries.append(
                    _LttngDataStreamIndexEntry(*entry_struct.unpack(raw_entry))
                )

                # Skip anything else before the next entry
                index_file.seek(skip_size, os.SEEK_CUR)

    def _add_beacons(self, beacons):
        # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
        def sort_key(entry):
            is_beacon = type(entry) is _LttngDataStreamBeaconEntry
            return entry.timestamp if is_beacon else entry.timestamp_end

        self._entries.extend(beacons)
        self._entries.sort(key=sort_key)

    def __getitem__(self, index):
        return self._entries[index]

    def __len__(self):
        return len(self._entries)

    @property
    def path(self):
        """Path of the index file given at construction time."""
        return self._path
756
757
758# An LTTng data stream.
class _LttngDataStream:
    """An LTTng data stream: a data file along with its index.

    The channel name is whatever precedes the last `_<digits>` part
    matched in the file name. The index file is expected at
    `index/<file name>.idx` next to the data file.
    """

    def __init__(self, path, beacons):
        self._path = path
        trace_dir, filename = os.path.split(path)
        self._channel_name = re.match(r"(.*)_\d+", filename).group(1)

        # The index is built before the data file itself is checked and
        # opened.
        self._index = _LttngDataStreamIndex(
            os.path.join(trace_dir, "index", filename + ".idx"), beacons
        )
        assert os.path.isfile(path)

        # Remains open: get_data() reads from it on demand.
        self._file = open(path, "rb")
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    @property
    def path(self):
        """Path of the data file given at construction time."""
        return self._path

    @property
    def channel_name(self):
        """Channel name extracted from the data file name."""
        return self._channel_name

    @property
    def index(self):
        """Index object of this data stream."""
        return self._index

    def get_data(self, offset_bytes, len_bytes):
        """Read up to `len_bytes` bytes at `offset_bytes` in the data file."""
        data_file = self._file
        data_file.seek(offset_bytes)
        return data_file.read(len_bytes)
791
792
class _LttngMetadataStreamSection:
    """A timestamped part of the data of an LTTng metadata stream.

    Passing `None` as `data` creates a section with empty data.
    """

    def __init__(self, timestamp, data):
        self._timestamp = timestamp
        self._data = bytes() if data is None else data
        logging.info(
            "Built metadata stream section: ts={}, data-len={}".format(
                self._timestamp, len(self._data)
            )
        )

    @property
    def timestamp(self):
        """Timestamp given at construction time."""
        return self._timestamp

    @property
    def data(self):
        """Data of this section (empty bytes if none was given)."""
        return self._data
813
814
815# An LTTng metadata stream.
class _LttngMetadataStream:
    """An LTTng metadata stream: a file path and its sections."""

    def __init__(self, metadata_file_path, config_sections):
        self._path, self._sections = metadata_file_path, config_sections
        logging.info(
            "Built metadata stream: path={}, section-len={}".format(
                self._path, len(self._sections)
            )
        )

    @property
    def path(self):
        """Path of the metadata file given at construction time."""
        return self._path

    @property
    def sections(self):
        """Sections object given at construction time."""
        return self._sections
833
834
# Parsed configuration of a single metadata section: the line at which
# it begins, its timestamp, and whether or not it is an empty section.
LttngMetadataConfigSection = namedtuple(
    "LttngMetadataConfigSection",
    ("line", "timestamp", "is_empty"),
)
838
839
def _parse_metadata_sections_config(config_sections):
    """Convert raw metadata section configs to a list of named tuples.

    Each item of `config_sections` is either the string "empty" or a
    dict with "line" and "timestamp" entries. Both the lines and the
    timestamps of the dicts must be strictly increasing, starting above
    zero.

    Returns a list of `LttngMetadataConfigSection` tuples. An "empty"
    marker becomes an empty section having the line and timestamp of
    the next dict item; a marker without any dict item after it is
    dropped.
    """
    assert config_sections is not None
    parsed_sections = []
    empty_section_pending = False
    prev_timestamp = 0
    prev_line = 0

    for raw_section in config_sections:
        if raw_section == "empty":
            # Found an empty section marker. Actually append the section
            # at the timestamp of the next concrete section.
            empty_section_pending = True
            continue

        assert type(raw_section) is dict
        line = raw_section.get("line")
        ts = raw_section.get("timestamp")

        # Sections' timestamps and lines must both be increasing.
        assert ts > prev_timestamp
        prev_timestamp = ts
        assert line > prev_line
        prev_line = line

        if empty_section_pending:
            parsed_sections.append(LttngMetadataConfigSection(line, ts, True))
            empty_section_pending = False

        parsed_sections.append(LttngMetadataConfigSection(line, ts, False))

    return parsed_sections
872
873
def _split_metadata_sections(metadata_file_path, raw_config_sections):
    """Split a metadata file into timestamped metadata stream sections.

    `raw_config_sections` is a sequence of which each item is either
    the string "empty" or a dict with "line" and "timestamp" entries
    (see _parse_metadata_sections_config()). A configured line is the
    one-based number of the first line of a section within the file at
    `metadata_file_path`.

    Returns a list of `_LttngMetadataStreamSection` objects. A concrete
    section holds the UTF-8 encoded lines gathered until the next
    configured section begins. An "empty" marker yields a section
    without data which has the timestamp of the concrete section
    following it.

    The configuration must describe at least one concrete section: the
    final flush indexes the parsed sections unconditionally.
    """
    assert isinstance(raw_config_sections, collections.abc.Sequence)

    parsed_sections = _parse_metadata_sections_config(raw_config_sections)

    sections = []

    # Decode explicitly as UTF-8 instead of relying on the locale's
    # preferred encoding: the lines are encoded back to UTF-8 below, so
    # any other decoding would alter (or fail to read) the metadata.
    with open(metadata_file_path, "r", encoding="utf-8") as metadata_file:
        metadata_lines = list(metadata_file)

    config_metadata_sections_idx = 0
    curr_metadata_section = bytearray()

    for idx, line_content in enumerate(metadata_lines):
        # Add one to the index to convert from the zero-indexing of the
        # enumerate() function to the one-indexing used by humans when
        # viewing a text file.
        curr_line_number = idx + 1

        # If there are no more sections, simply append the line.
        if config_metadata_sections_idx + 1 >= len(parsed_sections):
            curr_metadata_section += bytearray(line_content, "utf8")
            continue

        next_section_line_number = parsed_sections[
            config_metadata_sections_idx + 1
        ].line

        # If the next section begins at the current line, create a
        # section with the metadata we gathered so far.
        if curr_line_number >= next_section_line_number:
            # Flushing the metadata of the current section.
            sections.append(
                _LttngMetadataStreamSection(
                    parsed_sections[config_metadata_sections_idx].timestamp,
                    bytes(curr_metadata_section),
                )
            )

            # Move to the next section.
            config_metadata_sections_idx += 1

            # Clear old content and append current line for the next section.
            curr_metadata_section.clear()
            curr_metadata_section += bytearray(line_content, "utf8")

            # Append any empty sections. This always stops on a concrete
            # section because the parser only emits an empty section
            # right before a concrete one.
            while parsed_sections[config_metadata_sections_idx].is_empty:
                sections.append(
                    _LttngMetadataStreamSection(
                        parsed_sections[config_metadata_sections_idx].timestamp, None
                    )
                )
                config_metadata_sections_idx += 1
        else:
            # Append line_content to the current metadata section.
            curr_metadata_section += bytearray(line_content, "utf8")

    # We iterated over all the lines of the metadata file. Close the current section.
    sections.append(
        _LttngMetadataStreamSection(
            parsed_sections[config_metadata_sections_idx].timestamp,
            bytes(curr_metadata_section),
        )
    )

    return sections
940
941
942# An LTTng trace, a sequence of LTTng data streams.
class LttngTrace(collections.abc.Sequence):
    """An LTTng trace, a sequence of LTTng data streams.

    `trace_dir` must be an existing directory containing a `metadata`
    file. Every other regular file of this directory of which the name
    does not start with `.` is a data stream file.

    `metadata_sections` is either `None` (the whole metadata file
    becomes a single section with timestamp 0) or a metadata section
    configuration as accepted by _split_metadata_sections().

    `beacons` is either `None` or a mapping of data stream file names
    to the beacons of this data stream.
    """

    def __init__(self, trace_dir, metadata_sections, beacons):
        assert os.path.isdir(trace_dir)
        self._path = trace_dir
        self._create_metadata_stream(trace_dir, metadata_sections)
        self._create_data_streams(trace_dir, beacons)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    def _create_data_streams(self, trace_dir, beacons):
        def is_data_stream_file(filename):
            # Regular, non-hidden file which is not the metadata file
            return (
                os.path.isfile(os.path.join(trace_dir, filename))
                and not filename.startswith(".")
                and filename != "metadata"
            )

        data_stream_paths = sorted(
            os.path.join(trace_dir, filename)
            for filename in os.listdir(trace_dir)
            if is_data_stream_file(filename)
        )
        self._data_streams = []

        for data_stream_path in data_stream_paths:
            stream_name = os.path.basename(data_stream_path)

            # Beacons are optional and looked up by data stream file name
            if beacons is not None and stream_name in beacons:
                stream_beacons = beacons[stream_name]
            else:
                stream_beacons = None

            self._data_streams.append(
                _LttngDataStream(data_stream_path, stream_beacons)
            )

    def _create_metadata_stream(self, trace_dir, config_metadata_sections):
        metadata_path = os.path.join(trace_dir, "metadata")

        if config_metadata_sections is not None:
            metadata_sections = _split_metadata_sections(
                metadata_path, config_metadata_sections
            )
        else:
            # No configuration: a single section with the whole file
            with open(metadata_path, "rb") as metadata_file:
                metadata_sections = [
                    _LttngMetadataStreamSection(0, metadata_file.read())
                ]

        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)

    @property
    def path(self):
        """Trace directory given at construction time."""
        return self._path

    @property
    def metadata_stream(self):
        """Metadata stream object of this trace."""
        return self._metadata_stream

    def __getitem__(self, index):
        return self._data_streams[index]

    def __len__(self):
        return len(self._data_streams)
1011
1012
1013# The state of a single data stream.
class _LttngLiveViewerSessionDataStreamState:
    """State of a single data stream.

    Keeps a cursor over the entries of the index of the data stream,
    initially on the first entry.
    """

    def __init__(self, ts_state, info, data_stream, metadata_stream_id):
        self._ts_state = ts_state
        self._info = info
        self._data_stream = data_stream
        self._metadata_stream_id = metadata_stream_id
        self._cur_index_entry_index = 0
        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id, ts_info.tracing_session_id, ts_info.name, data_stream.path
            )
        )

    @property
    def tracing_session_state(self):
        """Tracing session state object given at construction time."""
        return self._ts_state

    @property
    def info(self):
        """Stream info object given at construction time."""
        return self._info

    @property
    def data_stream(self):
        """Data stream object given at construction time."""
        return self._data_stream

    @property
    def cur_index_entry(self):
        """Current index entry, or `None` once just past the last one."""
        index = self._data_stream.index

        if self._cur_index_entry_index != len(index):
            return index[self._cur_index_entry_index]

        return None

    def goto_next_index_entry(self):
        """Move the cursor to the next index entry."""
        self._cur_index_entry_index += 1
1052
1053
1054# The state of a single metadata stream.
class _LttngLiveViewerSessionMetadataStreamState:
    """State of a single metadata stream.

    Keeps a cursor over the sections of the metadata stream (initially
    on the first one), the timestamp of the "next" section, and whether
    or not the stream is considered sent.
    """

    def __init__(self, ts_state, info, metadata_stream):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream
        self._cur_metadata_stream_section_index = 0

        # Initially, the next section is the second one, if any.
        if len(metadata_stream.sections) > 1:
            self._next_metadata_stream_section_timestamp = metadata_stream.sections[
                1
            ].timestamp
        else:
            self._next_metadata_stream_section_timestamp = None

        self._is_sent = False
        fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
        logging.info(
            fmt.format(
                info.id,
                ts_state.tracing_session_descriptor.info.tracing_session_id,
                ts_state.tracing_session_descriptor.info.name,
                metadata_stream.path,
            )
        )

    @property
    def trace_session_state(self):
        """Tracing session state object given at construction time."""
        # This used to read `self._trace_session_state`, an attribute
        # which is never assigned, so that any access raised
        # `AttributeError`.
        return self._ts_state

    @property
    def tracing_session_state(self):
        """Same as `trace_session_state`.

        Uses the property name of the data stream state class.
        """
        return self._ts_state

    @property
    def info(self):
        """Stream info object given at construction time."""
        return self._info

    @property
    def metadata_stream(self):
        """Metadata stream object given at construction time."""
        return self._metadata_stream

    @property
    def is_sent(self):
        """Whether or not this stream is marked as sent (settable)."""
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value):
        self._is_sent = value

    @property
    def cur_section(self):
        """Current section, or `None` once just past the last one.

        Each access logs the index of the current section.
        """
        fmt = "Get current metadata section: section-idx={}"
        logging.info(fmt.format(self._cur_metadata_stream_section_index))
        if self._cur_metadata_stream_section_index == len(
            self._metadata_stream.sections
        ):
            return

        return self._metadata_stream.sections[self._cur_metadata_stream_section_index]

    def goto_next_section(self):
        """Move the cursor to the next section.

        The next section timestamp becomes the timestamp of the new
        current section, or `None` if there is no such section.
        """
        self._cur_metadata_stream_section_index += 1
        if self.cur_section:
            self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
        else:
            self._next_metadata_stream_section_timestamp = None

    @property
    def next_section_timestamp(self):
        """Timestamp of the next section, or `None` if there is none."""
        return self._next_metadata_stream_section_timestamp
1120
1121
1122# A tracing session descriptor.
1123#
1124# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1125# objects).
class LttngTracingSessionDescriptor:
    """A tracing session descriptor.

    `traces` is a list of LTTng traces (`LttngTrace` objects). The path
    of each trace must contain `name`, otherwise the constructor raises
    `ValueError`.

    The stream count of the resulting tracing session info is, for each
    trace, its number of data streams plus one.
    """

    def __init__(
        self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
    ):
        for trace in traces:
            if name in trace.path:
                continue

            fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
            raise ValueError(fmt.format(name, trace.path))

        self._traces = traces

        # One more stream per trace on top of its data streams
        stream_count = sum(len(trace) + 1 for trace in traces)
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            stream_count,
            hostname,
            name,
        )

    @property
    def traces(self):
        """Traces object given at construction time."""
        return self._traces

    @property
    def info(self):
        """Tracing session info object built at construction time."""
        return self._info
1153
1154
1155# The state of a tracing session.
class _LttngLiveViewerSessionTracingSessionState:
    """State of a tracing session.

    Builds one stream info and one stream state for each stream
    (metadata and data) of each trace of the tracing session
    descriptor. Stream IDs are consecutive, starting at
    `base_stream_id`. The ID of a trace is 1000 times the ID of its
    metadata stream.
    """

    def __init__(self, tc_descr, base_stream_id):
        self._tc_descr = tc_descr
        self._stream_infos = []
        self._ds_states = {}
        self._ms_states = {}
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # Metadata stream -> stream info and metadata stream state.
            # It takes the first stream ID of the trace.
            metadata_stream_id = next_stream_id
            trace_id = metadata_stream_id * 1000
            ms_info = _LttngLiveViewerStreamInfo(
                metadata_stream_id,
                trace_id,
                True,
                trace.metadata_stream.path,
                "metadata",
            )
            self._stream_infos.append(ms_info)
            self._ms_states[
                metadata_stream_id
            ] = _LttngLiveViewerSessionMetadataStreamState(
                self, ms_info, trace.metadata_stream
            )
            next_stream_id += 1

            # Data streams -> stream infos and data stream states
            for data_stream in trace:
                ds_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(ds_info)
                self._ds_states[next_stream_id] = _LttngLiveViewerSessionDataStreamState(
                    self, ds_info, data_stream, metadata_stream_id
                )
                next_stream_id += 1

        self._is_attached = False
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                tc_descr.info.tracing_session_id, tc_descr.info.name
            )
        )

    @property
    def tracing_session_descriptor(self):
        """Tracing session descriptor given at construction time."""
        return self._tc_descr

    @property
    def data_stream_states(self):
        """Dict mapping stream IDs to data stream states."""
        return self._ds_states

    @property
    def metadata_stream_states(self):
        """Dict mapping stream IDs to metadata stream states."""
        return self._ms_states

    @property
    def stream_infos(self):
        """List of all the stream infos, in creation order."""
        return self._stream_infos

    @property
    def has_new_metadata(self):
        """Whether or not at least one metadata stream is not sent."""
        return not all(ms_state.is_sent for ms_state in self._ms_states.values())

    @property
    def is_attached(self):
        """Attached flag, initially `False` (settable)."""
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value):
        self._is_attached = value
1224
1225
# Returns whether or not the metadata stream having the state
# `metadata_stream_state` must move on to its next section, that is,
# whether it has a next section timestamp and `latest_timestamp` has
# reached it.
def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
    next_ts = metadata_stream_state.next_section_timestamp

    # No next section timestamp: nothing more to send
    return next_ts is not None and latest_timestamp >= next_ts
1234
1235
# An LTTng live viewer session manages a view on tracing sessions
# and replies to commands accordingly.
#
# `tracing_session_descriptors` is an iterable of tracing session
# descriptors; one tracing session state is built for each of them.
#
# `max_query_data_response_size`, if truthy, is the maximum length of
# the data returned by a single "get data stream packet data" reply.
class _LttngLiveViewerSession:
    def __init__(
        self,
        viewer_session_id,
        tracing_session_descriptors,
        max_query_data_response_size,
    ):
        self._viewer_session_id = viewer_session_id
        self._ts_states = {}
        self._stream_states = {}
        self._max_query_data_response_size = max_query_data_response_size
        total_stream_infos = 0

        for ts_descr in tracing_session_descriptors:
            # The running stream info count is the base stream ID of
            # the next tracing session state, which keeps stream IDs
            # unique within the whole viewer session.
            ts_state = _LttngLiveViewerSessionTracingSessionState(
                ts_descr, total_stream_infos
            )
            ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
            self._ts_states[ts_id] = ts_state
            total_stream_infos += len(ts_state.stream_infos)

            # Update session's stream states to have the new states
            self._stream_states.update(ts_state.data_stream_states)
            self._stream_states.update(ts_state.metadata_stream_states)

        # Command class -> handler method
        self._command_handlers = {
            _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
            _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
            _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
            _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
            _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
            _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
            _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
            _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
        }

    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    # Returns the tracing session state having the ID
    # `tracing_session_id`, raising `UnexpectedInput` if there's none.
    def _get_tracing_session_state(self, tracing_session_id):
        if tracing_session_id not in self._ts_states:
            raise UnexpectedInput(
                "Unknown tracing session ID {}".format(tracing_session_id)
            )

        return self._ts_states[tracing_session_id]

    # Returns the (data or metadata) stream state having the ID
    # `stream_id`, raising `UnexpectedInput` if there's none.
    def _get_stream_state(self, stream_id):
        if stream_id not in self._stream_states:
            # Actually raise the exception: merely instantiating it
            # would let the lookup below fail with a bare `KeyError`.
            raise UnexpectedInput("Unknown stream ID {}".format(stream_id))

        return self._stream_states[stream_id]

    # Dispatches `cmd` to the handler registered for its exact class and
    # returns the reply of the handler.
    #
    # Raises `UnexpectedInput` if there's no handler for this command.
    def handle_command(self, cmd):
        logging.info(
            "Handling command in viewer session: cmd-cls-name={}".format(
                cmd.__class__.__name__
            )
        )
        cmd_type = type(cmd)

        if cmd_type not in self._command_handlers:
            raise UnexpectedInput(
                "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
            )

        return self._command_handlers[cmd_type](cmd)

    def _handle_attach_to_tracing_session_command(self, cmd):
        fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
        logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if ts_state.is_attached:
            raise UnexpectedInput(
                "Cannot attach to tracing session `{}`: viewer is already attached".format(
                    info.name
                )
            )

        ts_state.is_attached = True
        status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
        return _LttngLiveViewerAttachToTracingSessionReply(
            status, ts_state.stream_infos
        )

    def _handle_detach_from_tracing_session_command(self, cmd):
        fmt = 'Handling "detach from tracing session" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if not ts_state.is_attached:
            raise UnexpectedInput(
                "Cannot detach to tracing session `{}`: viewer is not attached".format(
                    info.name
                )
            )

        ts_state.is_attached = False
        status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
        return _LttngLiveViewerDetachFromTracingSessionReply(status)

    def _handle_get_next_data_stream_index_entry_command(self, cmd):
        fmt = 'Handling "get next data stream index entry" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        stream_state = self._get_stream_state(cmd.stream_id)

        # Validate the stream type _before_ reading the data stream
        # specific `_metadata_stream_id` attribute below, so that a
        # wrong stream ID is reported as an unexpected input.
        if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
            raise UnexpectedInput(
                "Stream with ID {} is not a data stream".format(cmd.stream_id)
            )

        metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)

        if stream_state.cur_index_entry is None:
            # The viewer is done reading this stream
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP

            # Dummy data stream index entry to use with the `HUP` status
            # (the reply needs one, but the viewer ignores it)
            index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)

            return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
                status, index_entry, False, False
            )

        timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)

        if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        # The viewer only checks the `has_new_metadata` flag if the
        # reply's status is `OK`, so we need to provide an index here
        has_new_metadata = stream_state.tracing_session_state.has_new_metadata
        if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
        else:
            assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE

        # Build the reply from the current entry before moving on to
        # the next one.
        reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
            status, stream_state.cur_index_entry, has_new_metadata, False
        )
        stream_state.goto_next_index_entry()
        return reply

    def _handle_get_data_stream_packet_data_command(self, cmd):
        fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
        logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
        stream_state = self._get_stream_state(cmd.stream_id)
        data_response_length = cmd.req_length

        if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
            raise UnexpectedInput(
                "Stream with ID {} is not a data stream".format(cmd.stream_id)
            )

        if stream_state.tracing_session_state.has_new_metadata:
            # Error reply carrying no data, with its third argument set
            # (it is not set in the successful reply below).
            status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
            return _LttngLiveViewerGetDataStreamPacketDataReply(
                status, bytes(), True, False
            )

        if self._max_query_data_response_size:
            # Enforce a server side limit on the query requested length.
            # To ensure that the transaction terminates, take the
            # minimum of both values.
            data_response_length = min(
                cmd.req_length, self._max_query_data_response_size
            )
            fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
            logging.info(fmt.format(cmd.req_length, data_response_length))

        data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
        status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
        return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)

    def _handle_get_metadata_stream_data_command(self, cmd):
        fmt = 'Handling "get metadata stream data" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        metadata_stream_state = self._get_stream_state(cmd.stream_id)

        if (
            type(metadata_stream_state)
            is not _LttngLiveViewerSessionMetadataStreamState
        ):
            raise UnexpectedInput(
                "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
            )

        if metadata_stream_state.is_sent:
            status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
            return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())

        metadata_stream_state.is_sent = True
        status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
        metadata_section = metadata_stream_state.cur_section

        # If we are sending an empty section, ready the next one right away.
        if len(metadata_section.data) == 0:
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
        logging.info(fmt.format(len(metadata_section.data)))
        return _LttngLiveViewerGetMetadataStreamDataContentReply(
            status, metadata_section.data
        )

    def _handle_get_new_stream_infos_command(self, cmd):
        fmt = 'Handling "get new stream infos" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))

        # As of this version, all the tracing session's stream infos are
        # always given to the viewer when sending the "attach to tracing
        # session" reply, so there's nothing new here. Return the `HUP`
        # status as, if we're handling this command, the viewer consumed
        # all the existing data streams.
        status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
        return _LttngLiveViewerGetNewStreamInfosReply(status, [])

    def _handle_get_tracing_session_infos_command(self, cmd):
        logging.info('Handling "get tracing session infos" command.')
        infos = [
            tss.tracing_session_descriptor.info for tss in self._ts_states.values()
        ]

        # Reply with the infos sorted by tracing session name
        infos.sort(key=lambda info: info.name)
        return _LttngLiveViewerGetTracingSessionInfosReply(infos)

    def _handle_create_viewer_session_command(self, cmd):
        logging.info('Handling "create viewer session" command.')
        status = _LttngLiveViewerCreateViewerSessionReply.Status.OK

        # This does nothing here. In the LTTng relay daemon, it
        # allocates the viewer session's state.
        return _LttngLiveViewerCreateViewerSessionReply(status)
1477
1478
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
# on an OS-assigned TCP port otherwise, and starts listening. It then writes
# the decimal TCP port number to a temporary port file created in the
# directory of `port_filename` and renames this temporary port file to
# `port_filename`, so that the port file only exists once a viewer can
# connect.
#
# `tracing_session_descriptors` is a list of tracing session descriptors
# (`LttngTracingSessionDescriptor`) to serve.
#
# `max_query_data_response_size` is passed as is to the viewer session.
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns.
class LttngLiveServer:
    def __init__(
        self,
        port,
        port_filename,
        tracing_session_descriptors,
        max_query_data_response_size,
    ):
        logging.info("Server configuration:")

        logging.info(" Port file name: `{}`".format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                " Maximum response data query size: `{}`".format(
                    max_query_data_response_size
                )
            )

        for ts_descr in tracing_session_descriptors:
            info = ts_descr.info
            fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info(' Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = tracing_session_descriptors
        self._max_query_data_response_size = max_query_data_response_size
        self._codec = _LttngLiveViewerProtocolCodec()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Everything which can fail once the socket exists is within
        # the `try` block so that the socket is always closed.
        try:
            # Port 0: OS assigns an unused port
            serv_addr = ("localhost", port if port is not None else 0)
            self._sock.bind(serv_addr)

            # Start listening _before_ publishing the port file: a
            # viewer may connect as soon as the port file exists.
            #
            # Backlog must be present for Python version < 3.5.
            # 128 is an arbitrary number since we expect only 1 connection anyway.
            self._sock.listen(128)
            self._write_port_to_file(port_filename)
            self._listen()
        finally:
            self._sock.close()
            logging.info("Closed connection and socket.")

    # TCP port to which the server socket is bound.
    @property
    def _server_port(self):
        return self._sock.getsockname()[1]

    # Receives and returns the next complete command of the viewer.
    #
    # Returns `None` if the viewer closed the connection between two
    # commands.
    #
    # Raises `UnexpectedInput` if the viewer closed the connection in
    # the middle of a command, or if the received data is malformed.
    def _recv_command(self):
        data = bytes()

        while True:
            logging.info("Waiting for viewer command.")
            buf = self._conn.recv(128)

            if not buf:
                logging.info("Client closed connection.")

                if data:
                    raise UnexpectedInput(
                        "Client closed connection after having sent {} command bytes.".format(
                            len(data)
                        )
                    )

                return

            logging.info("Received data from viewer: length={}".format(len(buf)))

            data += buf

            try:
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise UnexpectedInput("Malformed command: {}".format(exc)) from exc

            # `None` is handled as "incomplete command": keep receiving
            if cmd is not None:
                logging.info(
                    "Received command from viewer: cmd-cls-name={}".format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    # Encodes `reply` and sends it entirely to the viewer.
    def _send_reply(self, reply):
        data = self._codec.encode(reply)
        logging.info(
            "Sending reply to viewer: reply-cls-name={}, length={}".format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    # Handles the whole conversation with the connected viewer: the
    # initial "connect" command, and then all the remaining commands
    # until the viewer closes the connection.
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise UnexpectedInput(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    # Accepts a single viewer connection on the already listening
    # server socket, handles it, and closes it.
    def _listen(self):
        logging.info("Listening: port={}".format(self._server_port))
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Writes the decimal server port to the file `port_filename` so that
    # the file appears with its complete content.
    def _write_port_to_file(self, port_filename):
        # Write the port number to a temporary file created in the
        # directory of the final file: os.replace() may fail if the
        # source and the destination are on different filesystems.
        port_dir = os.path.dirname(os.path.abspath(port_filename))

        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, dir=port_dir
        ) as tmp_port_file:
            print(self._server_port, end="", file=tmp_port_file)

        # Rename temporary file to real file
        os.replace(tmp_port_file.name, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1656
1657
# Loads the JSON file `sessions_filename` and returns the corresponding
# list of tracing session descriptors
# (`LttngTracingSessionDescriptor`).
#
# A relative trace path is joined to `trace_path_prefix`, unless
# `trace_path_prefix` is `None`, in which case it's used as is.
def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
    # File format is:
    #
    #     [
    #         {
    #             "name": "my-session",
    #             "id": 17,
    #             "hostname": "myhost",
    #             "live-timer-freq": 1000000,
    #             "client-count": 23,
    #             "traces": [
    #                 {
    #                     "path": "lol"
    #                 },
    #                 {
    #                     "path": "meow/mix",
    #                     "beacons": {
    #                         "my_stream": [ 5235787, 728375283 ]
    #                     },
    #                     "metadata-sections": [
    #                         {
    #                             "line": 1,
    #                             "timestamp": 0
    #                         }
    #                     ]
    #                 }
    #             ]
    #         }
    #     ]
    #
    # JSON text is UTF-8: do not depend on the locale's encoding.
    with open(sessions_filename, "r", encoding="utf-8") as sessions_file:
        params = json.load(sessions_file)

    sessions = []

    for session in params:
        name = session["name"]
        tracing_session_id = session["id"]
        hostname = session["hostname"]
        live_timer_freq = session["live-timer-freq"]
        client_count = session["client-count"]
        traces = []

        for trace in session["traces"]:
            # Optional properties: `None` when missing
            metadata_sections = trace.get("metadata-sections")
            beacons = trace.get("beacons")
            path = trace["path"]

            # Without a prefix, os.path.join() would raise `TypeError`:
            # keep the relative path as is in this case.
            if trace_path_prefix is not None and not os.path.isabs(path):
                path = os.path.join(trace_path_prefix, path)

            traces.append(LttngTrace(path, metadata_sections, beacons))

        sessions.append(
            LttngTracingSessionDescriptor(
                name,
                tracing_session_id,
                hostname,
                live_timer_freq,
                client_count,
                traces,
            )
        )

    return sessions
1722
1723
# Converts the log level name `string` (`info` or `warning`) to the
# corresponding `logging` level, raising `argparse.ArgumentTypeError`
# if the name is unknown.
def _loglevel_parser(string):
    level = {"info": logging.INFO, "warning": logging.WARNING}.get(string)

    if level is None:
        raise argparse.ArgumentTypeError("{} is not a valid loglevel".format(string))

    return level
1730
1731
# Script entry point: parses the command-line arguments, loads the
# tracing session descriptors, and runs the server until the viewer
# closes the connection.
if __name__ == "__main__":
    logging.basicConfig(format="# %(asctime)-25s%(message)s")
    parser = argparse.ArgumentParser(
        description="LTTng-live protocol mocker", add_help=False
    )
    parser.add_argument(
        "--log-level",
        default="warning",
        choices=["info", "warning"],
        help="The loglevel to be used.",
    )

    # First pass: only parse `--log-level` so that the log level is set
    # before anything else happens. The remaining arguments are parsed
    # below, once all the options are registered.
    loglevel_namespace, remaining_args = parser.parse_known_args()
    logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))

    parser.add_argument(
        "--port",
        help="The port to bind to. If missing, use an OS-assigned port..",
        type=int,
    )
    parser.add_argument(
        "--port-filename",
        help="The final port file. This file is present when the server is ready to receive connection.",
        required=True,
    )
    parser.add_argument(
        "--max-query-data-response-size",
        type=int,
        help="The maximum size of control data response in bytes",
    )
    parser.add_argument(
        "--trace-path-prefix",
        type=str,
        help="Prefix to prepend to the trace paths of session configurations",
    )

    # The session configuration file is always opened below: make the
    # option mandatory so that omitting it is a clean usage error
    # instead of a `TypeError` when opening `None`.
    parser.add_argument(
        "--sessions-filename",
        type=str,
        help="Path to a session configuration file",
        required=True,
    )

    # Help is registered manually (`add_help=False` above) so that the
    # first parsing pass does not handle `--help` before all the
    # options are known.
    parser.add_argument(
        "-h",
        "--help",
        action="help",
        default=argparse.SUPPRESS,
        help="Show this help message and exit.",
    )

    args = parser.parse_args(args=remaining_args)
    try:
        sessions = _session_descriptors_from_path(
            args.sessions_filename,
            args.trace_path_prefix,
        )
        LttngLiveServer(
            args.port, args.port_filename, sessions, args.max_query_data_response_size
        )
    except UnexpectedInput as exc:
        # Report protocol/input errors on both the log and the standard
        # error, and exit with a failure status.
        logging.error(str(exc))
        print(exc, file=sys.stderr)
        sys.exit(1)
This page took 0.032958 seconds and 4 git commands to generate.