9687929418490c87514e0f71b0028d0c1555f5ba
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
1 # SPDX-License-Identifier: MIT
2 #
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
4 #
5
6 import os
7 import re
8 import sys
9 import json
10 import socket
11 import struct
12 import logging
13 import os.path
14 import argparse
15 import tempfile
16 import collections.abc
17 from collections import namedtuple
18
19
class UnexpectedInput(RuntimeError):
    """Error signalling input that the server does not know how to handle."""
22
23
24 # An entry within the index of an LTTng data stream.
25 class _LttngDataStreamIndexEntry:
26 def __init__(
27 self,
28 offset_bytes,
29 total_size_bits,
30 content_size_bits,
31 timestamp_begin,
32 timestamp_end,
33 events_discarded,
34 stream_class_id,
35 ):
36 self._offset_bytes = offset_bytes
37 self._total_size_bits = total_size_bits
38 self._content_size_bits = content_size_bits
39 self._timestamp_begin = timestamp_begin
40 self._timestamp_end = timestamp_end
41 self._events_discarded = events_discarded
42 self._stream_class_id = stream_class_id
43
44 @property
45 def offset_bytes(self):
46 return self._offset_bytes
47
48 @property
49 def total_size_bits(self):
50 return self._total_size_bits
51
52 @property
53 def total_size_bytes(self):
54 return self._total_size_bits // 8
55
56 @property
57 def content_size_bits(self):
58 return self._content_size_bits
59
60 @property
61 def content_size_bytes(self):
62 return self._content_size_bits // 8
63
64 @property
65 def timestamp_begin(self):
66 return self._timestamp_begin
67
68 @property
69 def timestamp_end(self):
70 return self._timestamp_end
71
72 @property
73 def events_discarded(self):
74 return self._events_discarded
75
76 @property
77 def stream_class_id(self):
78 return self._stream_class_id
79
80
# A beacon entry of an LTTng data stream. While a stream beacon entry is
# conceptually unrelated to an index, it is sent as a reply to a
# `_LttngLiveViewerGetNextDataStreamIndexEntryCommand`.
84 class _LttngDataStreamBeaconEntry:
85 def __init__(self, stream_class_id, timestamp):
86 self._stream_class_id = stream_class_id
87 self._timestamp = timestamp
88
89 @property
90 def timestamp(self):
91 return self._timestamp
92
93 @property
94 def stream_class_id(self):
95 return self._stream_class_id
96
97
98 class _LttngLiveViewerCommand:
99 def __init__(self, version):
100 self._version = version
101
102 @property
103 def version(self):
104 return self._version
105
106
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    """Connect command: viewer session ID plus major/minor numbers."""

    def __init__(self, version, viewer_session_id, major, minor):
        super().__init__(version)
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    @property
    def viewer_session_id(self):
        """Viewer session ID given at construction."""
        return self._viewer_session_id

    @property
    def major(self):
        """Major number given at construction."""
        return self._major

    @property
    def minor(self):
        """Minor number given at construction."""
        return self._minor
125
126
127 class _LttngLiveViewerConnectReply:
128 def __init__(self, viewer_session_id, major, minor):
129 self._viewer_session_id = viewer_session_id
130 self._major = major
131 self._minor = minor
132
133 @property
134 def viewer_session_id(self):
135 return self._viewer_session_id
136
137 @property
138 def major(self):
139 return self._major
140
141 @property
142 def minor(self):
143 return self._minor
144
145
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    """Command without any field besides the inherited version."""
148
149
150 class _LttngLiveViewerTracingSessionInfo:
151 def __init__(
152 self,
153 tracing_session_id,
154 live_timer_freq,
155 client_count,
156 stream_count,
157 hostname,
158 name,
159 ):
160 self._tracing_session_id = tracing_session_id
161 self._live_timer_freq = live_timer_freq
162 self._client_count = client_count
163 self._stream_count = stream_count
164 self._hostname = hostname
165 self._name = name
166
167 @property
168 def tracing_session_id(self):
169 return self._tracing_session_id
170
171 @property
172 def live_timer_freq(self):
173 return self._live_timer_freq
174
175 @property
176 def client_count(self):
177 return self._client_count
178
179 @property
180 def stream_count(self):
181 return self._stream_count
182
183 @property
184 def hostname(self):
185 return self._hostname
186
187 @property
188 def name(self):
189 return self._name
190
191
192 class _LttngLiveViewerGetTracingSessionInfosReply:
193 def __init__(self, tracing_session_infos):
194 self._tracing_session_infos = tracing_session_infos
195
196 @property
197 def tracing_session_infos(self):
198 return self._tracing_session_infos
199
200
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    """Attach command: tracing session ID, offset and seek type."""

    class SeekType:
        """Known values of the `seek_type` field."""

        BEGINNING = 1
        LAST = 2

    def __init__(self, version, tracing_session_id, offset, seek_type):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id
        self._offset, self._seek_type = offset, seek_type

    @property
    def tracing_session_id(self):
        """Tracing session ID given at construction."""
        return self._tracing_session_id

    @property
    def offset(self):
        """Offset given at construction."""
        return self._offset

    @property
    def seek_type(self):
        """Seek type given at construction."""
        return self._seek_type
223
224
225 class _LttngLiveViewerStreamInfo:
226 def __init__(self, id, trace_id, is_metadata, path, channel_name):
227 self._id = id
228 self._trace_id = trace_id
229 self._is_metadata = is_metadata
230 self._path = path
231 self._channel_name = channel_name
232
233 @property
234 def id(self):
235 return self._id
236
237 @property
238 def trace_id(self):
239 return self._trace_id
240
241 @property
242 def is_metadata(self):
243 return self._is_metadata
244
245 @property
246 def path(self):
247 return self._path
248
249 @property
250 def channel_name(self):
251 return self._channel_name
252
253
254 class _LttngLiveViewerAttachToTracingSessionReply:
255 class Status:
256 OK = 1
257 ALREADY = 2
258 UNKNOWN = 3
259 NOT_LIVE = 4
260 SEEK_ERROR = 5
261 NO_SESSION = 6
262
263 def __init__(self, status, stream_infos):
264 self._status = status
265 self._stream_infos = stream_infos
266
267 @property
268 def status(self):
269 return self._status
270
271 @property
272 def stream_infos(self):
273 return self._stream_infos
274
275
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    """Command carrying the ID of the targeted stream."""

    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self):
        """Stream ID given at construction."""
        return self._stream_id
