tests: add strongly-typed JSON wrappers
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
... / ...
CommitLineData
1# SPDX-License-Identifier: MIT
2#
3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
4#
5
6import os
7import re
8import sys
9import json
10import socket
11import struct
12import logging
13import os.path
14import argparse
15import tempfile
16import collections.abc
17from collections import namedtuple
18
19
20class UnexpectedInput(RuntimeError):
21 pass
22
23
24# An entry within the index of an LTTng data stream.
25class _LttngDataStreamIndexEntry:
26 def __init__(
27 self,
28 offset_bytes,
29 total_size_bits,
30 content_size_bits,
31 timestamp_begin,
32 timestamp_end,
33 events_discarded,
34 stream_class_id,
35 ):
36 self._offset_bytes = offset_bytes
37 self._total_size_bits = total_size_bits
38 self._content_size_bits = content_size_bits
39 self._timestamp_begin = timestamp_begin
40 self._timestamp_end = timestamp_end
41 self._events_discarded = events_discarded
42 self._stream_class_id = stream_class_id
43
44 @property
45 def offset_bytes(self):
46 return self._offset_bytes
47
48 @property
49 def total_size_bits(self):
50 return self._total_size_bits
51
52 @property
53 def total_size_bytes(self):
54 return self._total_size_bits // 8
55
56 @property
57 def content_size_bits(self):
58 return self._content_size_bits
59
60 @property
61 def content_size_bytes(self):
62 return self._content_size_bits // 8
63
64 @property
65 def timestamp_begin(self):
66 return self._timestamp_begin
67
68 @property
69 def timestamp_end(self):
70 return self._timestamp_end
71
72 @property
73 def events_discarded(self):
74 return self._events_discarded
75
76 @property
77 def stream_class_id(self):
78 return self._stream_class_id
79
80
81# An entry within the index of an LTTng data stream. While a stream beacon entry
82# is conceptually unrelated to an index, it is sent as a reply to a
83# LttngLiveViewerGetNextDataStreamIndexEntryCommand
84class _LttngDataStreamBeaconEntry:
85 def __init__(self, stream_class_id, timestamp):
86 self._stream_class_id = stream_class_id
87 self._timestamp = timestamp
88
89 @property
90 def timestamp(self):
91 return self._timestamp
92
93 @property
94 def stream_class_id(self):
95 return self._stream_class_id
96
97
98class _LttngLiveViewerCommand:
99 def __init__(self, version):
100 self._version = version
101
102 @property
103 def version(self):
104 return self._version
105
106
107class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
108 def __init__(self, version, viewer_session_id, major, minor):
109 super().__init__(version)
110 self._viewer_session_id = viewer_session_id
111 self._major = major
112 self._minor = minor
113
114 @property
115 def viewer_session_id(self):
116 return self._viewer_session_id
117
118 @property
119 def major(self):
120 return self._major
121
122 @property
123 def minor(self):
124 return self._minor
125
126
127class _LttngLiveViewerConnectReply:
128 def __init__(self, viewer_session_id, major, minor):
129 self._viewer_session_id = viewer_session_id
130 self._major = major
131 self._minor = minor
132
133 @property
134 def viewer_session_id(self):
135 return self._viewer_session_id
136
137 @property
138 def major(self):
139 return self._major
140
141 @property
142 def minor(self):
143 return self._minor
144
145
146class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
147 pass
148
149
150class _LttngLiveViewerTracingSessionInfo:
151 def __init__(
152 self,
153 tracing_session_id,
154 live_timer_freq,
155 client_count,
156 stream_count,
157 hostname,
158 name,
159 ):
160 self._tracing_session_id = tracing_session_id
161 self._live_timer_freq = live_timer_freq
162 self._client_count = client_count
163 self._stream_count = stream_count
164 self._hostname = hostname
165 self._name = name
166
167 @property
168 def tracing_session_id(self):
169 return self._tracing_session_id
170
171 @property
172 def live_timer_freq(self):
173 return self._live_timer_freq
174
175 @property
176 def client_count(self):
177 return self._client_count
178
179 @property
180 def stream_count(self):
181 return self._stream_count
182
183 @property
184 def hostname(self):
185 return self._hostname
186
187 @property
188 def name(self):
189 return self._name
190
191
192class _LttngLiveViewerGetTracingSessionInfosReply:
193 def __init__(self, tracing_session_infos):
194 self._tracing_session_infos = tracing_session_infos
195
196 @property
197 def tracing_session_infos(self):
198 return self._tracing_session_infos
199
200
201class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
202 class SeekType:
203 BEGINNING = 1
204 LAST = 2
205
206 def __init__(self, version, tracing_session_id, offset, seek_type):
207 super().__init__(version)
208 self._tracing_session_id = tracing_session_id
209 self._offset = offset
210 self._seek_type = seek_type
211
212 @property
213 def tracing_session_id(self):
214 return self._tracing_session_id
215
216 @property
217 def offset(self):
218 return self._offset
219
220 @property
221 def seek_type(self):
222 return self._seek_type
223
224
225class _LttngLiveViewerStreamInfo:
226 def __init__(self, id, trace_id, is_metadata, path, channel_name):
227 self._id = id
228 self._trace_id = trace_id
229 self._is_metadata = is_metadata
230 self._path = path
231 self._channel_name = channel_name
232
233 @property
234 def id(self):
235 return self._id
236
237 @property
238 def trace_id(self):
239 return self._trace_id
240
241 @property
242 def is_metadata(self):
243 return self._is_metadata
244
245 @property
246 def path(self):
247 return self._path
248
249 @property
250 def channel_name(self):
251 return self._channel_name
252
253
254class _LttngLiveViewerAttachToTracingSessionReply:
255 class Status:
256 OK = 1
257 ALREADY = 2
258 UNKNOWN = 3
259 NOT_LIVE = 4
260 SEEK_ERROR = 5
261 NO_SESSION = 6
262
263 def __init__(self, status, stream_infos):
264 self._status = status
265 self._stream_infos = stream_infos
266
267 @property
268 def status(self):
269 return self._status
270
271 @property
272 def stream_infos(self):
273 return self._stream_infos
274
275
276class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
277 def __init__(self, version, stream_id):
278 super().__init__(version)
279 self._stream_id = stream_id
280
281 @property
282 def stream_id(self):
283 return self._stream_id
284
285
286class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
287 class Status:
288 OK = 1
289 RETRY = 2
290 HUP = 3
291 ERROR = 4
292 INACTIVE = 5
293 EOF = 6
294
295 def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
296 self._status = status
297 self._index_entry = index_entry
298 self._has_new_metadata = has_new_metadata
299 self._has_new_data_stream = has_new_data_stream
300
301 @property
302 def status(self):
303 return self._status
304
305 @property
306 def index_entry(self):
307 return self._index_entry
308
309 @property
310 def has_new_metadata(self):
311 return self._has_new_metadata
312
313 @property
314 def has_new_data_stream(self):
315 return self._has_new_data_stream
316
317
318class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
319 def __init__(self, version, stream_id, offset, req_length):
320 super().__init__(version)
321 self._stream_id = stream_id
322 self._offset = offset
323 self._req_length = req_length
324
325 @property
326 def stream_id(self):
327 return self._stream_id
328
329 @property
330 def offset(self):
331 return self._offset
332
333 @property
334 def req_length(self):
335 return self._req_length
336
337
338class _LttngLiveViewerGetDataStreamPacketDataReply:
339 class Status:
340 OK = 1
341 RETRY = 2
342 ERROR = 3
343 EOF = 4
344
345 def __init__(self, status, data, has_new_metadata, has_new_data_stream):
346 self._status = status
347 self._data = data
348 self._has_new_metadata = has_new_metadata
349 self._has_new_data_stream = has_new_data_stream
350
351 @property
352 def status(self):
353 return self._status
354
355 @property
356 def data(self):
357 return self._data
358
359 @property
360 def has_new_metadata(self):
361 return self._has_new_metadata
362
363 @property
364 def has_new_data_stream(self):
365 return self._has_new_data_stream
366
367
368class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
369 def __init__(self, version, stream_id):
370 super().__init__(version)
371 self._stream_id = stream_id
372
373 @property
374 def stream_id(self):
375 return self._stream_id
376
377
378class _LttngLiveViewerGetMetadataStreamDataContentReply:
379 class Status:
380 OK = 1
381 NO_NEW = 2
382 ERROR = 3
383
384 def __init__(self, status, data):
385 self._status = status
386 self._data = data
387
388 @property
389 def status(self):
390 return self._status
391
392 @property
393 def data(self):
394 return self._data
395
396
397class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
398 def __init__(self, version, tracing_session_id):
399 super().__init__(version)
400 self._tracing_session_id = tracing_session_id
401
402 @property
403 def tracing_session_id(self):
404 return self._tracing_session_id
405
406
407class _LttngLiveViewerGetNewStreamInfosReply:
408 class Status:
409 OK = 1
410 NO_NEW = 2
411 ERROR = 3
412 HUP = 4
413
414 def __init__(self, status, stream_infos):
415 self._status = status
416 self._stream_infos = stream_infos
417
418 @property
419 def status(self):
420 return self._status
421
422 @property
423 def stream_infos(self):
424 return self._stream_infos
425
426
427class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
428 pass
429
430
431class _LttngLiveViewerCreateViewerSessionReply:
432 class Status:
433 OK = 1
434 ERROR = 2
435
436 def __init__(self, status):
437 self._status = status
438
439 @property
440 def status(self):
441 return self._status
442
443
444class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
445 def __init__(self, version, tracing_session_id):
446 super().__init__(version)
447 self._tracing_session_id = tracing_session_id
448
449 @property
450 def tracing_session_id(self):
451 return self._tracing_session_id
452
453
454class _LttngLiveViewerDetachFromTracingSessionReply:
455 class Status:
456 OK = 1
457 UNKNOWN = 2
458 ERROR = 3
459
460 def __init__(self, status):
461 self._status = status
462
463 @property
464 def status(self):
465 return self._status
466
467
468# An LTTng live protocol codec can convert bytes to command objects and
469# reply objects to bytes.
470class _LttngLiveViewerProtocolCodec:
471 _COMMAND_HEADER_STRUCT_FMT = "QII"
472 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
473
474 def __init__(self):
475 pass
476
477 def _unpack(self, fmt, data, offset=0):
478 fmt = "!" + fmt
479 return struct.unpack_from(fmt, data, offset)
480
481 def _unpack_payload(self, fmt, data):
482 return self._unpack(
483 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
484 )
485
486 def decode(self, data):
487 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
488 # Not enough data to read the command header
489 return
490
491 payload_size, cmd_type, version = self._unpack(
492 self._COMMAND_HEADER_STRUCT_FMT, data
493 )
494 logging.info(
495 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
496 payload_size, cmd_type, version
497 )
498 )
499
500 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
501 # Not enough data to read the whole command
502 return
503
504 if cmd_type == 1:
505 viewer_session_id, major, minor, conn_type = self._unpack_payload(
506 "QIII", data
507 )
508 return _LttngLiveViewerConnectCommand(
509 version, viewer_session_id, major, minor
510 )
511 elif cmd_type == 2:
512 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
513 elif cmd_type == 3:
514 tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
515 return _LttngLiveViewerAttachToTracingSessionCommand(
516 version, tracing_session_id, offset, seek_type
517 )
518 elif cmd_type == 4:
519 (stream_id,) = self._unpack_payload("Q", data)
520 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
521 version, stream_id
522 )
523 elif cmd_type == 5:
524 stream_id, offset, req_length = self._unpack_payload("QQI", data)
525 return _LttngLiveViewerGetDataStreamPacketDataCommand(
526 version, stream_id, offset, req_length
527 )
528 elif cmd_type == 6:
529 (stream_id,) = self._unpack_payload("Q", data)
530 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
531 elif cmd_type == 7:
532 (tracing_session_id,) = self._unpack_payload("Q", data)
533 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
534 elif cmd_type == 8:
535 return _LttngLiveViewerCreateViewerSessionCommand(version)
536 elif cmd_type == 9:
537 (tracing_session_id,) = self._unpack_payload("Q", data)
538 return _LttngLiveViewerDetachFromTracingSessionCommand(
539 version, tracing_session_id
540 )
541 else:
542 raise UnexpectedInput("Unknown command type {}".format(cmd_type))
543
544 def _pack(self, fmt, *args):
545 # Force network byte order
546 return struct.pack("!" + fmt, *args)
547
548 def _encode_zero_padded_str(self, string, length):
549 data = string.encode()
550 return data.ljust(length, b"\x00")
551
552 def _encode_stream_info(self, info):
553 data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
554 data += self._encode_zero_padded_str(info.path, 4096)
555 data += self._encode_zero_padded_str(info.channel_name, 255)
556 return data
557
558 def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
559 flags = 0
560
561 if has_new_metadata:
562 flags |= 1
563
564 if has_new_data_streams:
565 flags |= 2
566
567 return flags
568
569 def encode(self, reply):
570 if type(reply) is _LttngLiveViewerConnectReply:
571 data = self._pack(
572 "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
573 )
574 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
575 data = self._pack("I", len(reply.tracing_session_infos))
576
577 for info in reply.tracing_session_infos:
578 data += self._pack(
579 "QIII",
580 info.tracing_session_id,
581 info.live_timer_freq,
582 info.client_count,
583 info.stream_count,
584 )
585 data += self._encode_zero_padded_str(info.hostname, 64)
586 data += self._encode_zero_padded_str(info.name, 255)
587 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
588 data = self._pack("II", reply.status, len(reply.stream_infos))
589
590 for info in reply.stream_infos:
591 data += self._encode_stream_info(info)
592 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
593 index_format = "QQQQQQQII"
594 entry = reply.index_entry
595 flags = self._get_has_new_stuff_flags(
596 reply.has_new_metadata, reply.has_new_data_stream
597 )
598
599 if type(entry) is _LttngDataStreamIndexEntry:
600 data = self._pack(
601 index_format,
602 entry.offset_bytes,
603 entry.total_size_bits,
604 entry.content_size_bits,
605 entry.timestamp_begin,
606 entry.timestamp_end,
607 entry.events_discarded,
608 entry.stream_class_id,
609 reply.status,
610 flags,
611 )
612 else:
613 assert type(entry) is _LttngDataStreamBeaconEntry
614 data = self._pack(
615 index_format,
616 0,
617 0,
618 0,
619 0,
620 entry.timestamp,
621 0,
622 entry.stream_class_id,
623 reply.status,
624 flags,
625 )
626 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
627 flags = self._get_has_new_stuff_flags(
628 reply.has_new_metadata, reply.has_new_data_stream
629 )
630 data = self._pack("III", reply.status, len(reply.data), flags)
631 data += reply.data
632 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
633 data = self._pack("QI", len(reply.data), reply.status)
634 data += reply.data
635 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
636 data = self._pack("II", reply.status, len(reply.stream_infos))
637
638 for info in reply.stream_infos:
639 data += self._encode_stream_info(info)
640 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
641 data = self._pack("I", reply.status)
642 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
643 data = self._pack("I", reply.status)
644 else:
645 raise ValueError(
646 "Unknown reply object with class `{}`".format(reply.__class__.__name__)
647 )
648
649 return data
650
651
652def _get_entry_timestamp_begin(entry):
653 if type(entry) is _LttngDataStreamBeaconEntry:
654 return entry.timestamp
655 else:
656 assert type(entry) is _LttngDataStreamIndexEntry
657 return entry.timestamp_begin
658
659
660# The index of an LTTng data stream, a sequence of index entries.
661class _LttngDataStreamIndex(collections.abc.Sequence):
662 def __init__(self, path, beacons):
663 self._path = path
664 self._build()
665
666 if beacons:
667 stream_class_id = self._entries[0].stream_class_id
668 beacons = [
669 _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
670 ]
671 self._add_beacons(beacons)
672
673 logging.info(
674 'Built data stream index entries: path="{}", count={}'.format(
675 path, len(self._entries)
676 )
677 )
678
679 def _build(self):
680 self._entries = []
681 assert os.path.isfile(self._path)
682
683 with open(self._path, "rb") as f:
684 # Read header first
685 fmt = ">IIII"
686 size = struct.calcsize(fmt)
687 data = f.read(size)
688 assert len(data) == size
689 magic, index_major, index_minor, index_entry_length = struct.unpack(
690 fmt, data
691 )
692 assert magic == 0xC1F1DCC1
693
694 # Read index entries
695 fmt = ">QQQQQQQ"
696 size = struct.calcsize(fmt)
697
698 while True:
699 logging.debug(
700 'Decoding data stream index entry: path="{}", offset={}'.format(
701 self._path, f.tell()
702 )
703 )
704 data = f.read(size)
705
706 if not data:
707 # Done
708 break
709
710 assert len(data) == size
711 (
712 offset_bytes,
713 total_size_bits,
714 content_size_bits,
715 timestamp_begin,
716 timestamp_end,
717 events_discarded,
718 stream_class_id,
719 ) = struct.unpack(fmt, data)
720
721 self._entries.append(
722 _LttngDataStreamIndexEntry(
723 offset_bytes,
724 total_size_bits,
725 content_size_bits,
726 timestamp_begin,
727 timestamp_end,
728 events_discarded,
729 stream_class_id,
730 )
731 )
732
733 # Skip anything else before the next entry
734 f.seek(index_entry_length - size, os.SEEK_CUR)
735
736 def _add_beacons(self, beacons):
737 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
738 def sort_key(entry):
739 if type(entry) is _LttngDataStreamBeaconEntry:
740 return entry.timestamp
741 else:
742 return entry.timestamp_end
743
744 self._entries += beacons
745 self._entries.sort(key=sort_key)
746
747 def __getitem__(self, index):
748 return self._entries[index]
749
750 def __len__(self):
751 return len(self._entries)
752
753 @property
754 def path(self):
755 return self._path
756
757
758# An LTTng data stream.
759class _LttngDataStream:
760 def __init__(self, path, beacons):
761 self._path = path
762 filename = os.path.basename(path)
763 match = re.match(r"(.*)_\d+", filename)
764 if not match:
765 raise RuntimeError(
766 "Unexpected data stream file name pattern: {}".format(filename)
767 )
768
769 self._channel_name = match.group(1)
770 trace_dir = os.path.dirname(path)
771 index_path = os.path.join(trace_dir, "index", filename + ".idx")
772 self._index = _LttngDataStreamIndex(index_path, beacons)
773 assert os.path.isfile(path)
774 self._file = open(path, "rb")
775 logging.info(
776 'Built data stream: path="{}", channel-name="{}"'.format(
777 path, self._channel_name
778 )
779 )
780
781 @property
782 def path(self):
783 return self._path
784
785 @property
786 def channel_name(self):
787 return self._channel_name
788
789 @property
790 def index(self):
791 return self._index
792
793 def get_data(self, offset_bytes, len_bytes):
794 self._file.seek(offset_bytes)
795 return self._file.read(len_bytes)
796
797
798class _LttngMetadataStreamSection:
799 def __init__(self, timestamp, data):
800 self._timestamp = timestamp
801 if data is None:
802 self._data = bytes()
803 else:
804 self._data = data
805 logging.info(
806 "Built metadata stream section: ts={}, data-len={}".format(
807 self._timestamp, len(self._data)
808 )
809 )
810
811 @property
812 def timestamp(self):
813 return self._timestamp
814
815 @property
816 def data(self):
817 return self._data
818
819
820# An LTTng metadata stream.
821class _LttngMetadataStream:
822 def __init__(self, metadata_file_path, config_sections):
823 self._path = metadata_file_path
824 self._sections = config_sections
825 logging.info(
826 "Built metadata stream: path={}, section-len={}".format(
827 self._path, len(self._sections)
828 )
829 )
830
831 @property
832 def path(self):
833 return self._path
834
835 @property
836 def sections(self):
837 return self._sections
838
839
840LttngMetadataConfigSection = namedtuple(
841 "LttngMetadataConfigSection", ["line", "timestamp", "is_empty"]
842)
843
844
845def _parse_metadata_sections_config(config_sections):
846 assert config_sections is not None
847 config_metadata_sections = []
848 append_empty_section = False
849 last_timestamp = 0
850 last_line = 0
851
852 for config_section in config_sections:
853 if config_section == "empty":
854 # Found a empty section marker. Actually append the section at the
855 # timestamp of the next concrete section.
856 append_empty_section = True
857 else:
858 assert type(config_section) is dict
859 line = config_section.get("line")
860 ts = config_section.get("timestamp")
861
862 if type(line) is not int:
863 raise RuntimeError("`line` is not an integer")
864
865 if type(ts) is not int:
866 raise RuntimeError("`timestamp` is not an integer")
867
868 # Sections' timestamps and lines must both be increasing.
869 assert ts > last_timestamp
870 last_timestamp = ts
871 assert line > last_line
872 last_line = line
873
874 if append_empty_section:
875 config_metadata_sections.append(
876 LttngMetadataConfigSection(line, ts, True)
877 )
878 append_empty_section = False
879
880 config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
881
882 return config_metadata_sections
883
884
885def _split_metadata_sections(metadata_file_path, raw_config_sections):
886 assert isinstance(raw_config_sections, collections.abc.Sequence)
887
888 parsed_sections = _parse_metadata_sections_config(raw_config_sections)
889
890 sections = []
891 with open(metadata_file_path, "r") as metadata_file:
892 metadata_lines = [line for line in metadata_file]
893
894 config_metadata_sections_idx = 0
895 curr_metadata_section = bytearray()
896
897 for idx, line_content in enumerate(metadata_lines):
898 # Add one to the index to convert from the zero-indexing of the
899 # enumerate() function to the one-indexing used by humans when
900 # viewing a text file.
901 curr_line_number = idx + 1
902
903 # If there are no more sections, simply append the line.
904 if config_metadata_sections_idx + 1 >= len(parsed_sections):
905 curr_metadata_section += bytearray(line_content, "utf8")
906 continue
907
908 next_section_line_number = parsed_sections[
909 config_metadata_sections_idx + 1
910 ].line
911
912 # If the next section begins at the current line, create a
913 # section with the metadata we gathered so far.
914 if curr_line_number >= next_section_line_number:
915 # Flushing the metadata of the current section.
916 sections.append(
917 _LttngMetadataStreamSection(
918 parsed_sections[config_metadata_sections_idx].timestamp,
919 bytes(curr_metadata_section),
920 )
921 )
922
923 # Move to the next section.
924 config_metadata_sections_idx += 1
925
926 # Clear old content and append current line for the next section.
927 curr_metadata_section.clear()
928 curr_metadata_section += bytearray(line_content, "utf8")
929
930 # Append any empty sections.
931 while parsed_sections[config_metadata_sections_idx].is_empty:
932 sections.append(
933 _LttngMetadataStreamSection(
934 parsed_sections[config_metadata_sections_idx].timestamp, None
935 )
936 )
937 config_metadata_sections_idx += 1
938 else:
939 # Append line_content to the current metadata section.
940 curr_metadata_section += bytearray(line_content, "utf8")
941
942 # We iterated over all the lines of the metadata file. Close the current section.
943 sections.append(
944 _LttngMetadataStreamSection(
945 parsed_sections[config_metadata_sections_idx].timestamp,
946 bytes(curr_metadata_section),
947 )
948 )
949
950 return sections
951
952
953# An LTTng trace, a sequence of LTTng data streams.
954class LttngTrace(collections.abc.Sequence):
955 def __init__(self, trace_dir, metadata_sections, beacons):
956 assert os.path.isdir(trace_dir)
957 self._path = trace_dir
958 self._create_metadata_stream(trace_dir, metadata_sections)
959 self._create_data_streams(trace_dir, beacons)
960 logging.info('Built trace: path="{}"'.format(trace_dir))
961
962 def _create_data_streams(self, trace_dir, beacons):
963 data_stream_paths = []
964
965 for filename in os.listdir(trace_dir):
966 path = os.path.join(trace_dir, filename)
967
968 if not os.path.isfile(path):
969 continue
970
971 if filename.startswith("."):
972 continue
973
974 if filename == "metadata":
975 continue
976
977 data_stream_paths.append(path)
978
979 data_stream_paths.sort()
980 self._data_streams = []
981
982 for data_stream_path in data_stream_paths:
983 stream_name = os.path.basename(data_stream_path)
984 this_stream_beacons = None
985
986 if beacons is not None and stream_name in beacons:
987 this_stream_beacons = beacons[stream_name]
988
989 self._data_streams.append(
990 _LttngDataStream(data_stream_path, this_stream_beacons)
991 )
992
993 def _create_metadata_stream(self, trace_dir, config_metadata_sections):
994 metadata_path = os.path.join(trace_dir, "metadata")
995 metadata_sections = []
996
997 if config_metadata_sections is None:
998 with open(metadata_path, "rb") as metadata_file:
999 metadata_sections.append(
1000 _LttngMetadataStreamSection(0, metadata_file.read())
1001 )
1002 else:
1003 metadata_sections = _split_metadata_sections(
1004 metadata_path, config_metadata_sections
1005 )
1006
1007 self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
1008
1009 @property
1010 def path(self):
1011 return self._path
1012
1013 @property
1014 def metadata_stream(self):
1015 return self._metadata_stream
1016
1017 def __getitem__(self, index):
1018 return self._data_streams[index]
1019
1020 def __len__(self):
1021 return len(self._data_streams)
1022
1023
1024# The state of a single data stream.
1025class _LttngLiveViewerSessionDataStreamState:
1026 def __init__(self, ts_state, info, data_stream, metadata_stream_id):
1027 self._ts_state = ts_state
1028 self._info = info
1029 self._data_stream = data_stream
1030 self._metadata_stream_id = metadata_stream_id
1031 self._cur_index_entry_index = 0
1032 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1033 logging.info(
1034 fmt.format(
1035 info.id,
1036 ts_state.tracing_session_descriptor.info.tracing_session_id,
1037 ts_state.tracing_session_descriptor.info.name,
1038 data_stream.path,
1039 )
1040 )
1041
1042 @property
1043 def tracing_session_state(self):
1044 return self._ts_state
1045
1046 @property
1047 def info(self):
1048 return self._info
1049
1050 @property
1051 def data_stream(self):
1052 return self._data_stream
1053
1054 @property
1055 def cur_index_entry(self):
1056 if self._cur_index_entry_index == len(self._data_stream.index):
1057 return
1058
1059 return self._data_stream.index[self._cur_index_entry_index]
1060
1061 def goto_next_index_entry(self):
1062 self._cur_index_entry_index += 1
1063
1064
1065# The state of a single metadata stream.
1066class _LttngLiveViewerSessionMetadataStreamState:
1067 def __init__(self, ts_state, info, metadata_stream):
1068 self._ts_state = ts_state
1069 self._info = info
1070 self._metadata_stream = metadata_stream
1071 self._cur_metadata_stream_section_index = 0
1072 if len(metadata_stream.sections) > 1:
1073 self._next_metadata_stream_section_timestamp = metadata_stream.sections[
1074 1
1075 ].timestamp
1076 else:
1077 self._next_metadata_stream_section_timestamp = None
1078
1079 self._is_sent = False
1080 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1081 logging.info(
1082 fmt.format(
1083 info.id,
1084 ts_state.tracing_session_descriptor.info.tracing_session_id,
1085 ts_state.tracing_session_descriptor.info.name,
1086 metadata_stream.path,
1087 )
1088 )
1089
1090 @property
1091 def info(self):
1092 return self._info
1093
1094 @property
1095 def metadata_stream(self):
1096 return self._metadata_stream
1097
1098 @property
1099 def is_sent(self):
1100 return self._is_sent
1101
1102 @is_sent.setter
1103 def is_sent(self, value):
1104 self._is_sent = value
1105
1106 @property
1107 def cur_section(self):
1108 fmt = "Get current metadata section: section-idx={}"
1109 logging.info(fmt.format(self._cur_metadata_stream_section_index))
1110 if self._cur_metadata_stream_section_index == len(
1111 self._metadata_stream.sections
1112 ):
1113 return
1114
1115 return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
1116
1117 def goto_next_section(self):
1118 self._cur_metadata_stream_section_index += 1
1119 if self.cur_section:
1120 self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
1121 else:
1122 self._next_metadata_stream_section_timestamp = None
1123
1124 @property
1125 def next_section_timestamp(self):
1126 return self._next_metadata_stream_section_timestamp
1127
1128
1129# A tracing session descriptor.
1130#
1131# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1132# objects).
1133class LttngTracingSessionDescriptor:
1134 def __init__(
1135 self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1136 ):
1137 for trace in traces:
1138 if name not in trace.path:
1139 fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1140 raise ValueError(fmt.format(name, trace.path))
1141
1142 self._traces = traces
1143 stream_count = sum([len(t) + 1 for t in traces])
1144 self._info = _LttngLiveViewerTracingSessionInfo(
1145 tracing_session_id,
1146 live_timer_freq,
1147 client_count,
1148 stream_count,
1149 hostname,
1150 name,
1151 )
1152
1153 @property
1154 def traces(self):
1155 return self._traces
1156
1157 @property
1158 def info(self):
1159 return self._info
1160
1161
1162# The state of a tracing session.
1163class _LttngLiveViewerSessionTracingSessionState:
1164 def __init__(self, tc_descr, base_stream_id):
1165 self._tc_descr = tc_descr
1166 self._stream_infos = []
1167 self._ds_states = {}
1168 self._ms_states = {}
1169 stream_id = base_stream_id
1170
1171 for trace in tc_descr.traces:
1172 trace_id = stream_id * 1000
1173
1174 # Metadata stream -> stream info and metadata stream state
1175 info = _LttngLiveViewerStreamInfo(
1176 stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
1177 )
1178 self._stream_infos.append(info)
1179 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
1180 self, info, trace.metadata_stream
1181 )
1182 metadata_stream_id = stream_id
1183 stream_id += 1
1184
1185 # Data streams -> stream infos and data stream states
1186 for data_stream in trace:
1187 info = _LttngLiveViewerStreamInfo(
1188 stream_id,
1189 trace_id,
1190 False,
1191 data_stream.path,
1192 data_stream.channel_name,
1193 )
1194 self._stream_infos.append(info)
1195 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
1196 self, info, data_stream, metadata_stream_id
1197 )
1198 stream_id += 1
1199
1200 self._is_attached = False
1201 fmt = 'Built tracing session state: id={}, name="{}"'
1202 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
1203
1204 @property
1205 def tracing_session_descriptor(self):
1206 return self._tc_descr
1207
1208 @property
1209 def data_stream_states(self):
1210 return self._ds_states
1211
1212 @property
1213 def metadata_stream_states(self):
1214 return self._ms_states
1215
1216 @property
1217 def stream_infos(self):
1218 return self._stream_infos
1219
1220 @property
1221 def has_new_metadata(self):
1222 return any([not ms.is_sent for ms in self._ms_states.values()])
1223
1224 @property
1225 def is_attached(self):
1226 return self._is_attached
1227
1228 @is_attached.setter
1229 def is_attached(self, value):
1230 self._is_attached = value
1231
1232
1233def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
1234 if metadata_stream_state.next_section_timestamp is None:
1235 return False
1236
1237 if latest_timestamp >= metadata_stream_state.next_section_timestamp:
1238 return True
1239 else:
1240 return False
1241
1242
1243# An LTTng live viewer session manages a view on tracing sessions
1244# and replies to commands accordingly.
1245class _LttngLiveViewerSession:
1246 def __init__(
1247 self,
1248 viewer_session_id,
1249 tracing_session_descriptors,
1250 max_query_data_response_size,
1251 ):
1252 self._viewer_session_id = viewer_session_id
1253 self._ts_states = {}
1254 self._stream_states = {}
1255 self._max_query_data_response_size = max_query_data_response_size
1256 total_stream_infos = 0
1257
1258 for ts_descr in tracing_session_descriptors:
1259 ts_state = _LttngLiveViewerSessionTracingSessionState(
1260 ts_descr, total_stream_infos
1261 )
1262 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1263 self._ts_states[ts_id] = ts_state
1264 total_stream_infos += len(ts_state.stream_infos)
1265
1266 # Update session's stream states to have the new states
1267 self._stream_states.update(ts_state.data_stream_states)
1268 self._stream_states.update(ts_state.metadata_stream_states)
1269
1270 self._command_handlers = {
1271 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1272 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1273 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1274 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1275 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1276 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1277 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1278 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
1279 }
1280
1281 @property
1282 def viewer_session_id(self):
1283 return self._viewer_session_id
1284
1285 def _get_tracing_session_state(self, tracing_session_id):
1286 if tracing_session_id not in self._ts_states:
1287 raise UnexpectedInput(
1288 "Unknown tracing session ID {}".format(tracing_session_id)
1289 )
1290
1291 return self._ts_states[tracing_session_id]
1292
1293 def _get_stream_state(self, stream_id):
1294 if stream_id not in self._stream_states:
1295 UnexpectedInput("Unknown stream ID {}".format(stream_id))
1296
1297 return self._stream_states[stream_id]
1298
1299 def handle_command(self, cmd):
1300 logging.info(
1301 "Handling command in viewer session: cmd-cls-name={}".format(
1302 cmd.__class__.__name__
1303 )
1304 )
1305 cmd_type = type(cmd)
1306
1307 if cmd_type not in self._command_handlers:
1308 raise UnexpectedInput(
1309 "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
1310 )
1311
1312 return self._command_handlers[cmd_type](cmd)
1313
1314 def _handle_attach_to_tracing_session_command(self, cmd):
1315 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1316 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1317 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1318 info = ts_state.tracing_session_descriptor.info
1319
1320 if ts_state.is_attached:
1321 raise UnexpectedInput(
1322 "Cannot attach to tracing session `{}`: viewer is already attached".format(
1323 info.name
1324 )
1325 )
1326
1327 ts_state.is_attached = True
1328 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1329 return _LttngLiveViewerAttachToTracingSessionReply(
1330 status, ts_state.stream_infos
1331 )
1332
1333 def _handle_detach_from_tracing_session_command(self, cmd):
1334 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1335 logging.info(fmt.format(cmd.tracing_session_id))
1336 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1337 info = ts_state.tracing_session_descriptor.info
1338
1339 if not ts_state.is_attached:
1340 raise UnexpectedInput(
1341 "Cannot detach to tracing session `{}`: viewer is not attached".format(
1342 info.name
1343 )
1344 )
1345
1346 ts_state.is_attached = False
1347 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1348 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1349
1350 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1351 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1352 logging.info(fmt.format(cmd.stream_id))
1353 stream_state = self._get_stream_state(cmd.stream_id)
1354 metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
1355
1356 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1357 raise UnexpectedInput(
1358 "Stream with ID {} is not a data stream".format(cmd.stream_id)
1359 )
1360
1361 if stream_state.cur_index_entry is None:
1362 # The viewer is done reading this stream
1363 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1364
1365 # Dummy data stream index entry to use with the `HUP` status
1366 # (the reply needs one, but the viewer ignores it)
1367 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1368
1369 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1370 status, index_entry, False, False
1371 )
1372
1373 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1374
1375 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
1376 metadata_stream_state.is_sent = False
1377 metadata_stream_state.goto_next_section()
1378
1379 # The viewer only checks the `has_new_metadata` flag if the
1380 # reply's status is `OK`, so we need to provide an index here
1381 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1382 if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
1383 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1384 else:
1385 assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
1386 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1387
1388 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1389 status, stream_state.cur_index_entry, has_new_metadata, False
1390 )
1391 stream_state.goto_next_index_entry()
1392 return reply
1393
1394 def _handle_get_data_stream_packet_data_command(self, cmd):
1395 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1396 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1397 stream_state = self._get_stream_state(cmd.stream_id)
1398 data_response_length = cmd.req_length
1399
1400 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1401 raise UnexpectedInput(
1402 "Stream with ID {} is not a data stream".format(cmd.stream_id)
1403 )
1404
1405 if stream_state.tracing_session_state.has_new_metadata:
1406 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1407 return _LttngLiveViewerGetDataStreamPacketDataReply(
1408 status, bytes(), True, False
1409 )
1410
1411 if self._max_query_data_response_size:
1412 # Enforce a server side limit on the query requested length.
1413 # To ensure that the transaction terminate take the minimum of both
1414 # value.
1415 data_response_length = min(
1416 cmd.req_length, self._max_query_data_response_size
1417 )
1418 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1419 logging.info(fmt.format(cmd.req_length, data_response_length))
1420
1421 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1422 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1423 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1424
1425 def _handle_get_metadata_stream_data_command(self, cmd):
1426 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1427 logging.info(fmt.format(cmd.stream_id))
1428 metadata_stream_state = self._get_stream_state(cmd.stream_id)
1429
1430 if (
1431 type(metadata_stream_state)
1432 is not _LttngLiveViewerSessionMetadataStreamState
1433 ):
1434 raise UnexpectedInput(
1435 "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
1436 )
1437
1438 if metadata_stream_state.is_sent:
1439 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1440 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1441
1442 metadata_stream_state.is_sent = True
1443 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
1444 metadata_section = metadata_stream_state.cur_section
1445 assert metadata_section is not None
1446
1447 # If we are sending an empty section, ready the next one right away.
1448 if len(metadata_section.data) == 0:
1449 metadata_stream_state.is_sent = False
1450 metadata_stream_state.goto_next_section()
1451
1452 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1453 logging.info(fmt.format(len(metadata_section.data)))
1454 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1455 status, metadata_section.data
1456 )
1457
1458 def _handle_get_new_stream_infos_command(self, cmd):
1459 fmt = 'Handling "get new stream infos" command: ts-id={}'
1460 logging.info(fmt.format(cmd.tracing_session_id))
1461
1462 # As of this version, all the tracing session's stream infos are
1463 # always given to the viewer when sending the "attach to tracing
1464 # session" reply, so there's nothing new here. Return the `HUP`
1465 # status as, if we're handling this command, the viewer consumed
1466 # all the existing data streams.
1467 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1468 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1469
1470 def _handle_get_tracing_session_infos_command(self, cmd):
1471 logging.info('Handling "get tracing session infos" command.')
1472 infos = [
1473 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1474 ]
1475 infos.sort(key=lambda info: info.name)
1476 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1477
1478 def _handle_create_viewer_session_command(self, cmd):
1479 logging.info('Handling "create viewer session" command.')
1480 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1481
1482 # This does nothing here. In the LTTng relay daemon, it
1483 # allocates the viewer session's state.
1484 return _LttngLiveViewerCreateViewerSessionReply(status)
1485
1486
1487# An LTTng live TCP server.
1488#
1489# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
1490# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
1491# to a temporary port file. It renames the temporary port file to
1492# `port_filename`.
1493#
1494# `tracing_session_descriptors` is a list of tracing session descriptors
1495# (`LttngTracingSessionDescriptor`) to serve.
1496#
1497# This server accepts a single viewer (client).
1498#
1499# When the viewer closes the connection, the server's constructor
1500# returns.
1501class LttngLiveServer:
1502 def __init__(
1503 self,
1504 port,
1505 port_filename,
1506 tracing_session_descriptors,
1507 max_query_data_response_size,
1508 ):
1509 logging.info("Server configuration:")
1510
1511 logging.info(" Port file name: `{}`".format(port_filename))
1512
1513 if max_query_data_response_size is not None:
1514 logging.info(
1515 " Maximum response data query size: `{}`".format(
1516 max_query_data_response_size
1517 )
1518 )
1519
1520 for ts_descr in tracing_session_descriptors:
1521 info = ts_descr.info
1522 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1523 logging.info(
1524 fmt.format(
1525 info.name,
1526 info.tracing_session_id,
1527 info.hostname,
1528 info.live_timer_freq,
1529 info.client_count,
1530 info.stream_count,
1531 )
1532 )
1533
1534 for trace in ts_descr.traces:
1535 logging.info(' Trace: path="{}"'.format(trace.path))
1536
1537 self._ts_descriptors = tracing_session_descriptors
1538 self._max_query_data_response_size = max_query_data_response_size
1539 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1540 self._codec = _LttngLiveViewerProtocolCodec()
1541
1542 # Port 0: OS assigns an unused port
1543 serv_addr = ("localhost", port if port is not None else 0)
1544 self._sock.bind(serv_addr)
1545 self._write_port_to_file(port_filename)
1546
1547 try:
1548 self._listen()
1549 finally:
1550 self._sock.close()
1551 logging.info("Closed connection and socket.")
1552
1553 @property
1554 def _server_port(self):
1555 return self._sock.getsockname()[1]
1556
1557 def _recv_command(self):
1558 data = bytes()
1559
1560 while True:
1561 logging.info("Waiting for viewer command.")
1562 buf = self._conn.recv(128)
1563
1564 if not buf:
1565 logging.info("Client closed connection.")
1566
1567 if data:
1568 raise UnexpectedInput(
1569 "Client closed connection after having sent {} command bytes.".format(
1570 len(data)
1571 )
1572 )
1573
1574 return
1575
1576 logging.info("Received data from viewer: length={}".format(len(buf)))
1577
1578 data += buf
1579
1580 try:
1581 cmd = self._codec.decode(data)
1582 except struct.error as exc:
1583 raise UnexpectedInput("Malformed command: {}".format(exc)) from exc
1584
1585 if cmd is not None:
1586 logging.info(
1587 "Received command from viewer: cmd-cls-name={}".format(
1588 cmd.__class__.__name__
1589 )
1590 )
1591 return cmd
1592
1593 def _send_reply(self, reply):
1594 data = self._codec.encode(reply)
1595 logging.info(
1596 "Sending reply to viewer: reply-cls-name={}, length={}".format(
1597 reply.__class__.__name__, len(data)
1598 )
1599 )
1600 self._conn.sendall(data)
1601
1602 def _handle_connection(self):
1603 # First command must be "connect"
1604 cmd = self._recv_command()
1605
1606 if type(cmd) is not _LttngLiveViewerConnectCommand:
1607 raise UnexpectedInput(
1608 'First command is not "connect": cmd-cls-name={}'.format(
1609 cmd.__class__.__name__
1610 )
1611 )
1612
1613 # Create viewer session (arbitrary ID 23)
1614 logging.info(
1615 "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
1616 )
1617 viewer_session = _LttngLiveViewerSession(
1618 23, self._ts_descriptors, self._max_query_data_response_size
1619 )
1620
1621 # Send "connect" reply
1622 self._send_reply(
1623 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1624 )
1625
1626 # Make the viewer session handle the remaining commands
1627 while True:
1628 cmd = self._recv_command()
1629
1630 if cmd is None:
1631 # Connection closed (at an expected location within the
1632 # conversation)
1633 return
1634
1635 self._send_reply(viewer_session.handle_command(cmd))
1636
1637 def _listen(self):
1638 logging.info("Listening: port={}".format(self._server_port))
1639 # Backlog must be present for Python version < 3.5.
1640 # 128 is an arbitrary number since we expect only 1 connection anyway.
1641 self._sock.listen(128)
1642 self._conn, viewer_addr = self._sock.accept()
1643 logging.info(
1644 "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
1645 )
1646
1647 try:
1648 self._handle_connection()
1649 finally:
1650 self._conn.close()
1651
1652 def _write_port_to_file(self, port_filename):
1653 # Write the port number to a temporary file.
1654 with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
1655 print(self._server_port, end="", file=tmp_port_file)
1656
1657 # Rename temporary file to real file
1658 os.replace(tmp_port_file.name, port_filename)
1659 logging.info(
1660 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1661 tmp_port_file.name, port_filename
1662 )
1663 )
1664
1665
1666def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
1667 # File format is:
1668 #
1669 # [
1670 # {
1671 # "name": "my-session",
1672 # "id": 17,
1673 # "hostname": "myhost",
1674 # "live-timer-freq": 1000000,
1675 # "client-count": 23,
1676 # "traces": [
1677 # {
1678 # "path": "lol"
1679 # },
1680 # {
1681 # "path": "meow/mix",
1682 # "beacons": {
1683 # "my_stream": [ 5235787, 728375283 ]
1684 # },
1685 # "metadata-sections": [
1686 # {
1687 # "line": 1,
1688 # "timestamp": 0
1689 # }
1690 # ]
1691 # }
1692 # ]
1693 # }
1694 # ]
1695 with open(sessions_filename, "r") as sessions_file:
1696 params = json.load(sessions_file)
1697
1698 sessions = []
1699
1700 for session in params:
1701 name = session["name"]
1702 tracing_session_id = session["id"]
1703 hostname = session["hostname"]
1704 live_timer_freq = session["live-timer-freq"]
1705 client_count = session["client-count"]
1706 traces = []
1707
1708 for trace in session["traces"]:
1709 metadata_sections = trace.get("metadata-sections")
1710 beacons = trace.get("beacons")
1711 path = trace["path"]
1712
1713 if not os.path.isabs(path):
1714 path = os.path.join(trace_path_prefix, path)
1715
1716 traces.append(LttngTrace(path, metadata_sections, beacons))
1717
1718 sessions.append(
1719 LttngTracingSessionDescriptor(
1720 name,
1721 tracing_session_id,
1722 hostname,
1723 live_timer_freq,
1724 client_count,
1725 traces,
1726 )
1727 )
1728
1729 return sessions
1730
1731
1732def _loglevel_parser(string):
1733 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
1734 if string not in loglevels:
1735 msg = "{} is not a valid loglevel".format(string)
1736 raise argparse.ArgumentTypeError(msg)
1737 return loglevels[string]
1738
1739
1740if __name__ == "__main__":
1741 logging.basicConfig(format="# %(asctime)-25s%(message)s")
1742 parser = argparse.ArgumentParser(
1743 description="LTTng-live protocol mocker", add_help=False
1744 )
1745 parser.add_argument(
1746 "--log-level",
1747 default="warning",
1748 choices=["info", "warning"],
1749 help="The loglevel to be used.",
1750 )
1751
1752 loglevel_namespace, remaining_args = parser.parse_known_args()
1753 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1754
1755 parser.add_argument(
1756 "--port",
1757 help="The port to bind to. If missing, use an OS-assigned port..",
1758 type=int,
1759 )
1760 parser.add_argument(
1761 "--port-filename",
1762 help="The final port file. This file is present when the server is ready to receive connection.",
1763 required=True,
1764 )
1765 parser.add_argument(
1766 "--max-query-data-response-size",
1767 type=int,
1768 help="The maximum size of control data response in bytes",
1769 )
1770 parser.add_argument(
1771 "--trace-path-prefix",
1772 type=str,
1773 help="Prefix to prepend to the trace paths of session configurations",
1774 )
1775 parser.add_argument(
1776 "--sessions-filename",
1777 type=str,
1778 help="Path to a session configuration file",
1779 )
1780 parser.add_argument(
1781 "-h",
1782 "--help",
1783 action="help",
1784 default=argparse.SUPPRESS,
1785 help="Show this help message and exit.",
1786 )
1787
1788 args = parser.parse_args(args=remaining_args)
1789 try:
1790 sessions = _session_descriptors_from_path(
1791 args.sessions_filename,
1792 args.trace_path_prefix,
1793 )
1794 LttngLiveServer(
1795 args.port, args.port_filename, sessions, args.max_query_data_response_size
1796 )
1797 except UnexpectedInput as exc:
1798 logging.error(str(exc))
1799 print(exc, file=sys.stderr)
1800 sys.exit(1)
This page took 0.028274 seconds and 4 git commands to generate.