098f46442e61dacb767508d514998e617bf8c1d5
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
1 # SPDX-License-Identifier: MIT
2 #
3 # Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
4 #
5
6 # pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
7
8 import os
9 import re
10 import socket
11 import struct
12 import logging
13 import os.path
14 import argparse
15 import tempfile
16 from typing import Dict, Union, Iterable, Optional, Sequence, overload
17
18 import tjson
19
20 # isort: off
21 from typing import Any, Callable # noqa: F401
22
23 # isort: on
24
25
26 # An entry within the index of an LTTng data stream.
27 class _LttngDataStreamIndexEntry:
28 def __init__(
29 self,
30 offset_bytes: int,
31 total_size_bits: int,
32 content_size_bits: int,
33 timestamp_begin: int,
34 timestamp_end: int,
35 events_discarded: int,
36 stream_class_id: int,
37 ):
38 self._offset_bytes = offset_bytes
39 self._total_size_bits = total_size_bits
40 self._content_size_bits = content_size_bits
41 self._timestamp_begin = timestamp_begin
42 self._timestamp_end = timestamp_end
43 self._events_discarded = events_discarded
44 self._stream_class_id = stream_class_id
45
46 @property
47 def offset_bytes(self):
48 return self._offset_bytes
49
50 @property
51 def total_size_bits(self):
52 return self._total_size_bits
53
54 @property
55 def total_size_bytes(self):
56 return self._total_size_bits // 8
57
58 @property
59 def content_size_bits(self):
60 return self._content_size_bits
61
62 @property
63 def content_size_bytes(self):
64 return self._content_size_bits // 8
65
66 @property
67 def timestamp_begin(self):
68 return self._timestamp_begin
69
70 @property
71 def timestamp_end(self):
72 return self._timestamp_end
73
74 @property
75 def events_discarded(self):
76 return self._events_discarded
77
78 @property
79 def stream_class_id(self):
80 return self._stream_class_id
81
82
83 # An entry within the index of an LTTng data stream. While a stream beacon entry
84 # is conceptually unrelated to an index, it is sent as a reply to a
85 # LttngLiveViewerGetNextDataStreamIndexEntryCommand
86 class _LttngDataStreamBeaconIndexEntry:
87 def __init__(self, stream_class_id: int, timestamp: int):
88 self._stream_class_id = stream_class_id
89 self._timestamp = timestamp
90
91 @property
92 def timestamp(self):
93 return self._timestamp
94
95 @property
96 def stream_class_id(self):
97 return self._stream_class_id
98
99
# Type of the items of a data stream index: either a real index entry
# or a beacon entry.
_LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
101
102
103 class _LttngLiveViewerCommand:
104 def __init__(self, version: int):
105 self._version = version
106
107 @property
108 def version(self):
109 return self._version
110
111
# "Connect" viewer command: a viewer session ID and the major/minor
# numbers sent by the viewer.
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
        super().__init__(version)

        # Major/minor numbers received from the viewer
        self._major = major
        self._minor = minor

        self._viewer_session_id = viewer_session_id

    @property
    def viewer_session_id(self) -> int:
        return self._viewer_session_id

    @property
    def major(self) -> int:
        return self._major

    @property
    def minor(self) -> int:
        return self._minor
130
131
132 class _LttngLiveViewerReply:
133 pass
134
135
# Reply to a "connect" viewer command: a viewer session ID and
# major/minor numbers.
class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
    def __init__(self, viewer_session_id: int, major: int, minor: int):
        # Major/minor numbers to send back to the viewer
        self._major = major
        self._minor = minor

        self._viewer_session_id = viewer_session_id

    @property
    def viewer_session_id(self) -> int:
        return self._viewer_session_id

    @property
    def major(self) -> int:
        return self._major

    @property
    def minor(self) -> int:
        return self._minor
153
154
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    """Viewer command without any member besides the inherited `version`."""
157
158
159 class _LttngLiveViewerTracingSessionInfo:
160 def __init__(
161 self,
162 tracing_session_id: int,
163 live_timer_freq: int,
164 client_count: int,
165 stream_count: int,
166 hostname: str,
167 name: str,
168 ):
169 self._tracing_session_id = tracing_session_id
170 self._live_timer_freq = live_timer_freq
171 self._client_count = client_count
172 self._stream_count = stream_count
173 self._hostname = hostname
174 self._name = name
175
176 @property
177 def tracing_session_id(self):
178 return self._tracing_session_id
179
180 @property
181 def live_timer_freq(self):
182 return self._live_timer_freq
183
184 @property
185 def client_count(self):
186 return self._client_count
187
188 @property
189 def stream_count(self):
190 return self._stream_count
191
192 @property
193 def hostname(self):
194 return self._hostname
195
196 @property
197 def name(self):
198 return self._name
199
200
# Reply carrying a sequence of tracing session descriptions.
class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
    def __init__(
        self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
    ):
        # Kept as is (not copied)
        self._tracing_session_infos = tracing_session_infos

    @property
    def tracing_session_infos(self) -> Sequence[_LttngLiveViewerTracingSessionInfo]:
        return self._tracing_session_infos
210
211
# "Attach to tracing session" viewer command: the ID of the tracing
# session to attach to, an offset, and a seek type.
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    # Known values of the `seek_type` member
    class SeekType:
        BEGINNING = 1
        LAST = 2

    def __init__(
        self, version: int, tracing_session_id: int, offset: int, seek_type: int
    ):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

        # Where to start
        self._seek_type = seek_type
        self._offset = offset

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id

    @property
    def offset(self) -> int:
        return self._offset

    @property
    def seek_type(self) -> int:
        return self._seek_type
236
237
238 class _LttngLiveViewerStreamInfo:
239 def __init__(
240 self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
241 ):
242 self._id = id
243 self._trace_id = trace_id
244 self._is_metadata = is_metadata
245 self._path = path
246 self._channel_name = channel_name
247
248 @property
249 def id(self):
250 return self._id
251
252 @property
253 def trace_id(self):
254 return self._trace_id
255
256 @property
257 def is_metadata(self):
258 return self._is_metadata
259
260 @property
261 def path(self):
262 return self._path
263
264 @property
265 def channel_name(self):
266 return self._channel_name
267
268
# Reply to an "attach to tracing session" viewer command: a status and
# the descriptions of the streams of the tracing session.
class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
    # Known values of the `status` member
    class Status:
        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
        self._stream_infos = stream_infos
        self._status = status

    @property
    def status(self) -> int:
        return self._status

    @property
    def stream_infos(self) -> Sequence[_LttngLiveViewerStreamInfo]:
        return self._stream_infos
289
290
# "Get next data stream index entry" viewer command: the ID of the
# targeted stream.
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self) -> int:
        return self._stream_id