284
285
286 class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
287 class Status:
288 OK = 1
289 RETRY = 2
290 HUP = 3
291 ERROR = 4
292 INACTIVE = 5
293 EOF = 6
294
295 def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
296 self._status = status
297 self._index_entry = index_entry
298 self._has_new_metadata = has_new_metadata
299 self._has_new_data_stream = has_new_data_stream
300
301 @property
302 def status(self):
303 return self._status
304
305 @property
306 def index_entry(self):
307 return self._index_entry
308
309 @property
310 def has_new_metadata(self):
311 return self._has_new_metadata
312
313 @property
314 def has_new_data_stream(self):
315 return self._has_new_data_stream
316
317
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    """Command carrying a stream ID, an offset and a requested length."""

    def __init__(self, version, stream_id, offset, req_length):
        super().__init__(version)
        self._stream_id = stream_id
        self._offset, self._req_length = offset, req_length

    @property
    def stream_id(self):
        """Stream ID given at construction."""
        return self._stream_id

    @property
    def offset(self):
        """Offset given at construction."""
        return self._offset

    @property
    def req_length(self):
        """Requested length given at construction."""
        return self._req_length
336
337
338 class _LttngLiveViewerGetDataStreamPacketDataReply:
339 class Status:
340 OK = 1
341 RETRY = 2
342 ERROR = 3
343 EOF = 4
344
345 def __init__(self, status, data, has_new_metadata, has_new_data_stream):
346 self._status = status
347 self._data = data
348 self._has_new_metadata = has_new_metadata
349 self._has_new_data_stream = has_new_data_stream
350
351 @property
352 def status(self):
353 return self._status
354
355 @property
356 def data(self):
357 return self._data
358
359 @property
360 def has_new_metadata(self):
361 return self._has_new_metadata
362
363 @property
364 def has_new_data_stream(self):
365 return self._has_new_data_stream
366
367
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    """Command carrying the ID of the targeted stream."""

    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self):
        """Stream ID given at construction."""
        return self._stream_id
376
377
378 class _LttngLiveViewerGetMetadataStreamDataContentReply:
379 class Status:
380 OK = 1
381 NO_NEW = 2
382 ERROR = 3
383
384 def __init__(self, status, data):
385 self._status = status
386 self._data = data
387
388 @property
389 def status(self):
390 return self._status
391
392 @property
393 def data(self):
394 return self._data
395
396
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    """Command carrying the ID of the targeted tracing session."""

    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        """Tracing session ID given at construction."""
        return self._tracing_session_id
405
406
407 class _LttngLiveViewerGetNewStreamInfosReply:
408 class Status:
409 OK = 1
410 NO_NEW = 2
411 ERROR = 3
412 HUP = 4
413
414 def __init__(self, status, stream_infos):
415 self._status = status
416 self._stream_infos = stream_infos
417
418 @property
419 def status(self):
420 return self._status
421
422 @property
423 def stream_infos(self):
424 return self._stream_infos
425
426
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    """Command without any field besides the inherited version."""
429
430
431 class _LttngLiveViewerCreateViewerSessionReply:
432 class Status:
433 OK = 1
434 ERROR = 2
435
436 def __init__(self, status):
437 self._status = status
438
439 @property
440 def status(self):
441 return self._status
442
443
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    """Command carrying the ID of the targeted tracing session."""

    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        """Tracing session ID given at construction."""
        return self._tracing_session_id
452
453
454 class _LttngLiveViewerDetachFromTracingSessionReply:
455 class Status:
456 OK = 1
457 UNKNOWN = 2
458 ERROR = 3
459
460 def __init__(self, status):
461 self._status = status
462
463 @property
464 def status(self):
465 return self._status
466
467
468 # An LTTng live protocol codec can convert bytes to command objects and
469 # reply objects to bytes.
470 class _LttngLiveViewerProtocolCodec:
471 _COMMAND_HEADER_STRUCT_FMT = "QII"
472 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
473
474 def __init__(self):
475 pass
476
477 def _unpack(self, fmt, data, offset=0):
478 fmt = "!" + fmt
479 return struct.unpack_from(fmt, data, offset)
480
481 def _unpack_payload(self, fmt, data):
482 return self._unpack(
483 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
484 )
485
486 def decode(self, data):
487 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
488 # Not enough data to read the command header
489 return
490
491 payload_size, cmd_type, version = self._unpack(
492 self._COMMAND_HEADER_STRUCT_FMT, data
493 )
494 logging.info(
495 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
496 payload_size, cmd_type, version
497 )
498 )
499
500 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
501 # Not enough data to read the whole command
502 return
503
504 if cmd_type == 1:
505 viewer_session_id, major, minor, conn_type = self._unpack_payload(
506 "QIII", data
507 )
508 return _LttngLiveViewerConnectCommand(
509 version, viewer_session_id, major, minor
510 )
511 elif cmd_type == 2:
512 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
513 elif cmd_type == 3:
514 tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
515 return _LttngLiveViewerAttachToTracingSessionCommand(
516 version, tracing_session_id, offset, seek_type
517 )
518 elif cmd_type == 4:
519 (stream_id,) = self._unpack_payload("Q", data)
520 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
521 version, stream_id
522 )
523 elif cmd_type == 5:
524 stream_id, offset, req_length = self._unpack_payload("QQI", data)
525 return _LttngLiveViewerGetDataStreamPacketDataCommand(
526 version, stream_id, offset, req_length
527 )
528 elif cmd_type == 6:
529 (stream_id,) = self._unpack_payload("Q", data)
530 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
531 elif cmd_type == 7:
532 (tracing_session_id,) = self._unpack_payload("Q", data)
533 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
534 elif cmd_type == 8:
535 return _LttngLiveViewerCreateViewerSessionCommand(version)
536 elif cmd_type == 9:
537 (tracing_session_id,) = self._unpack_payload("Q", data)
538 return _LttngLiveViewerDetachFromTracingSessionCommand(
539 version, tracing_session_id
540 )
541 else:
542 raise UnexpectedInput("Unknown command type {}".format(cmd_type))
543
544 def _pack(self, fmt, *args):
545 # Force network byte order
546 return struct.pack("!" + fmt, *args)
547
548 def _encode_zero_padded_str(self, string, length):
549 data = string.encode()
550 return data.ljust(length, b"\x00")
551
552 def _encode_stream_info(self, info):
553 data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
554 data += self._encode_zero_padded_str(info.path, 4096)
555 data += self._encode_zero_padded_str(info.channel_name, 255)
556 return data
557
558 def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
559 flags = 0
560
561 if has_new_metadata:
562 flags |= 1
563
564 if has_new_data_streams:
565 flags |= 2
566
567 return flags
568
569 def encode(self, reply):
570 if type(reply) is _LttngLiveViewerConnectReply:
571 data = self._pack(
572 "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
573 )
574 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
575 data = self._pack("I", len(reply.tracing_session_infos))
576
577 for info in reply.tracing_session_infos:
578 data += self._pack(
579 "QIII",
580 info.tracing_session_id,
581 info.live_timer_freq,
582 info.client_count,
583 info.stream_count,
584 )
585 data += self._encode_zero_padded_str(info.hostname, 64)
586 data += self._encode_zero_padded_str(info.name, 255)
587 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
588 data = self._pack("II", reply.status, len(reply.stream_infos))
589
590 for info in reply.stream_infos:
591 data += self._encode_stream_info(info)
592 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
593 index_format = "QQQQQQQII"
594 entry = reply.index_entry
595 flags = self._get_has_new_stuff_flags(
596 reply.has_new_metadata, reply.has_new_data_stream
597 )
598
599 if type(entry) is _LttngDataStreamIndexEntry:
600 data = self._pack(
601 index_format,
602 entry.offset_bytes,
603 entry.total_size_bits,
604 entry.content_size_bits,
605 entry.timestamp_begin,
606 entry.timestamp_end,
607 entry.events_discarded,
608 entry.stream_class_id,
609 reply.status,
610 flags,
611 )
612 else:
613 assert type(entry) is _LttngDataStreamBeaconEntry
614 data = self._pack(
615 index_format,
616 0,
617 0,
618 0,
619 0,
620 entry.timestamp,
621 0,
622 entry.stream_class_id,
623 reply.status,
624 flags,
625 )
626 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
627 flags = self._get_has_new_stuff_flags(
628 reply.has_new_metadata, reply.has_new_data_stream
629 )
630 data = self._pack("III", reply.status, len(reply.data), flags)
631 data += reply.data
632 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
633 data = self._pack("QI", len(reply.data), reply.status)
634 data += reply.data
635 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
636 data = self._pack("II", reply.status, len(reply.stream_infos))
637
638 for info in reply.stream_infos:
639 data += self._encode_stream_info(info)
640 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
641 data = self._pack("I", reply.status)
642 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
643 data = self._pack("I", reply.status)
644 else:
645 raise ValueError(
646 "Unknown reply object with class `{}`".format(reply.__class__.__name__)
647 )
648
649 return data
650
651
def _get_entry_timestamp_begin(entry):
    """Return the beginning timestamp of `entry`.

    This is the timestamp of a beacon entry, or the `timestamp_begin`
    property of an index entry. Entries are matched by exact type.
    """
    if type(entry) is not _LttngDataStreamBeaconEntry:
        assert type(entry) is _LttngDataStreamIndexEntry
        return entry.timestamp_begin

    return entry.timestamp