299
300
# Reply to a "get next data stream index entry" viewer command: a
# status, an index entry (real or beacon), and two "has new" flags.
class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
    # Known values of the `status` member
    class Status:
        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(
        self,
        status: int,
        index_entry: _LttngIndexEntryT,
        has_new_metadata: bool,
        has_new_data_stream: bool,
    ):
        # Payload
        self._index_entry = index_entry
        self._status = status

        # "Has new stuff" flags
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self) -> int:
        return self._status

    @property
    def index_entry(self) -> _LttngIndexEntryT:
        return self._index_entry

    @property
    def has_new_metadata(self) -> bool:
        return self._has_new_metadata

    @property
    def has_new_data_stream(self) -> bool:
        return self._has_new_data_stream
337
338
# "Get data stream packet data" viewer command: the ID of the targeted
# stream as well as the offset and length of the requested data.
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
        super().__init__(version)
        self._stream_id = stream_id

        # Requested data range
        self._offset = offset
        self._req_length = req_length

    @property
    def stream_id(self) -> int:
        return self._stream_id

    @property
    def offset(self) -> int:
        return self._offset

    @property
    def req_length(self) -> int:
        return self._req_length
357
358
# Reply to a "get data stream packet data" viewer command: a status,
# the data itself, and two "has new" flags.
class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
    # Known values of the `status` member
    class Status:
        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(
        self,
        status: int,
        data: bytes,
        has_new_metadata: bool,
        has_new_data_stream: bool,
    ):
        # Payload
        self._data = data
        self._status = status

        # "Has new stuff" flags
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self) -> int:
        return self._status

    @property
    def data(self) -> bytes:
        return self._data

    @property
    def has_new_metadata(self) -> bool:
        return self._has_new_metadata

    @property
    def has_new_data_stream(self) -> bool:
        return self._has_new_data_stream
393
394
# "Get metadata stream data" viewer command: the ID of the targeted
# stream.
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self) -> int:
        return self._stream_id
403
404
# Reply to a "get metadata stream data" viewer command: a status and
# the metadata bytes.
class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
    # Known values of the `status` member
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status: int, data: bytes):
        self._data = data
        self._status = status

    @property
    def status(self) -> int:
        return self._status

    @property
    def data(self) -> bytes:
        return self._data
422
423
# "Get new stream infos" viewer command: the ID of the targeted
# tracing session.
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, tracing_session_id: int):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id
432
433
# Reply to a "get new stream infos" viewer command: a status and the
# descriptions of the new streams.
class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
    # Known values of the `status` member
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
        self._stream_infos = stream_infos
        self._status = status

    @property
    def status(self) -> int:
        return self._status

    @property
    def stream_infos(self) -> Sequence[_LttngLiveViewerStreamInfo]:
        return self._stream_infos
452
453
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    """Viewer command without any member besides the inherited `version`."""
456
457
# Reply to a "create viewer session" viewer command: only a status.
class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
    # Known values of the `status` member
    class Status:
        OK = 1
        ERROR = 2

    def __init__(self, status: int):
        self._status = status

    @property
    def status(self) -> int:
        return self._status
469
470
# "Detach from tracing session" viewer command: the ID of the tracing
# session to detach from.
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, tracing_session_id: int):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id
479
480
# Reply to a "detach from tracing session" viewer command: only a
# status.
class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
    # Known values of the `status` member
    class Status:
        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status: int):
        self._status = status

    @property
    def status(self) -> int:
        return self._status
493
494
# An LTTng live protocol codec: decode() converts received bytes to a
# command object and encode() converts a reply object to bytes.
#
# Everything is packed and unpacked using the network byte order.
class _LttngLiveViewerProtocolCodec:
    # Command header: payload size, command type, and version
    _COMMAND_HEADER_STRUCT_FMT = "QII"
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)

    def __init__(self):
        pass

    # Unpacks the fields described by `fmt` from `data`, starting at
    # `offset`.
    def _unpack(self, fmt: str, data: bytes, offset: int = 0):
        # Force network byte order
        return struct.unpack_from("!" + fmt, data, offset)

    # Unpacks the fields described by `fmt` from the payload of the
    # command `data`, that is, what immediately follows the header.
    def _unpack_payload(self, fmt: str, data: bytes):
        return self._unpack(fmt, data, self._COMMAND_HEADER_SIZE_BYTES)

    # Decodes `data` and returns the corresponding command object, or
    # `None` if `data` doesn't contain a whole command yet.
    #
    # Raises `RuntimeError` if the type of the command is unknown.
    def decode(self, data: bytes):
        header_size = self._COMMAND_HEADER_SIZE_BYTES

        if len(data) < header_size:
            # Not enough data to read the command header
            return None

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
                payload_size, cmd_type, version
            )
        )

        if len(data) < header_size + payload_size:
            # Not enough data to read the whole command
            return None

        if cmd_type == 1:
            # The last payload field isn't used
            viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
            return _LttngLiveViewerConnectCommand(
                version, viewer_session_id, major, minor
            )

        if cmd_type == 2:
            return _LttngLiveViewerGetTracingSessionInfosCommand(version)

        if cmd_type == 3:
            tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
            return _LttngLiveViewerAttachToTracingSessionCommand(
                version, tracing_session_id, offset, seek_type
            )

        if cmd_type == 4:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                version, stream_id
            )

        if cmd_type == 5:
            stream_id, offset, req_length = self._unpack_payload("QQI", data)
            return _LttngLiveViewerGetDataStreamPacketDataCommand(
                version, stream_id, offset, req_length
            )

        if cmd_type == 6:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)

        if cmd_type == 7:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)

        if cmd_type == 8:
            return _LttngLiveViewerCreateViewerSessionCommand(version)

        if cmd_type == 9:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerDetachFromTracingSessionCommand(
                version, tracing_session_id
            )

        raise RuntimeError("Unknown command type {}".format(cmd_type))

    # Packs `args` as described by `fmt`.
    def _pack(self, fmt: str, *args: Any):
        # Force network byte order
        return struct.pack("!" + fmt, *args)

    # Encodes `string` and pads the result on the right with null
    # bytes up to `length` bytes (a longer result is left as is).
    def _encode_zero_padded_str(self, string: str, length: int):
        return string.encode().ljust(length, b"\x00")

    # Encodes a single stream info: three integral fields followed by
    # the zero-padded path and channel name.
    def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
        return b"".join(
            (
                self._pack("QQI", info.id, info.trace_id, int(info.is_metadata)),
                self._encode_zero_padded_str(info.path, 4096),
                self._encode_zero_padded_str(info.channel_name, 255),
            )
        )

    # Encodes a status and a stream info count followed by the stream
    # infos themselves (common layout of two replies).
    def _encode_status_and_stream_infos(
        self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]
    ):
        encoded_infos = [self._encode_stream_info(info) for info in stream_infos]
        return self._pack("II", status, len(stream_infos)) + b"".join(encoded_infos)

    # Returns the "has new stuff" flags: bit 0 for new metadata and
    # bit 1 for new data streams.
    def _get_has_new_stuff_flags(
        self, has_new_metadata: bool, has_new_data_streams: bool
    ):
        metadata_flag = 1 if has_new_metadata else 0
        data_streams_flag = 2 if has_new_data_streams else 0
        return metadata_flag | data_streams_flag

    # Encodes the reply object `reply` and returns the resulting bytes.
    #
    # The exact class of `reply` selects the layout; raises
    # `ValueError` if it's not a known one.
    def encode(
        self,
        reply: _LttngLiveViewerReply,
    ) -> bytes:
        if type(reply) is _LttngLiveViewerConnectReply:
            return self._pack(
                "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
            )

        if type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
            parts = [self._pack("I", len(reply.tracing_session_infos))]

            for info in reply.tracing_session_infos:
                parts.append(
                    self._pack(
                        "QIII",
                        info.tracing_session_id,
                        info.live_timer_freq,
                        info.client_count,
                        info.stream_count,
                    )
                )
                parts.append(self._encode_zero_padded_str(info.hostname, 64))
                parts.append(self._encode_zero_padded_str(info.name, 255))

            return b"".join(parts)

        if type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
            return self._encode_status_and_stream_infos(
                reply.status, reply.stream_infos
            )

        if type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )

            if isinstance(entry, _LttngDataStreamIndexEntry):
                entry_fields = (
                    entry.offset_bytes,
                    entry.total_size_bits,
                    entry.content_size_bits,
                    entry.timestamp_begin,
                    entry.timestamp_end,
                    entry.events_discarded,
                    entry.stream_class_id,
                )
            else:
                # Beacon: only the end timestamp and stream class ID
                # fields are meaningful.
                entry_fields = (0, 0, 0, 0, entry.timestamp, 0, entry.stream_class_id)

            all_fields = entry_fields + (reply.status, flags)
            return self._pack("QQQQQQQII", *all_fields)

        if type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            return self._pack("III", reply.status, len(reply.data), flags) + reply.data

        if type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
            return self._pack("QI", len(reply.data), reply.status) + reply.data

        if type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
            return self._encode_status_and_stream_infos(
                reply.status, reply.stream_infos
            )

        if type(reply) is _LttngLiveViewerCreateViewerSessionReply:
            return self._pack("I", reply.status)

        if type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
            return self._pack("I", reply.status)

        raise ValueError(
            "Unknown reply object with class `{}`".format(reply.__class__.__name__)
        )