658
659
660 # The index of an LTTng data stream, a sequence of index entries.
661 class _LttngDataStreamIndex(collections.abc.Sequence):
662 def __init__(self, path, beacons):
663 self._path = path
664 self._build()
665
666 if beacons:
667 stream_class_id = self._entries[0].stream_class_id
668 beacons = [
669 _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
670 ]
671 self._add_beacons(beacons)
672
673 logging.info(
674 'Built data stream index entries: path="{}", count={}'.format(
675 path, len(self._entries)
676 )
677 )
678
679 def _build(self):
680 self._entries = []
681 assert os.path.isfile(self._path)
682
683 with open(self._path, "rb") as f:
684 # Read header first
685 fmt = ">IIII"
686 size = struct.calcsize(fmt)
687 data = f.read(size)
688 assert len(data) == size
689 magic, index_major, index_minor, index_entry_length = struct.unpack(
690 fmt, data
691 )
692 assert magic == 0xC1F1DCC1
693
694 # Read index entries
695 fmt = ">QQQQQQQ"
696 size = struct.calcsize(fmt)
697
698 while True:
699 logging.debug(
700 'Decoding data stream index entry: path="{}", offset={}'.format(
701 self._path, f.tell()
702 )
703 )
704 data = f.read(size)
705
706 if not data:
707 # Done
708 break
709
710 assert len(data) == size
711 (
712 offset_bytes,
713 total_size_bits,
714 content_size_bits,
715 timestamp_begin,
716 timestamp_end,
717 events_discarded,
718 stream_class_id,
719 ) = struct.unpack(fmt, data)
720
721 self._entries.append(
722 _LttngDataStreamIndexEntry(
723 offset_bytes,
724 total_size_bits,
725 content_size_bits,
726 timestamp_begin,
727 timestamp_end,
728 events_discarded,
729 stream_class_id,
730 )
731 )
732
733 # Skip anything else before the next entry
734 f.seek(index_entry_length - size, os.SEEK_CUR)
735
736 def _add_beacons(self, beacons):
737 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
738 def sort_key(entry):
739 if type(entry) is _LttngDataStreamBeaconEntry:
740 return entry.timestamp
741 else:
742 return entry.timestamp_end
743
744 self._entries += beacons
745 self._entries.sort(key=sort_key)
746
747 def __getitem__(self, index):
748 return self._entries[index]
749
750 def __len__(self):
751 return len(self._entries)
752
753 @property
754 def path(self):
755 return self._path
756
757
758 # An LTTng data stream.
759 class _LttngDataStream:
760 def __init__(self, path, beacons):
761 self._path = path
762 filename = os.path.basename(path)
763 match = re.match(r"(.*)_\d+", filename)
764 if not match:
765 raise RuntimeError(
766 "Unexpected data stream file name pattern: {}".format(filename)
767 )
768
769 self._channel_name = match.group(1)
770 trace_dir = os.path.dirname(path)
771 index_path = os.path.join(trace_dir, "index", filename + ".idx")
772 self._index = _LttngDataStreamIndex(index_path, beacons)
773 assert os.path.isfile(path)
774 self._file = open(path, "rb")
775 logging.info(
776 'Built data stream: path="{}", channel-name="{}"'.format(
777 path, self._channel_name
778 )
779 )
780
781 @property
782 def path(self):
783 return self._path
784
785 @property
786 def channel_name(self):
787 return self._channel_name
788
789 @property
790 def index(self):
791 return self._index
792
793 def get_data(self, offset_bytes, len_bytes):
794 self._file.seek(offset_bytes)
795 return self._file.read(len_bytes)
796
797
798 class _LttngMetadataStreamSection:
799 def __init__(self, timestamp, data):
800 self._timestamp = timestamp
801 if data is None:
802 self._data = bytes()
803 else:
804 self._data = data
805 logging.info(
806 "Built metadata stream section: ts={}, data-len={}".format(
807 self._timestamp, len(self._data)
808 )
809 )
810
811 @property
812 def timestamp(self):
813 return self._timestamp
814
815 @property
816 def data(self):
817 return self._data
818
819
820 # An LTTng metadata stream.
821 class _LttngMetadataStream:
822 def __init__(self, metadata_file_path, config_sections):
823 self._path = metadata_file_path
824 self._sections = config_sections
825 logging.info(
826 "Built metadata stream: path={}, section-len={}".format(
827 self._path, len(self._sections)
828 )
829 )
830
831 @property
832 def path(self):
833 return self._path
834
835 @property
836 def sections(self):
837 return self._sections
838
839
# A parsed metadata section configuration: the number of its first line,
# its timestamp, and whether or not it's an empty section.
LttngMetadataConfigSection = namedtuple(
    "LttngMetadataConfigSection",
    ("line", "timestamp", "is_empty"),
)
843
844
845 def _parse_metadata_sections_config(config_sections):
846 assert config_sections is not None
847 config_metadata_sections = []
848 append_empty_section = False
849 last_timestamp = 0
850 last_line = 0
851
852 for config_section in config_sections:
853 if config_section == "empty":
854 # Found a empty section marker. Actually append the section at the
855 # timestamp of the next concrete section.
856 append_empty_section = True
857 else:
858 assert type(config_section) is dict
859 line = config_section.get("line")
860 ts = config_section.get("timestamp")
861
862 if type(line) is not int:
863 raise RuntimeError("`line` is not an integer")
864
865 if type(ts) is not int:
866 raise RuntimeError("`timestamp` is not an integer")
867
868 # Sections' timestamps and lines must both be increasing.
869 assert ts > last_timestamp
870 last_timestamp = ts
871 assert line > last_line
872 last_line = line
873
874 if append_empty_section:
875 config_metadata_sections.append(
876 LttngMetadataConfigSection(line, ts, True)
877 )
878 append_empty_section = False
879
880 config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
881
882 return config_metadata_sections
883
884
def _split_metadata_sections(metadata_file_path, raw_config_sections):
    """Split a metadata file into sections following a configuration.

    `raw_config_sections` is a sequence which
    `_parse_metadata_sections_config()` accepts. Each parsed section
    gives the one-based number of its first line and its timestamp. The
    lines which precede the first line of the second parsed section
    always belong to the first one.

    The file is read as bytes, so the data of the sections is exactly
    what the file contains, whatever the locale and the line endings.

    Returns:
        A list of `_LttngMetadataStreamSection` objects. When the
        configuration yields no section at all, this is a single
        section, with the timestamp 0, which holds the whole file.
    """
    assert isinstance(raw_config_sections, collections.abc.Sequence)

    parsed_sections = _parse_metadata_sections_config(raw_config_sections)

    # Reading bytes avoids decoding with the locale's default encoding
    # and any newline translation.
    with open(metadata_file_path, "rb") as metadata_file:
        metadata_lines = metadata_file.readlines()

    if not parsed_sections:
        # Nothing to split on: indexing `parsed_sections` below would
        # raise `IndexError`.
        return [_LttngMetadataStreamSection(0, b"".join(metadata_lines))]

    sections = []
    config_metadata_sections_idx = 0
    curr_metadata_section = bytearray()

    # Line numbers start at one, like when viewing a text file.
    for curr_line_number, line_content in enumerate(metadata_lines, start=1):
        # If there are no more sections, simply append the line.
        if config_metadata_sections_idx + 1 >= len(parsed_sections):
            curr_metadata_section += line_content
            continue

        next_section_line_number = parsed_sections[
            config_metadata_sections_idx + 1
        ].line

        if curr_line_number < next_section_line_number:
            # Still within the current section
            curr_metadata_section += line_content
            continue

        # The next section begins at the current line: flush the
        # metadata gathered so far as the current section.
        sections.append(
            _LttngMetadataStreamSection(
                parsed_sections[config_metadata_sections_idx].timestamp,
                bytes(curr_metadata_section),
            )
        )

        # Move to the next section, which starts with the current line.
        config_metadata_sections_idx += 1
        curr_metadata_section.clear()
        curr_metadata_section += line_content

        # Append any empty section. The parser always puts a concrete
        # section right after an empty one, so this loop ends.
        while parsed_sections[config_metadata_sections_idx].is_empty:
            sections.append(
                _LttngMetadataStreamSection(
                    parsed_sections[config_metadata_sections_idx].timestamp, None
                )
            )
            config_metadata_sections_idx += 1

    # All the lines are consumed: close the current section.
    sections.append(
        _LttngMetadataStreamSection(
            parsed_sections[config_metadata_sections_idx].timestamp,
            bytes(curr_metadata_section),
        )
    )

    return sections
951
952
953 # An LTTng trace, a sequence of LTTng data streams.
class LttngTrace(collections.abc.Sequence):
    """An LTTng trace directory: a sequence of its data streams.

    The metadata stream is available through the `metadata_stream`
    property.
    """

    def __init__(self, trace_dir, metadata_sections, beacons):
        """Build the trace from the directory `trace_dir`.

        `metadata_sections` is the metadata sections configuration, or
        `None` to get a single section holding the whole metadata file.
        `beacons` maps data stream file names to what the data stream
        expects as beacons, or is `None`.
        """
        assert os.path.isdir(trace_dir)
        self._path = trace_dir
        self._create_metadata_stream(trace_dir, metadata_sections)
        self._create_data_streams(trace_dir, beacons)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    def _create_data_streams(self, trace_dir, beacons):
        """Create one data stream per data stream file of `trace_dir`.

        A data stream file is any regular file, directly within
        `trace_dir`, which is neither hidden nor named `metadata`. The
        data streams are ordered by path.
        """
        stream_paths = []

        for entry_name in os.listdir(trace_dir):
            entry_path = os.path.join(trace_dir, entry_name)
            is_data_stream_file = (
                os.path.isfile(entry_path)
                and not entry_name.startswith(".")
                and entry_name != "metadata"
            )

            if is_data_stream_file:
                stream_paths.append(entry_path)

        stream_paths.sort()
        self._data_streams = []

        for stream_path in stream_paths:
            stream_name = os.path.basename(stream_path)

            if beacons is not None and stream_name in beacons:
                stream_beacons = beacons[stream_name]
            else:
                stream_beacons = None

            self._data_streams.append(_LttngDataStream(stream_path, stream_beacons))

    def _create_metadata_stream(self, trace_dir, config_metadata_sections):
        """Create the metadata stream from the `metadata` file."""
        metadata_path = os.path.join(trace_dir, "metadata")

        if config_metadata_sections is not None:
            metadata_sections = _split_metadata_sections(
                metadata_path, config_metadata_sections
            )
        else:
            # No configuration: a single section, with the timestamp 0,
            # holds the whole file.
            with open(metadata_path, "rb") as metadata_file:
                whole_section = _LttngMetadataStreamSection(0, metadata_file.read())

            metadata_sections = [whole_section]

        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)

    @property
    def path(self):
        """Path of the trace directory given at construction."""
        return self._path

    @property
    def metadata_stream(self):
        """Metadata stream of this trace."""
        return self._metadata_stream

    def __getitem__(self, index):
        return self._data_streams[index]

    def __len__(self):
        return len(self._data_streams)
1022
1023
1024 # The state of a single data stream.
1025 class _LttngLiveViewerSessionDataStreamState:
1026 def __init__(self, ts_state, info, data_stream, metadata_stream_id):
1027 self._ts_state = ts_state
1028 self._info = info
1029 self._data_stream = data_stream
1030 self._metadata_stream_id = metadata_stream_id
1031 self._cur_index_entry_index = 0
1032 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1033 logging.info(
1034 fmt.format(
1035 info.id,
1036 ts_state.tracing_session_descriptor.info.tracing_session_id,
1037 ts_state.tracing_session_descriptor.info.name,
1038 data_stream.path,
1039 )
1040 )
1041
1042 @property
1043 def tracing_session_state(self):
1044 return self._ts_state
1045
1046 @property
1047 def info(self):
1048 return self._info
1049
1050 @property
1051 def data_stream(self):
1052 return self._data_stream
1053
1054 @property
1055 def cur_index_entry(self):
1056 if self._cur_index_entry_index == len(self._data_stream.index):
1057 return
1058
1059 return self._data_stream.index[self._cur_index_entry_index]
1060
1061 def goto_next_index_entry(self):
1062 self._cur_index_entry_index += 1
1063
1064
1065 # The state of a single metadata stream.
1066 class _LttngLiveViewerSessionMetadataStreamState:
1067 def __init__(self, ts_state, info, metadata_stream):
1068 self._ts_state = ts_state
1069 self._info = info
1070 self._metadata_stream = metadata_stream
1071 self._cur_metadata_stream_section_index = 0
1072 if len(metadata_stream.sections) > 1:
1073 self._next_metadata_stream_section_timestamp = metadata_stream.sections[
1074 1
1075 ].timestamp
1076 else:
1077 self._next_metadata_stream_section_timestamp = None
1078
1079 self._is_sent = False
1080 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1081 logging.info(
1082 fmt.format(
1083 info.id,
1084 ts_state.tracing_session_descriptor.info.tracing_session_id,
1085 ts_state.tracing_session_descriptor.info.name,
1086 metadata_stream.path,
1087 )
1088 )
1089
1090 @property
1091 def info(self):
1092 return self._info
1093
1094 @property
1095 def metadata_stream(self):
1096 return self._metadata_stream
1097
1098 @property
1099 def is_sent(self):
1100 return self._is_sent
1101
1102 @is_sent.setter
1103 def is_sent(self, value):
1104 self._is_sent = value
1105
1106 @property
1107 def cur_section(self):
1108 fmt = "Get current metadata section: section-idx={}"
1109 logging.info(fmt.format(self._cur_metadata_stream_section_index))
1110 if self._cur_metadata_stream_section_index == len(
1111 self._metadata_stream.sections
1112 ):
1113 return
1114
1115 return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
1116
1117 def goto_next_section(self):
1118 self._cur_metadata_stream_section_index += 1
1119 if self.cur_section:
1120 self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
1121 else:
1122 self._next_metadata_stream_section_timestamp = None
1123
1124 @property
1125 def next_section_timestamp(self):
1126 return self._next_metadata_stream_section_timestamp
1127
1128
1129 # A tracing session descriptor.
1130 #
1131 # In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1132 # objects).
class LttngTracingSessionDescriptor:
    """The traces of a tracing session and the info describing it."""

    def __init__(
        self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
    ):
        """Build the descriptor.

        The stream count of the info is, for each trace, the number of
        its data streams plus one.

        Raises:
            ValueError: The path of a trace doesn't contain `name`.
        """
        for candidate in traces:
            if name in candidate.path:
                continue

            fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
            raise ValueError(fmt.format(name, candidate.path))

        self._traces = traces
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            sum(len(trace) + 1 for trace in traces),
            hostname,
            name,
        )

    @property
    def traces(self):
        """Traces given at construction (not copied)."""
        return self._traces

    @property
    def info(self):
        """Info of the tracing session."""
        return self._info