679
680
# Returns the beginning timestamp of the index entry `entry`; a beacon
# entry only has a single timestamp, which is returned as is.
def _get_entry_timestamp_begin(
    entry: _LttngIndexEntryT,
):
    if not isinstance(entry, _LttngDataStreamBeaconIndexEntry):
        return entry.timestamp_begin

    return entry.timestamp
688
689
# The index of an LTTng data stream, a sequence of index entries.
#
# The real entries are read from the index file `path`; when `beacons`
# is set, one beacon entry per timestamp is merged into them.
class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
    def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
        self._path = path
        self._build()

        if beacons:
            # All the beacons share the stream class ID of the first
            # real entry.
            stream_class_id = self._entries[0].stream_class_id
            self._add_beacons(
                [
                    _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
                    for ts in beacons.iter(tjson.IntVal)
                ]
            )

        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    # Reads the index file and fills `self._entries` with one
    # `_LttngDataStreamIndexEntry` object per on-disk entry.
    def _build(self):
        self._entries = []  # type: list[_LttngIndexEntryT]
        assert os.path.isfile(self._path)

        # Both the header and the entries are big-endian.
        header_struct = struct.Struct(">IIII")
        entry_struct = struct.Struct(">QQQQQQQ")

        with open(self._path, "rb") as f:
            # Read header first
            header_data = f.read(header_struct.size)
            assert len(header_data) == header_struct.size
            magic, _, _, index_entry_length = header_struct.unpack(header_data)
            assert magic == 0xC1F1DCC1

            # Size of what follows the decoded fields within each
            # on-disk entry.
            trailing_size = index_entry_length - entry_struct.size

            # Read index entries
            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, f.tell()
                    )
                )
                entry_data = f.read(entry_struct.size)

                if not entry_data:
                    # Done
                    break

                assert len(entry_data) == entry_struct.size

                # The unpacked fields are in the order of the
                # parameters of the entry constructor.
                self._entries.append(
                    _LttngDataStreamIndexEntry(*entry_struct.unpack(entry_data))
                )

                # Skip anything else before the next entry
                f.seek(trailing_size, os.SEEK_CUR)

    # Appends the beacon entries `beacons` and sorts all the entries:
    # beacons by their timestamp, real entries by their end timestamp.
    def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
        # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
        self._entries.extend(beacons)
        self._entries.sort(
            key=lambda entry: entry.timestamp
            if isinstance(entry, _LttngDataStreamBeaconIndexEntry)
            else entry.timestamp_end
        )

    @overload
    def __getitem__(self, index: int) -> _LttngIndexEntryT:
        ...

    @overload
    def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]:  # noqa: F811
        ...

    def __getitem__(  # noqa: F811
        self, index: Union[int, slice]
    ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT]]:
        return self._entries[index]

    def __len__(self) -> int:
        return len(self._entries)

    @property
    def path(self) -> str:
        return self._path
800
801
# An LTTng data stream: a data stream file, its channel name (taken
# from the file name), and its index (read from the `index`
# subdirectory next to the file).
class _LttngDataStream:
    def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
        self._path = path
        trace_dir, filename = os.path.split(path)

        # The channel name is what precedes the last `_<digits>` part
        # of the file name.
        name_match = re.match(r"(.*)_\d+", filename)

        if name_match is None:
            raise RuntimeError(
                "Unexpected data stream file name pattern: {}".format(filename)
            )

        self._channel_name = name_match.group(1)
        self._index = _LttngDataStreamIndex(
            os.path.join(trace_dir, "index", filename + ".idx"), beacons_json
        )
        assert os.path.isfile(path)

        # Remains open so that get_data() can read from it
        self._file = open(path, "rb")
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    @property
    def path(self) -> str:
        return self._path

    @property
    def channel_name(self) -> str:
        return self._channel_name

    @property
    def index(self) -> _LttngDataStreamIndex:
        return self._index

    # Returns up to `len_bytes` bytes of the data stream file starting
    # at the offset `offset_bytes`.
    def get_data(self, offset_bytes: int, len_bytes: int) -> bytes:
        data_file = self._file
        data_file.seek(offset_bytes)
        return data_file.read(len_bytes)