1160
1161
# The state of a tracing session, as seen by a viewer session.
#
# On creation, it builds one stream info and one stream state per
# metadata stream and per data stream of the traces of the tracing
# session descriptor `tc_descr`. Stream IDs are sequential, starting at
# `base_stream_id`: for each trace, the metadata stream comes first,
# followed by the data streams of the trace.
class _LttngLiveViewerSessionTracingSessionState:
    def __init__(self, tc_descr, base_stream_id):
        self._tc_descr = tc_descr
        self._stream_infos = []
        self._ds_states = {}
        self._ms_states = {}
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # The metadata stream gets the first stream ID of the trace;
            # the trace ID is derived from that same stream ID.
            ms_id = next_stream_id
            trace_id = ms_id * 1000
            ms_info = _LttngLiveViewerStreamInfo(
                ms_id, trace_id, True, trace.metadata_stream.path, "metadata"
            )
            self._stream_infos.append(ms_info)
            self._ms_states[ms_id] = _LttngLiveViewerSessionMetadataStreamState(
                self, ms_info, trace.metadata_stream
            )
            next_stream_id = ms_id + 1

            # The data streams get the following stream IDs; each data
            # stream state knows the ID of the metadata stream of its
            # trace.
            for ds_id, data_stream in enumerate(trace, start=next_stream_id):
                ds_info = _LttngLiveViewerStreamInfo(
                    ds_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(ds_info)
                self._ds_states[ds_id] = _LttngLiveViewerSessionDataStreamState(
                    self, ds_info, data_stream, ms_id
                )
                next_stream_id = ds_id + 1

        self._is_attached = False
        descr_info = tc_descr.info
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                descr_info.tracing_session_id, descr_info.name
            )
        )

    # Tracing session descriptor given at construction time.
    @property
    def tracing_session_descriptor(self):
        return self._tc_descr

    # Data stream states, by stream ID.
    @property
    def data_stream_states(self):
        return self._ds_states

    # Metadata stream states, by stream ID.
    @property
    def metadata_stream_states(self):
        return self._ms_states

    # Stream infos (metadata and data), in creation order.
    @property
    def stream_infos(self):
        return self._stream_infos

    # Whether or not at least one metadata stream state is not marked
    # as sent.
    @property
    def has_new_metadata(self):
        unsent_states = [ms for ms in self._ms_states.values() if not ms.is_sent]
        return len(unsent_states) > 0

    # Whether or not a viewer is attached to this tracing session.
    @property
    def is_attached(self):
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value):
        self._is_attached = value
1231
1232
# Returns whether or not `metadata_stream_state` has a next metadata
# section which becomes due at `latest_timestamp`, that is, whether
# `latest_timestamp` reached the timestamp of that next section.
#
# Returns `False` when there's no next section timestamp (`None`).
def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
    next_timestamp = metadata_stream_state.next_section_timestamp
    return next_timestamp is not None and bool(latest_timestamp >= next_timestamp)