839 return self._file.read(len_bytes)
840
841
842 class _LttngMetadataStreamSection:
843 def __init__(self, timestamp: int, data: Optional[bytes]):
844 self._timestamp = timestamp
845 if data is None:
846 self._data = bytes()
847 else:
848 self._data = data
849 logging.info(
850 "Built metadata stream section: ts={}, data-len={}".format(
851 self._timestamp, len(self._data)
852 )
853 )
854
855 @property
856 def timestamp(self):
857 return self._timestamp
858
859 @property
860 def data(self):
861 return self._data
862
863
# An LTTng metadata stream: the path of a metadata file and the
# sections into which its content is split.
class _LttngMetadataStream:
    def __init__(
        self,
        metadata_file_path: str,
        config_sections: Sequence[_LttngMetadataStreamSection],
    ):
        self._sections = config_sections
        self._path = metadata_file_path
        logging.info(
            "Built metadata stream: path={}, section-len={}".format(
                metadata_file_path, len(config_sections)
            )
        )

    @property
    def path(self) -> str:
        return self._path

    @property
    def sections(self) -> Sequence[_LttngMetadataStreamSection]:
        return self._sections
886
887
# A metadata section as found in the configuration: the line of the
# metadata file at which it starts, its timestamp, and whether or not
# it's an empty section.
class LttngMetadataConfigSection:
    def __init__(self, line: int, timestamp: int, is_empty: bool):
        self._is_empty = is_empty
        self._timestamp = timestamp
        self._line = line

    @property
    def line(self) -> int:
        return self._line

    @property
    def timestamp(self) -> int:
        return self._timestamp

    @property
    def is_empty(self) -> bool:
        return self._is_empty
905
906
# Converts the metadata sections configuration `metadata_sections_json`
# to a list of `LttngMetadataConfigSection` objects.
#
# Each item of the array is either the string `empty` or an object
# having the `line` and `timestamp` integer properties. An `empty`
# marker becomes an empty section having the line and timestamp of the
# next object item, inserted just before it.
#
# Raises `ValueError` for any other string and `TypeError` for any
# other kind of item.
def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
    config_sections = []  # type: list[LttngMetadataConfigSection]
    empty_section_pending = False
    prev_timestamp = 0
    prev_line = 0

    for item in metadata_sections_json:
        if isinstance(item, tjson.StrVal):
            if item.val != "empty":
                raise ValueError("Invalid string value at {}.".format(item.path))

            # Found an empty section marker. Actually append the
            # section at the timestamp of the next concrete section.
            empty_section_pending = True
            continue

        if not isinstance(item, tjson.ObjVal):
            raise TypeError(
                "`{}`: expecting a string or object value".format(item.path)
            )

        line = item.at("line", tjson.IntVal).val
        ts = item.at("timestamp", tjson.IntVal).val

        # Sections' timestamps and lines must both be increasing.
        assert ts > prev_timestamp
        prev_timestamp = ts

        assert line > prev_line
        prev_line = line

        if empty_section_pending:
            config_sections.append(LttngMetadataConfigSection(line, ts, True))
            empty_section_pending = False

        config_sections.append(LttngMetadataConfigSection(line, ts, False))

    return config_sections
943
944
# Splits the content of the metadata file `metadata_file_path` into
# `_LttngMetadataStreamSection` objects according to the sections
# configuration `metadata_sections_json`, and returns them as a list.
#
# The configuration is first converted to `LttngMetadataConfigSection`
# objects by _parse_metadata_sections_config(). The lines of the file
# are then visited in order: each time the (one-based) line number
# reaches the `line` of the next configured section, the bytes gathered
# so far are flushed as a section having the timestamp of the current
# configured section. Configured sections marked as empty become
# sections created with `None` as their data.
#
# NOTE(review): the file is opened in text mode without an explicit
# encoding while its lines are re-encoded as UTF-8; this presumably
# relies on the metadata being ASCII/UTF-8 text -- TODO confirm.
#
# NOTE(review): with an empty `metadata_sections_json` array, the final
# `metadata_sections[metadata_section_idx]` lookup looks like it would
# raise `IndexError` -- confirm that callers never pass one.
def _split_metadata_sections(
    metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
):
    metadata_sections = _parse_metadata_sections_config(metadata_sections_json)

    sections = []  # type: list[_LttngMetadataStreamSection]
    with open(metadata_file_path, "r") as metadata_file:
        metadata_lines = [line for line in metadata_file]

    # Index, within `metadata_sections`, of the configured section to
    # which the lines being gathered belong.
    metadata_section_idx = 0

    # Bytes gathered for the current section
    curr_metadata_section = bytearray()

    for idx, line_content in enumerate(metadata_lines):
        # Add one to the index to convert from the zero-indexing of the
        # enumerate() function to the one-indexing used by humans when
        # viewing a text file.
        curr_line_number = idx + 1

        # If there are no more sections, simply append the line.
        if metadata_section_idx + 1 >= len(metadata_sections):
            curr_metadata_section += bytearray(line_content, "utf8")
            continue

        next_section_line_number = metadata_sections[metadata_section_idx + 1].line

        # If the next section begins at the current line, create a
        # section with the metadata we gathered so far.
        if curr_line_number >= next_section_line_number:
            # Flushing the metadata of the current section.
            sections.append(
                _LttngMetadataStreamSection(
                    metadata_sections[metadata_section_idx].timestamp,
                    bytes(curr_metadata_section),
                )
            )

            # Move to the next section.
            metadata_section_idx += 1

            # Clear old content and append current line for the next section.
            curr_metadata_section.clear()
            curr_metadata_section += bytearray(line_content, "utf8")

            # Append any empty sections.
            #
            # The parser above only creates an empty section right
            # before a non-empty one, which is what ends this loop.
            while metadata_sections[metadata_section_idx].is_empty:
                sections.append(
                    _LttngMetadataStreamSection(
                        metadata_sections[metadata_section_idx].timestamp, None
                    )
                )
                metadata_section_idx += 1
        else:
            # Append line_content to the current metadata section.
            curr_metadata_section += bytearray(line_content, "utf8")

    # We iterated over all the lines of the metadata file. Close the current section.
    sections.append(
        _LttngMetadataStreamSection(
            metadata_sections[metadata_section_idx].timestamp,
            bytes(curr_metadata_section),
        )
    )

    return sections
1009
1010
# Maps a string to an iterable of integers.
#
# NOTE(review): not referenced within the visible part of this file;
# presumably maps a data stream name to its beacon timestamps -- TODO
# confirm.
_StreamBeaconsT = Dict[str, Iterable[int]]
1012
1013
# An LTTng trace, a sequence of LTTng data streams.
#
# The data streams are built from the regular, non-hidden files of the
# trace directory other than `metadata`, in sorted path order. The
# trace also has a single metadata stream.
class LttngTrace(Sequence[_LttngDataStream]):
    def __init__(
        self,
        trace_dir: str,
        metadata_sections_json: Optional[tjson.ArrayVal],
        beacons_json: Optional[tjson.ObjVal],
    ):
        assert os.path.isdir(trace_dir)
        self._path = trace_dir
        self._create_metadata_stream(trace_dir, metadata_sections_json)
        self._create_data_streams(trace_dir, beacons_json)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    # Creates the data streams of the trace, passing to each one the
    # beacons found in `beacons_json` under its file name, if any.
    def _create_data_streams(
        self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
    ):
        data_stream_paths = []  # type: list[str]

        for filename in os.listdir(trace_dir):
            path = os.path.join(trace_dir, filename)
            is_data_stream_file = (
                os.path.isfile(path)
                and not filename.startswith(".")
                and filename != "metadata"
            )

            if is_data_stream_file:
                data_stream_paths.append(path)

        data_stream_paths.sort()
        self._data_streams = []  # type: list[_LttngDataStream]

        for data_stream_path in data_stream_paths:
            stream_name = os.path.basename(data_stream_path)
            this_beacons_json = (
                beacons_json.at(stream_name, tjson.ArrayVal)
                if beacons_json is not None and stream_name in beacons_json
                else None
            )
            self._data_streams.append(
                _LttngDataStream(data_stream_path, this_beacons_json)
            )

    # Creates the metadata stream of the trace: split in sections as
    # per `metadata_sections_json` or, without such a configuration, a
    # single section (timestamp 0) containing the whole metadata file.
    def _create_metadata_stream(
        self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
    ):
        metadata_path = os.path.join(trace_dir, "metadata")

        if metadata_sections_json is not None:
            metadata_sections = _split_metadata_sections(
                metadata_path, metadata_sections_json
            )
        else:
            with open(metadata_path, "rb") as metadata_file:
                metadata_sections = [
                    _LttngMetadataStreamSection(0, metadata_file.read())
                ]

        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)

    @property
    def path(self) -> str:
        return self._path

    @property
    def metadata_stream(self) -> _LttngMetadataStream:
        return self._metadata_stream

    @overload
    def __getitem__(self, index: int) -> _LttngDataStream:
        ...

    @overload
    def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]:  # noqa: F811
        ...

    def __getitem__(  # noqa: F811
        self, index: Union[int, slice]
    ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
        return self._data_streams[index]

    def __len__(self) -> int:
        return len(self._data_streams)
1101
1102
# The state of a single data stream: mostly the position of the
# current entry within the index of the data stream.
class _LttngLiveViewerSessionDataStreamState:
    def __init__(
        self,
        ts_state: "_LttngLiveViewerSessionTracingSessionState",
        info: _LttngLiveViewerStreamInfo,
        data_stream: _LttngDataStream,
        metadata_stream_id: int,
    ):
        self._ts_state = ts_state
        self._info = info
        self._data_stream = data_stream
        self._metadata_stream_id = metadata_stream_id

        # Start at the first index entry
        self._cur_index_entry_index = 0

        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id,
                ts_info.tracing_session_id,
                ts_info.name,
                data_stream.path,
            )
        )

    @property
    def tracing_session_state(self) -> "_LttngLiveViewerSessionTracingSessionState":
        return self._ts_state

    @property
    def info(self) -> _LttngLiveViewerStreamInfo:
        return self._info

    @property
    def data_stream(self) -> _LttngDataStream:
        return self._data_stream

    # Current index entry, or `None` once past the last one.
    @property
    def cur_index_entry(self) -> Optional[_LttngIndexEntryT]:
        index = self._data_stream.index

        if self._cur_index_entry_index != len(index):
            return index[self._cur_index_entry_index]

        return None

    @property
    def metadata_stream_id(self) -> int:
        return self._metadata_stream_id

    def goto_next_index_entry(self) -> None:
        self._cur_index_entry_index += 1
1150 def goto_next_index_entry(self):
1151 self._cur_index_entry_index += 1
1152
1153
# The state of a single metadata stream: the position of the current
# section, the timestamp of the next one to consider (if any), and
# whether or not the metadata was sent.
class _LttngLiveViewerSessionMetadataStreamState:
    def __init__(
        self,
        ts_state: "_LttngLiveViewerSessionTracingSessionState",
        info: _LttngLiveViewerStreamInfo,
        metadata_stream: _LttngMetadataStream,
    ):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream
        self._cur_metadata_stream_section_index = 0

        # Initially, the "next" section is the second one, if any.
        sections = metadata_stream.sections
        self._next_metadata_stream_section_timestamp = (
            sections[1].timestamp if len(sections) > 1 else None
        )

        self._is_sent = False

        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id,
                ts_info.tracing_session_id,
                ts_info.name,
                metadata_stream.path,
            )
        )

    @property
    def info(self) -> _LttngLiveViewerStreamInfo:
        return self._info

    @property
    def metadata_stream(self) -> _LttngMetadataStream:
        return self._metadata_stream

    @property
    def is_sent(self) -> bool:
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value: bool):
        self._is_sent = value

    # Current section, or `None` once past the last one; each access
    # is logged.
    @property
    def cur_section(self) -> Optional[_LttngMetadataStreamSection]:
        section_index = self._cur_metadata_stream_section_index
        logging.info(
            "Get current metadata section: section-idx={}".format(section_index)
        )
        sections = self._metadata_stream.sections

        if section_index != len(sections):
            return sections[section_index]

        return None

    # Moves to the next section and makes the next section timestamp
    # the one of the new current section (`None` if there's none).
    def goto_next_section(self) -> None:
        self._cur_metadata_stream_section_index += 1

        if not self.cur_section:
            self._next_metadata_stream_section_timestamp = None
        else:
            self._next_metadata_stream_section_timestamp = self.cur_section.timestamp

    @property
    def next_section_timestamp(self) -> Optional[int]:
        return self._next_metadata_stream_section_timestamp
1220 return self._next_metadata_stream_section_timestamp
1221
1222
# A tracing session descriptor.
#
# In the constructor, `traces` is an iterable of LTTng traces
# (`LttngTrace` objects). It's consumed exactly once and kept as a
# list, so that passing a one-shot iterable (a generator, for example)
# is safe.
#
# The constructor raises `ValueError` if `name` isn't found within the
# path of any of the traces.
class LttngTracingSessionDescriptor:
    def __init__(
        self,
        name: str,
        tracing_session_id: int,
        hostname: str,
        live_timer_freq: int,
        client_count: int,
        traces: Iterable[LttngTrace],
    ):
        # Materialize `traces` right away: it's walked twice below and
        # again, later, by the users of the `traces` property.
        self._traces = list(traces)

        for trace in self._traces:
            if name not in trace.path:
                fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
                raise ValueError(fmt.format(name, trace.path))

        # For each trace: `len(trace)` streams (presumably its data
        # streams) plus one for its metadata stream.
        stream_count = sum(len(trace) + 1 for trace in self._traces)
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            stream_count,
            hostname,
            name,
        )

    # Traces of this tracing session (list).
    @property
    def traces(self):
        return self._traces

    # Tracing session info object built from the constructor's
    # parameters.
    @property
    def info(self):
        return self._info