1241
1242
# An LTTng live viewer session manages a view on tracing sessions
# and replies to commands accordingly.
#
# `viewer_session_id` is the ID of this viewer session,
# `tracing_session_descriptors` are the descriptors of the tracing
# sessions to expose, and `max_query_data_response_size`, when truthy,
# caps the length of the data of a single "get data stream packet data"
# reply.
#
# Handlers raise `UnexpectedInput` when a command refers to an unknown
# tracing session or stream, or to a stream of the wrong kind.
class _LttngLiveViewerSession:
    def __init__(
        self,
        viewer_session_id,
        tracing_session_descriptors,
        max_query_data_response_size,
    ):
        self._viewer_session_id = viewer_session_id
        self._ts_states = {}
        self._stream_states = {}
        self._max_query_data_response_size = max_query_data_response_size
        total_stream_infos = 0

        for ts_descr in tracing_session_descriptors:
            # The number of stream infos created so far is the base
            # stream ID of the next tracing session state, which keeps
            # stream IDs distinct across tracing sessions.
            ts_state = _LttngLiveViewerSessionTracingSessionState(
                ts_descr, total_stream_infos
            )
            ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
            self._ts_states[ts_id] = ts_state
            total_stream_infos += len(ts_state.stream_infos)

            # Update session's stream states to have the new states
            self._stream_states.update(ts_state.data_stream_states)
            self._stream_states.update(ts_state.metadata_stream_states)

        # Command class -> handler method
        self._command_handlers = {
            _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
            _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
            _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
            _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
            _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
            _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
            _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
            _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
        }

    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    # Returns the state of the tracing session having the ID
    # `tracing_session_id`, raising `UnexpectedInput` if there's none.
    def _get_tracing_session_state(self, tracing_session_id):
        if tracing_session_id not in self._ts_states:
            raise UnexpectedInput(
                "Unknown tracing session ID {}".format(tracing_session_id)
            )

        return self._ts_states[tracing_session_id]

    # Returns the state (data or metadata) of the stream having the ID
    # `stream_id`, raising `UnexpectedInput` if there's none.
    def _get_stream_state(self, stream_id):
        if stream_id not in self._stream_states:
            raise UnexpectedInput("Unknown stream ID {}".format(stream_id))

        return self._stream_states[stream_id]

    # Dispatches the command `cmd` to its handler and returns the reply
    # of the handler, raising `UnexpectedInput` if this viewer session
    # has no handler for the exact type of `cmd`.
    def handle_command(self, cmd):
        logging.info(
            "Handling command in viewer session: cmd-cls-name={}".format(
                cmd.__class__.__name__
            )
        )
        cmd_type = type(cmd)

        if cmd_type not in self._command_handlers:
            raise UnexpectedInput(
                "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
            )

        return self._command_handlers[cmd_type](cmd)

    def _handle_attach_to_tracing_session_command(self, cmd):
        fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
        logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if ts_state.is_attached:
            raise UnexpectedInput(
                "Cannot attach to tracing session `{}`: viewer is already attached".format(
                    info.name
                )
            )

        ts_state.is_attached = True
        status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
        return _LttngLiveViewerAttachToTracingSessionReply(
            status, ts_state.stream_infos
        )

    def _handle_detach_from_tracing_session_command(self, cmd):
        fmt = 'Handling "detach from tracing session" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if not ts_state.is_attached:
            raise UnexpectedInput(
                "Cannot detach to tracing session `{}`: viewer is not attached".format(
                    info.name
                )
            )

        ts_state.is_attached = False
        status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
        return _LttngLiveViewerDetachFromTracingSessionReply(status)

    def _handle_get_next_data_stream_index_entry_command(self, cmd):
        fmt = 'Handling "get next data stream index entry" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        stream_state = self._get_stream_state(cmd.stream_id)

        # Validate the kind of stream before reading any attribute which
        # only a data stream state has, so that a metadata stream ID is
        # reported as unexpected input.
        if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
            raise UnexpectedInput(
                "Stream with ID {} is not a data stream".format(cmd.stream_id)
            )

        metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)

        if stream_state.cur_index_entry is None:
            # The viewer is done reading this stream
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP

            # Dummy data stream index entry to use with the `HUP` status
            # (the reply needs one, but the viewer ignores it)
            index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)

            return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
                status, index_entry, False, False
            )

        timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)

        # Make the next metadata section current (and not sent yet) once
        # the current entry reaches its timestamp.
        if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        # The viewer only checks the `has_new_metadata` flag if the
        # reply's status is `OK`, so we need to provide an index here
        has_new_metadata = stream_state.tracing_session_state.has_new_metadata

        if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
        else:
            assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE

        reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
            status, stream_state.cur_index_entry, has_new_metadata, False
        )
        stream_state.goto_next_index_entry()
        return reply

    def _handle_get_data_stream_packet_data_command(self, cmd):
        fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
        logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
        stream_state = self._get_stream_state(cmd.stream_id)
        data_response_length = cmd.req_length

        if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
            raise UnexpectedInput(
                "Stream with ID {} is not a data stream".format(cmd.stream_id)
            )

        # No packet data as long as there's metadata which is not sent
        # yet: reply with the `ERROR` status and the "new metadata" flag.
        if stream_state.tracing_session_state.has_new_metadata:
            status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
            return _LttngLiveViewerGetDataStreamPacketDataReply(
                status, bytes(), True, False
            )

        if self._max_query_data_response_size:
            # Enforce a server side limit on the query requested length.
            # To ensure that the transaction terminates, take the
            # minimum of both values.
            data_response_length = min(
                cmd.req_length, self._max_query_data_response_size
            )
            fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
            logging.info(fmt.format(cmd.req_length, data_response_length))

        data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
        status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
        return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)

    def _handle_get_metadata_stream_data_command(self, cmd):
        fmt = 'Handling "get metadata stream data" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        metadata_stream_state = self._get_stream_state(cmd.stream_id)

        if (
            type(metadata_stream_state)
            is not _LttngLiveViewerSessionMetadataStreamState
        ):
            raise UnexpectedInput(
                "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
            )

        if metadata_stream_state.is_sent:
            status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
            return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())

        metadata_stream_state.is_sent = True
        status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
        metadata_section = metadata_stream_state.cur_section
        assert metadata_section is not None

        # If we are sending an empty section, ready the next one right away.
        if len(metadata_section.data) == 0:
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
        logging.info(fmt.format(len(metadata_section.data)))
        return _LttngLiveViewerGetMetadataStreamDataContentReply(
            status, metadata_section.data
        )

    def _handle_get_new_stream_infos_command(self, cmd):
        fmt = 'Handling "get new stream infos" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))

        # As of this version, all the tracing session's stream infos are
        # always given to the viewer when sending the "attach to tracing
        # session" reply, so there's nothing new here. Return the `HUP`
        # status as, if we're handling this command, the viewer consumed
        # all the existing data streams.
        status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
        return _LttngLiveViewerGetNewStreamInfosReply(status, [])

    def _handle_get_tracing_session_infos_command(self, cmd):
        logging.info('Handling "get tracing session infos" command.')
        infos = [
            tss.tracing_session_descriptor.info for tss in self._ts_states.values()
        ]

        # Reply with the infos sorted by tracing session name
        infos.sort(key=lambda info: info.name)
        return _LttngLiveViewerGetTracingSessionInfosReply(infos)

    def _handle_create_viewer_session_command(self, cmd):
        logging.info('Handling "create viewer session" command.')
        status = _LttngLiveViewerCreateViewerSessionReply.Status.OK

        # This does nothing here. In the LTTng relay daemon, it
        # allocates the viewer session's state.
        return _LttngLiveViewerCreateViewerSessionReply(status)