1260
1261
# The state of a tracing session.
#
# On construction, this builds the stream infos and the stream states
# of all the streams of all the traces of the tracing session
# descriptor, handing out consecutive stream IDs starting at
# `base_stream_id`.
class _LttngLiveViewerSessionTracingSessionState:
    def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
        self._tc_descr = tc_descr
        self._stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
        self._ds_states = {}  # type: dict[int, _LttngLiveViewerSessionDataStreamState]
        self._ms_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # The metadata stream gets the first stream ID of the trace;
            # the trace ID derives from this same ID.
            metadata_stream_id = next_stream_id
            trace_id = metadata_stream_id * 1000
            next_stream_id += 1

            # Metadata stream -> stream info and metadata stream state
            metadata_info = _LttngLiveViewerStreamInfo(
                metadata_stream_id,
                trace_id,
                True,
                trace.metadata_stream.path,
                "metadata",
            )
            self._stream_infos.append(metadata_info)
            self._ms_states[metadata_stream_id] = (
                _LttngLiveViewerSessionMetadataStreamState(
                    self, metadata_info, trace.metadata_stream
                )
            )

            # Data streams -> stream infos and data stream states
            for data_stream in trace:
                data_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(data_info)
                self._ds_states[next_stream_id] = (
                    _LttngLiveViewerSessionDataStreamState(
                        self, data_info, data_stream, metadata_stream_id
                    )
                )
                next_stream_id += 1

        self._is_attached = False
        ts_info = tc_descr.info
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                ts_info.tracing_session_id, ts_info.name
            )
        )

    # Descriptor from which this state was built.
    @property
    def tracing_session_descriptor(self):
        return self._tc_descr

    # Data stream states (stream ID to state).
    @property
    def data_stream_states(self):
        return self._ds_states

    # Metadata stream states (stream ID to state).
    @property
    def metadata_stream_states(self):
        return self._ms_states

    # Stream infos of all the streams, in creation order.
    @property
    def stream_infos(self):
        return self._stream_infos

    # Whether or not at least one metadata stream state isn't marked as
    # sent.
    @property
    def has_new_metadata(self):
        return not all(ms_state.is_sent for ms_state in self._ms_states.values())

    # Whether or not the viewer is attached to this tracing session.
    @property
    def is_attached(self):
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value: bool):
        self._is_attached = value
1333
1334
# Returns whether or not `latest_timestamp` reached the section change
# timestamp recorded by `metadata_stream_state`.
#
# Always returns `False` when no such timestamp is recorded.
def needs_new_metadata_section(
    metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
    latest_timestamp: int,
):
    change_timestamp = metadata_stream_state.next_section_timestamp
    return change_timestamp is not None and latest_timestamp >= change_timestamp
1346
1347
# An LTTng live viewer session manages a view on tracing sessions
# and replies to commands accordingly.
#
# On construction, it builds one tracing session state per tracing
# session descriptor and gathers the states of all their streams into
# a single mapping (stream ID to stream state).
#
# handle_command() is the entry point: it dispatches a decoded command
# to the matching handler and returns the reply to send. Handlers raise
# `RuntimeError` on unknown IDs, on streams of the wrong kind, and on
# invalid attach/detach requests.
class _LttngLiveViewerSession:
    def __init__(
        self,
        viewer_session_id: int,
        tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
        max_query_data_response_size: Optional[int],
    ):
        self._viewer_session_id = viewer_session_id
        self._ts_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
        self._stream_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
        self._max_query_data_response_size = max_query_data_response_size
        total_stream_infos = 0

        for ts_descr in tracing_session_descriptors:
            # The running stream info count is the base stream ID of
            # the next tracing session state, which keeps stream IDs
            # unique across tracing sessions.
            ts_state = _LttngLiveViewerSessionTracingSessionState(
                ts_descr, total_stream_infos
            )
            ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
            self._ts_states[ts_id] = ts_state
            total_stream_infos += len(ts_state.stream_infos)

            # Update session's stream states to have the new states
            self._stream_states.update(ts_state.data_stream_states)
            self._stream_states.update(ts_state.metadata_stream_states)

        self._command_handlers = {
            _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
            _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
            _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
            _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
            _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
            _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
            _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
            _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
        }  # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]

    # ID of this viewer session.
    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    # Returns the state of the tracing session having the ID
    # `tracing_session_id`, raising `RuntimeError` if there's none.
    def _get_tracing_session_state(self, tracing_session_id: int):
        if tracing_session_id not in self._ts_states:
            raise RuntimeError(
                "Unknown tracing session ID {}".format(tracing_session_id)
            )

        return self._ts_states[tracing_session_id]

    # Returns the state (data or metadata) of the stream having the ID
    # `stream_id`, raising `RuntimeError` if there's none.
    def _get_stream_state(self, stream_id: int):
        if stream_id not in self._stream_states:
            raise RuntimeError("Unknown stream ID {}".format(stream_id))

        return self._stream_states[stream_id]

    # Returns the state of the data stream having the ID `stream_id`,
    # raising `RuntimeError` if there's no such stream or if it's not a
    # data stream.
    def _get_data_stream_state(self, stream_id: int):
        stream = self._get_stream_state(stream_id)

        if type(stream) is not _LttngLiveViewerSessionDataStreamState:
            raise RuntimeError("Stream is not a data stream")

        return stream

    # Returns the state of the metadata stream having the ID
    # `stream_id`, raising `RuntimeError` if there's no such stream or
    # if it's not a metadata stream.
    def _get_metadata_stream_state(self, stream_id: int):
        stream = self._get_stream_state(stream_id)

        if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
            raise RuntimeError("Stream is not a metadata stream")

        return stream

    # Handles the command `cmd` and returns the reply to send to the
    # viewer.
    #
    # Raises `RuntimeError` if there's no handler for the exact type of
    # `cmd`.
    def handle_command(self, cmd: _LttngLiveViewerCommand):
        logging.info(
            "Handling command in viewer session: cmd-cls-name={}".format(
                cmd.__class__.__name__
            )
        )
        cmd_type = type(cmd)

        if cmd_type not in self._command_handlers:
            raise RuntimeError(
                "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
            )

        return self._command_handlers[cmd_type](cmd)

    # Marks the requested tracing session as attached and replies with
    # all its stream infos.
    #
    # Raises `RuntimeError` if the viewer is already attached to it.
    def _handle_attach_to_tracing_session_command(
        self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
    ):
        fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
        logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if ts_state.is_attached:
            raise RuntimeError(
                "Cannot attach to tracing session `{}`: viewer is already attached".format(
                    info.name
                )
            )

        ts_state.is_attached = True
        status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
        return _LttngLiveViewerAttachToTracingSessionReply(
            status, ts_state.stream_infos
        )

    # Marks the requested tracing session as detached.
    #
    # Raises `RuntimeError` if the viewer isn't attached to it.
    def _handle_detach_from_tracing_session_command(
        self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
    ):
        fmt = 'Handling "detach from tracing session" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if not ts_state.is_attached:
            raise RuntimeError(
                "Cannot detach to tracing session `{}`: viewer is not attached".format(
                    info.name
                )
            )

        ts_state.is_attached = False
        status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
        return _LttngLiveViewerDetachFromTracingSessionReply(status)

    # Replies with the current index entry of the requested data stream
    # and then moves this stream to its next index entry.
    #
    # The status of the reply is `HUP` once the index is exhausted,
    # `OK` for a `_LttngDataStreamIndexEntry` entry, and `INACTIVE` for
    # any other kind of entry.
    def _handle_get_next_data_stream_index_entry_command(
        self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
    ):
        fmt = 'Handling "get next data stream index entry" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        stream_state = self._get_data_stream_state(cmd.stream_id)
        metadata_stream_state = self._get_metadata_stream_state(
            stream_state.metadata_stream_id
        )
        cur_entry = stream_state.cur_index_entry

        if cur_entry is None:
            # The viewer is done reading this stream
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP

            # Dummy data stream index entry to use with the `HUP` status
            # (the reply needs one, but the viewer ignores it)
            index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)

            return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
                status, index_entry, False, False
            )

        timestamp_begin = _get_entry_timestamp_begin(cur_entry)

        # Reaching the recorded section change timestamp makes the
        # following metadata section the current, unsent one.
        if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        # The viewer only checks the `has_new_metadata` flag if the
        # reply's status is `OK`, so we need to provide an index here
        has_new_metadata = stream_state.tracing_session_state.has_new_metadata

        if isinstance(cur_entry, _LttngDataStreamIndexEntry):
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
        else:
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE

        # Build the reply before moving on to the next index entry.
        reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
            status, cur_entry, has_new_metadata, False
        )
        stream_state.goto_next_index_entry()
        return reply

    # Replies with data of the requested data stream, read at
    # `cmd.offset`.
    #
    # The reply has the `ERROR` status and no data as long as the
    # tracing session has metadata which the viewer didn't get yet.
    def _handle_get_data_stream_packet_data_command(
        self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
    ):
        fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
        logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
        stream_state = self._get_data_stream_state(cmd.stream_id)
        data_response_length = cmd.req_length

        if stream_state.tracing_session_state.has_new_metadata:
            status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
            return _LttngLiveViewerGetDataStreamPacketDataReply(
                status, bytes(), True, False
            )

        if self._max_query_data_response_size:
            # Enforce a server side limit on the query requested length.
            # To ensure that the transaction terminate take the minimum of both
            # value.
            data_response_length = min(
                cmd.req_length, self._max_query_data_response_size
            )
            fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
            logging.info(fmt.format(cmd.req_length, data_response_length))

        data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
        status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
        return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)

    # Replies with the data of the current section of the requested
    # metadata stream, or with the `NO_NEW` status and no data if this
    # section is already marked as sent.
    def _handle_get_metadata_stream_data_command(
        self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
    ):
        fmt = 'Handling "get metadata stream data" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)

        if metadata_stream_state.is_sent:
            status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
            return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())

        metadata_stream_state.is_sent = True
        status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
        metadata_section = metadata_stream_state.cur_section
        assert metadata_section is not None

        # If we are sending an empty section, ready the next one right away.
        if len(metadata_section.data) == 0:
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
        logging.info(fmt.format(len(metadata_section.data)))
        return _LttngLiveViewerGetMetadataStreamDataContentReply(
            status, metadata_section.data
        )

    # Always replies with the `HUP` status and no stream infos.
    def _handle_get_new_stream_infos_command(
        self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
    ):
        fmt = 'Handling "get new stream infos" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))

        # As of this version, all the tracing session's stream infos are
        # always given to the viewer when sending the "attach to tracing
        # session" reply, so there's nothing new here. Return the `HUP`
        # status as, if we're handling this command, the viewer consumed
        # all the existing data streams.
        status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
        return _LttngLiveViewerGetNewStreamInfosReply(status, [])

    # Replies with the infos of all the tracing sessions, sorted by
    # name.
    def _handle_get_tracing_session_infos_command(
        self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
    ):
        logging.info('Handling "get tracing session infos" command.')
        infos = [
            tss.tracing_session_descriptor.info for tss in self._ts_states.values()
        ]
        infos.sort(key=lambda info: info.name)
        return _LttngLiveViewerGetTracingSessionInfosReply(infos)

    # Replies with the `OK` status.
    def _handle_create_viewer_session_command(
        self, cmd: _LttngLiveViewerCreateViewerSessionCommand
    ):
        logging.info('Handling "create viewer session" command.')
        status = _LttngLiveViewerCreateViewerSessionReply.Status.OK

        # This does nothing here. In the LTTng relay daemon, it
        # allocates the viewer session's state.
        return _LttngLiveViewerCreateViewerSessionReply(status)