1485
1486
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
# on an OS-assigned TCP port otherwise, and starts listening. It then writes
# the decimal TCP port number to a temporary port file located in the
# directory of `port_filename`, and renames this temporary port file to
# `port_filename`: the server already listens when `port_filename` appears.
#
# `tracing_session_descriptors` is a list of tracing session descriptors
# (`LttngTracingSessionDescriptor`) to serve.
#
# `max_query_data_response_size`, if not `None`, is the maximum length of the
# data of a single "get data stream packet data" reply.
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns. The listening socket is closed whenever the constructor
# returns or raises.
class LttngLiveServer:
    def __init__(
        self,
        port,
        port_filename,
        tracing_session_descriptors,
        max_query_data_response_size,
    ):
        logging.info("Server configuration:")

        logging.info("  Port file name: `{}`".format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                "  Maximum response data query size: `{}`".format(
                    max_query_data_response_size
                )
            )

        for ts_descr in tracing_session_descriptors:
            info = ts_descr.info
            fmt = '  TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info('    Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = tracing_session_descriptors
        self._max_query_data_response_size = max_query_data_response_size
        self._codec = _LttngLiveViewerProtocolCodec()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Everything which follows the creation of the socket is within
        # the `try` block so that the socket is closed even if binding
        # or writing the port file fails.
        try:
            # Port 0: OS assigns an unused port
            serv_addr = ("localhost", port if port is not None else 0)
            self._sock.bind(serv_addr)

            # Start listening _before_ publishing the port file so that
            # a viewer which reacts to the port file can connect
            # immediately.
            #
            # Backlog must be present for Python version < 3.5.
            # 128 is an arbitrary number since we expect only 1 connection anyway.
            self._sock.listen(128)
            self._write_port_to_file(port_filename)
            self._listen()
        finally:
            self._sock.close()
            logging.info("Closed connection and socket.")

    # TCP port to which the server socket is bound.
    @property
    def _server_port(self):
        return self._sock.getsockname()[1]

    # Receives and returns the next complete command of the viewer.
    #
    # Returns `None` if the viewer closes the connection between two
    # commands, and raises `UnexpectedInput` if it closes the connection
    # in the middle of a command or sends a malformed command.
    def _recv_command(self):
        data = bytes()

        while True:
            logging.info("Waiting for viewer command.")
            buf = self._conn.recv(128)

            if not buf:
                logging.info("Client closed connection.")

                if data:
                    raise UnexpectedInput(
                        "Client closed connection after having sent {} command bytes.".format(
                            len(data)
                        )
                    )

                return

            logging.info("Received data from viewer: length={}".format(len(buf)))

            data += buf

            try:
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise UnexpectedInput("Malformed command: {}".format(exc)) from exc

            # `None` means the command is not complete yet: keep
            # receiving.
            if cmd is not None:
                logging.info(
                    "Received command from viewer: cmd-cls-name={}".format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    # Encodes the reply `reply` and sends it to the viewer.
    def _send_reply(self, reply):
        data = self._codec.encode(reply)
        logging.info(
            "Sending reply to viewer: reply-cls-name={}, length={}".format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    # Handles the whole conversation with the connected viewer,
    # returning when the viewer closes the connection between two
    # commands.
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise UnexpectedInput(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    # Accepts a single viewer on the already listening socket, handles
    # its connection, and finally closes the connection.
    def _listen(self):
        logging.info("Listening: port={}".format(self._server_port))
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Writes the decimal server port number to the file `port_filename`
    # so that the complete file appears at once.
    def _write_port_to_file(self, port_filename):
        # Write the port number to a temporary file.
        #
        # Create this file in the directory of the final port file:
        # os.replace() may fail if the source and the destination are
        # on different filesystems.
        port_dir = os.path.dirname(os.path.abspath(port_filename))

        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, dir=port_dir
        ) as tmp_port_file:
            print(self._server_port, end="", file=tmp_port_file)

        # Rename temporary file to real file
        os.replace(tmp_port_file.name, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1664
1665
# Loads the JSON session configuration file `sessions_filename` and
# returns the corresponding list of tracing session descriptors
# (`LttngTracingSessionDescriptor`).
#
# Relative trace paths are joined to `trace_path_prefix`, unless it's
# `None`, in which case they remain as is.
def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
    # File format is:
    #
    #     [
    #         {
    #             "name": "my-session",
    #             "id": 17,
    #             "hostname": "myhost",
    #             "live-timer-freq": 1000000,
    #             "client-count": 23,
    #             "traces": [
    #                 {
    #                     "path": "lol"
    #                 },
    #                 {
    #                     "path": "meow/mix",
    #                     "beacons": {
    #                         "my_stream": [ 5235787, 728375283 ]
    #                     },
    #                     "metadata-sections": [
    #                         {
    #                             "line": 1,
    #                             "timestamp": 0
    #                         }
    #                     ]
    #                 }
    #             ]
    #         }
    #     ]
    #
    # Decode as UTF-8 explicitly so that the result doesn't depend on
    # the locale.
    with open(sessions_filename, "r", encoding="utf-8") as sessions_file:
        params = json.load(sessions_file)

    sessions = []

    for session in params:
        name = session["name"]
        tracing_session_id = session["id"]
        hostname = session["hostname"]
        live_timer_freq = session["live-timer-freq"]
        client_count = session["client-count"]
        traces = []

        for trace in session["traces"]:
            # Both are optional
            metadata_sections = trace.get("metadata-sections")
            beacons = trace.get("beacons")
            path = trace["path"]

            # Without a prefix, there's nothing to prepend:
            # os.path.join() doesn't accept `None`.
            if trace_path_prefix is not None and not os.path.isabs(path):
                path = os.path.join(trace_path_prefix, path)

            traces.append(LttngTrace(path, metadata_sections, beacons))

        sessions.append(
            LttngTracingSessionDescriptor(
                name,
                tracing_session_id,
                hostname,
                live_timer_freq,
                client_count,
                traces,
            )
        )

    return sessions
1730
1731
# Converts the log level name `string` (`info` or `warning`) to the
# corresponding `logging` level, raising `argparse.ArgumentTypeError`
# for any other name.
def _loglevel_parser(string):
    level = {"info": logging.INFO, "warning": logging.WARNING}.get(string)

    if level is None:
        raise argparse.ArgumentTypeError("{} is not a valid loglevel".format(string))

    return level
1738
1739
if __name__ == "__main__":
    logging.basicConfig(format="# %(asctime)-25s%(message)s")

    # Parse in two steps: first `--log-level` only, to configure
    # logging as soon as possible, then everything else. Automatic help
    # is disabled so that the first step leaves `-h`/`--help` to the
    # second one, once all the arguments are known.
    parser = argparse.ArgumentParser(
        description="LTTng-live protocol mocker", add_help=False
    )
    parser.add_argument(
        "--log-level",
        default="warning",
        choices=["info", "warning"],
        help="The loglevel to be used.",
    )

    loglevel_namespace, remaining_args = parser.parse_known_args()
    logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))

    parser.add_argument(
        "--port",
        help="The port to bind to. If missing, use an OS-assigned port..",
        type=int,
    )
    parser.add_argument(
        "--port-filename",
        help="The final port file. This file is present when the server is ready to receive connection.",
        required=True,
    )
    parser.add_argument(
        "--max-query-data-response-size",
        type=int,
        help="The maximum size of control data response in bytes",
    )
    parser.add_argument(
        "--trace-path-prefix",
        type=str,
        help="Prefix to prepend to the trace paths of session configurations",
    )

    # Required: the session configuration file is always loaded below.
    parser.add_argument(
        "--sessions-filename",
        type=str,
        help="Path to a session configuration file",
        required=True,
    )
    parser.add_argument(
        "-h",
        "--help",
        action="help",
        default=argparse.SUPPRESS,
        help="Show this help message and exit.",
    )

    args = parser.parse_args(args=remaining_args)

    try:
        sessions = _session_descriptors_from_path(
            args.sessions_filename,
            args.trace_path_prefix,
        )

        # The constructor returns once the viewer closes the connection
        LttngLiveServer(
            args.port, args.port_filename, sessions, args.max_query_data_response_size
        )
    except UnexpectedInput as exc:
        # Report the protocol violation and exit with a failure status
        logging.error(str(exc))
        print(exc, file=sys.stderr)
        sys.exit(1)
This page took 0.063328 seconds and 3 git commands to generate.