1607
1608
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
# to a temporary port file created in the directory of `port_filename`, and
# then renames this temporary port file to `port_filename`.
#
# `tracing_session_descriptors` is an iterable of tracing session
# descriptors (`LttngTracingSessionDescriptor`) to serve; it's consumed
# exactly once.
#
# `max_query_data_response_size`, if not `None`, is handed to the viewer
# session as the server-side limit of data stream packet data replies.
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns. The listening socket is closed whenever the constructor
# returns or raises.
class LttngLiveServer:
    def __init__(
        self,
        port: Optional[int],
        port_filename: str,
        tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
        max_query_data_response_size: Optional[int],
    ):
        # Materialize the descriptors: they're walked here for logging
        # purposes and again when creating the viewer session.
        ts_descriptors = list(tracing_session_descriptors)

        logging.info("Server configuration:")

        logging.info("  Port file name: `{}`".format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                "  Maximum response data query size: `{}`".format(
                    max_query_data_response_size
                )
            )

        for ts_descr in ts_descriptors:
            info = ts_descr.info
            fmt = '  TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info('    Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = ts_descriptors
        self._max_query_data_response_size = max_query_data_response_size
        self._codec = _LttngLiveViewerProtocolCodec()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Everything from this point on is guarded so that the socket
        # gets closed even if binding or writing the port file fails.
        try:
            # Port 0: OS assigns an unused port
            serv_addr = ("localhost", port if port is not None else 0)
            self._sock.bind(serv_addr)
            self._write_port_to_file(port_filename)
            self._listen()
        finally:
            self._sock.close()
            logging.info("Closed connection and socket.")

    # TCP port to which the socket is bound.
    @property
    def _server_port(self):
        return self._sock.getsockname()[1]

    # Receives data from the viewer until the codec manages to decode a
    # whole command, and returns this command.
    #
    # Returns `None` if the viewer closed the connection without having
    # sent any byte of a new command.
    #
    # Raises `RuntimeError` if the viewer closed the connection in the
    # middle of a command, or if the codec reports a malformed command.
    def _recv_command(self):
        data = bytes()

        while True:
            logging.info("Waiting for viewer command.")
            buf = self._conn.recv(128)

            if not buf:
                logging.info("Client closed connection.")

                if data:
                    raise RuntimeError(
                        "Client closed connection after having sent {} command bytes.".format(
                            len(data)
                        )
                    )

                return None

            logging.info("Received data from viewer: length={}".format(len(buf)))

            data += buf

            try:
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise RuntimeError("Malformed command: {}".format(exc)) from exc

            # The codec returns `None` as long as the command is
            # incomplete: keep on receiving in this case.
            if cmd is not None:
                logging.info(
                    "Received command from viewer: cmd-cls-name={}".format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    # Encodes `reply` and sends all of it to the viewer.
    def _send_reply(self, reply: _LttngLiveViewerReply):
        data = self._codec.encode(reply)
        logging.info(
            "Sending reply to viewer: reply-cls-name={}, length={}".format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    # Runs the whole conversation with the connected viewer: expects a
    # "connect" command, creates the viewer session, and then makes this
    # session handle commands until the viewer closes the connection.
    #
    # Raises `RuntimeError` if the first command isn't "connect".
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise RuntimeError(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    # Accepts a single viewer connection, handles it, and closes it.
    def _listen(self):
        logging.info("Listening: port={}".format(self._server_port))
        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Makes the file `port_filename` contain the decimal TCP port number
    # of this server.
    #
    # The port number is first written to a temporary file which is then
    # renamed to `port_filename`, so that this file never appears
    # partially written.
    def _write_port_to_file(self, port_filename: str):
        # Create the temporary file within the destination directory:
        # os.replace() may fail when the source and the destination
        # are on different file systems, which can be the case with the
        # default temporary directory.
        port_dir = os.path.dirname(os.path.abspath(port_filename))

        # Write the port number to a temporary file.
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, dir=port_dir
        ) as tmp_port_file:
            print(self._server_port, end="", file=tmp_port_file)

        # Rename temporary file to real file
        os.replace(tmp_port_file.name, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1786
1787
# Loads the JSON session configuration file `sessions_filename` and
# returns the corresponding list of tracing session descriptors
# (`LttngTracingSessionDescriptor`).
#
# If `trace_path_prefix` is set, it's prepended to each relative trace
# path; absolute trace paths are kept as is.
def _session_descriptors_from_path(
    sessions_filename: str, trace_path_prefix: Optional[str]
):
    # File format is:
    #
    #     [
    #         {
    #             "name": "my-session",
    #             "id": 17,
    #             "hostname": "myhost",
    #             "live-timer-freq": 1000000,
    #             "client-count": 23,
    #             "traces": [
    #                 {
    #                     "path": "lol"
    #                 },
    #                 {
    #                     "path": "meow/mix",
    #                     "beacons": {
    #                         "my_stream": [ 5235787, 728375283 ]
    #                     },
    #                     "metadata-sections": [
    #                         {
    #                             "line": 1,
    #                             "timestamp": 0
    #                         }
    #                     ]
    #                 }
    #             ]
    #         }
    #     ]
    with open(sessions_filename, "r") as sessions_file:
        root_json = tjson.load(sessions_file, tjson.ArrayVal)

    descriptors = []  # type: list[LttngTracingSessionDescriptor]

    for session_json in root_json.iter(tjson.ObjVal):
        # Mandatory session properties
        session_name = session_json.at("name", tjson.StrVal).val
        session_id = session_json.at("id", tjson.IntVal).val
        session_hostname = session_json.at("hostname", tjson.StrVal).val
        timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
        clients = session_json.at("client-count", tjson.IntVal).val
        traces_json = session_json.at("traces", tjson.ArrayVal)

        session_traces = []  # type: list[LttngTrace]

        for trace_json in traces_json.iter(tjson.ObjVal):
            # Optional trace properties: `None` when missing
            metadata_sections = None

            if "metadata-sections" in trace_json:
                metadata_sections = trace_json.at("metadata-sections", tjson.ArrayVal)

            beacons = None

            if "beacons" in trace_json:
                beacons = trace_json.at("beacons", tjson.ObjVal)

            trace_path = trace_json.at("path", tjson.StrVal).val

            # Only a relative path gets the prefix
            if trace_path_prefix and not os.path.isabs(trace_path):
                trace_path = os.path.join(trace_path_prefix, trace_path)

            session_traces.append(LttngTrace(trace_path, metadata_sections, beacons))

        descriptors.append(
            LttngTracingSessionDescriptor(
                session_name,
                session_id,
                session_hostname,
                timer_freq,
                clients,
                session_traces,
            )
        )

    return descriptors
1870
1871
# Converts the log level name `string` (`info` or `warning`) to the
# corresponding `logging` level.
#
# Raises `argparse.ArgumentTypeError` for any other name.
def _loglevel_parser(string: str):
    level = {"info": logging.INFO, "warning": logging.WARNING}.get(string)

    if level is None:
        raise argparse.ArgumentTypeError("{} is not a valid loglevel".format(string))

    return level
1878
1879
# Command-line entry point: parses the arguments, loads the session
# configuration file, and runs the server (which returns once the
# viewer closes the connection).
if __name__ == "__main__":
    logging.basicConfig(format="# %(asctime)-25s%(message)s")
    parser = argparse.ArgumentParser(
        description="LTTng-live protocol mocker", add_help=False
    )
    parser.add_argument(
        "--log-level",
        default="warning",
        choices=["info", "warning"],
        help="The loglevel to be used.",
    )

    # First pass: only `--log-level` is known at this point, so that
    # the log level is set before anything else happens. This is also
    # why the parser is built without the automatic help option, which
    # is added by hand below.
    loglevel_namespace, remaining_args = parser.parse_known_args()
    logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))

    parser.add_argument(
        "--port",
        help="The port to bind to. If missing, use an OS-assigned port..",
        type=int,
    )
    parser.add_argument(
        "--port-filename",
        help="The final port file. This file is present when the server is ready to receive connection.",
        required=True,
    )
    parser.add_argument(
        "--max-query-data-response-size",
        type=int,
        help="The maximum size of control data response in bytes",
    )
    parser.add_argument(
        "--trace-path-prefix",
        type=str,
        help="Prefix to prepend to the trace paths of session configurations",
    )

    # The session configuration file is opened unconditionally below:
    # require it so that omitting it is a usage error instead of a
    # `TypeError` when trying to open `None`.
    parser.add_argument(
        "--sessions-filename",
        type=str,
        help="Path to a session configuration file",
        required=True,
    )
    parser.add_argument(
        "-h",
        "--help",
        action="help",
        default=argparse.SUPPRESS,
        help="Show this help message and exit.",
    )

    # Second pass: parse what the first pass didn't consume.
    args = parser.parse_args(args=remaining_args)
    sessions_filename = args.sessions_filename  # type: str
    trace_path_prefix = args.trace_path_prefix  # type: str | None
    sessions = _session_descriptors_from_path(
        sessions_filename,
        trace_path_prefix,
    )

    port = args.port  # type: int | None
    port_filename = args.port_filename  # type: str
    max_query_data_response_size = args.max_query_data_response_size  # type: int | None
    LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
This page took 0.067286 seconds and 3 git commands to generate.