tests: add typing annotations to lttng_live_server.py
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
584af91e 2#
0235b0db 3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
584af91e 4#
584af91e 5
aca7de76
SM
6# pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
7
584af91e 8import os
584af91e 9import re
5995b304 10import sys
584af91e
PP
11import socket
12import struct
5995b304
SM
13import logging
14import os.path
15import argparse
584af91e 16import tempfile
aca7de76
SM
17from typing import Dict, Union, Iterable, Optional, Sequence, overload
18
19import tjson
20
21# isort: off
22from typing import Any, Callable # noqa: F401
23
24# isort: on
584af91e
PP
25
26
27class UnexpectedInput(RuntimeError):
28 pass
29
30
ee1171e5
SM
31# An entry within the index of an LTTng data stream.
32class _LttngDataStreamIndexEntry:
33 def __init__(
34 self,
aca7de76
SM
35 offset_bytes: int,
36 total_size_bits: int,
37 content_size_bits: int,
38 timestamp_begin: int,
39 timestamp_end: int,
40 events_discarded: int,
41 stream_class_id: int,
ee1171e5
SM
42 ):
43 self._offset_bytes = offset_bytes
44 self._total_size_bits = total_size_bits
45 self._content_size_bits = content_size_bits
46 self._timestamp_begin = timestamp_begin
47 self._timestamp_end = timestamp_end
48 self._events_discarded = events_discarded
49 self._stream_class_id = stream_class_id
50
51 @property
52 def offset_bytes(self):
53 return self._offset_bytes
54
55 @property
56 def total_size_bits(self):
57 return self._total_size_bits
58
59 @property
60 def total_size_bytes(self):
61 return self._total_size_bits // 8
62
63 @property
64 def content_size_bits(self):
65 return self._content_size_bits
66
67 @property
68 def content_size_bytes(self):
69 return self._content_size_bits // 8
70
71 @property
72 def timestamp_begin(self):
73 return self._timestamp_begin
74
75 @property
76 def timestamp_end(self):
77 return self._timestamp_end
78
79 @property
80 def events_discarded(self):
81 return self._events_discarded
82
83 @property
84 def stream_class_id(self):
85 return self._stream_class_id
86
87
88# An entry within the index of an LTTng data stream. While a stream beacon entry
89# is conceptually unrelated to an index, it is sent as a reply to a
90# LttngLiveViewerGetNextDataStreamIndexEntryCommand
aca7de76
SM
91class _LttngDataStreamBeaconIndexEntry:
92 def __init__(self, stream_class_id: int, timestamp: int):
ee1171e5
SM
93 self._stream_class_id = stream_class_id
94 self._timestamp = timestamp
95
96 @property
97 def timestamp(self):
98 return self._timestamp
99
100 @property
101 def stream_class_id(self):
102 return self._stream_class_id
103
104
aca7de76
SM
105_LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
106
107
584af91e 108class _LttngLiveViewerCommand:
aca7de76 109 def __init__(self, version: int):
584af91e
PP
110 self._version = version
111
112 @property
113 def version(self):
114 return self._version
115
116
117class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
aca7de76 118 def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
584af91e
PP
119 super().__init__(version)
120 self._viewer_session_id = viewer_session_id
121 self._major = major
122 self._minor = minor
123
124 @property
125 def viewer_session_id(self):
126 return self._viewer_session_id
127
128 @property
129 def major(self):
130 return self._major
131
132 @property
133 def minor(self):
134 return self._minor
135
136
aca7de76
SM
137class _LttngLiveViewerReply:
138 pass
139
140
141class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
142 def __init__(self, viewer_session_id: int, major: int, minor: int):
584af91e
PP
143 self._viewer_session_id = viewer_session_id
144 self._major = major
145 self._minor = minor
146
147 @property
148 def viewer_session_id(self):
149 return self._viewer_session_id
150
151 @property
152 def major(self):
153 return self._major
154
155 @property
156 def minor(self):
157 return self._minor
158
159
160class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
161 pass
162
163
164class _LttngLiveViewerTracingSessionInfo:
165 def __init__(
166 self,
aca7de76
SM
167 tracing_session_id: int,
168 live_timer_freq: int,
169 client_count: int,
170 stream_count: int,
171 hostname: str,
172 name: str,
584af91e
PP
173 ):
174 self._tracing_session_id = tracing_session_id
175 self._live_timer_freq = live_timer_freq
176 self._client_count = client_count
177 self._stream_count = stream_count
178 self._hostname = hostname
179 self._name = name
180
181 @property
182 def tracing_session_id(self):
183 return self._tracing_session_id
184
185 @property
186 def live_timer_freq(self):
187 return self._live_timer_freq
188
189 @property
190 def client_count(self):
191 return self._client_count
192
193 @property
194 def stream_count(self):
195 return self._stream_count
196
197 @property
198 def hostname(self):
199 return self._hostname
200
201 @property
202 def name(self):
203 return self._name
204
205
aca7de76
SM
206class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
207 def __init__(
208 self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
209 ):
584af91e
PP
210 self._tracing_session_infos = tracing_session_infos
211
212 @property
213 def tracing_session_infos(self):
214 return self._tracing_session_infos
215
216
217class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
218 class SeekType:
219 BEGINNING = 1
220 LAST = 2
221
aca7de76
SM
222 def __init__(
223 self, version: int, tracing_session_id: int, offset: int, seek_type: int
224 ):
584af91e
PP
225 super().__init__(version)
226 self._tracing_session_id = tracing_session_id
227 self._offset = offset
228 self._seek_type = seek_type
229
230 @property
231 def tracing_session_id(self):
232 return self._tracing_session_id
233
234 @property
235 def offset(self):
236 return self._offset
237
238 @property
239 def seek_type(self):
240 return self._seek_type
241
242
243class _LttngLiveViewerStreamInfo:
aca7de76
SM
244 def __init__(
245 self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
246 ):
584af91e
PP
247 self._id = id
248 self._trace_id = trace_id
249 self._is_metadata = is_metadata
250 self._path = path
251 self._channel_name = channel_name
252
253 @property
254 def id(self):
255 return self._id
256
257 @property
258 def trace_id(self):
259 return self._trace_id
260
261 @property
262 def is_metadata(self):
263 return self._is_metadata
264
265 @property
266 def path(self):
267 return self._path
268
269 @property
270 def channel_name(self):
271 return self._channel_name
272
273
aca7de76 274class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
584af91e
PP
275 class Status:
276 OK = 1
277 ALREADY = 2
278 UNKNOWN = 3
279 NOT_LIVE = 4
280 SEEK_ERROR = 5
281 NO_SESSION = 6
282
aca7de76 283 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
584af91e
PP
284 self._status = status
285 self._stream_infos = stream_infos
286
287 @property
288 def status(self):
289 return self._status
290
291 @property
292 def stream_infos(self):
293 return self._stream_infos
294
295
296class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
aca7de76 297 def __init__(self, version: int, stream_id: int):
584af91e
PP
298 super().__init__(version)
299 self._stream_id = stream_id
300
301 @property
302 def stream_id(self):
303 return self._stream_id
304
305
aca7de76 306class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
584af91e
PP
307 class Status:
308 OK = 1
309 RETRY = 2
310 HUP = 3
311 ERROR = 4
312 INACTIVE = 5
313 EOF = 6
314
aca7de76
SM
315 def __init__(
316 self,
317 status: int,
318 index_entry: _LttngIndexEntryT,
319 has_new_metadata: bool,
320 has_new_data_stream: bool,
321 ):
584af91e
PP
322 self._status = status
323 self._index_entry = index_entry
324 self._has_new_metadata = has_new_metadata
325 self._has_new_data_stream = has_new_data_stream
326
327 @property
328 def status(self):
329 return self._status
330
331 @property
332 def index_entry(self):
333 return self._index_entry
334
335 @property
336 def has_new_metadata(self):
337 return self._has_new_metadata
338
339 @property
340 def has_new_data_stream(self):
341 return self._has_new_data_stream
342
343
344class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
aca7de76 345 def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
584af91e
PP
346 super().__init__(version)
347 self._stream_id = stream_id
348 self._offset = offset
349 self._req_length = req_length
350
351 @property
352 def stream_id(self):
353 return self._stream_id
354
355 @property
356 def offset(self):
357 return self._offset
358
359 @property
360 def req_length(self):
361 return self._req_length
362
363
aca7de76 364class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
584af91e
PP
365 class Status:
366 OK = 1
367 RETRY = 2
368 ERROR = 3
369 EOF = 4
370
aca7de76
SM
371 def __init__(
372 self,
373 status: int,
374 data: bytes,
375 has_new_metadata: bool,
376 has_new_data_stream: bool,
377 ):
584af91e
PP
378 self._status = status
379 self._data = data
380 self._has_new_metadata = has_new_metadata
381 self._has_new_data_stream = has_new_data_stream
382
383 @property
384 def status(self):
385 return self._status
386
387 @property
388 def data(self):
389 return self._data
390
391 @property
392 def has_new_metadata(self):
393 return self._has_new_metadata
394
395 @property
396 def has_new_data_stream(self):
397 return self._has_new_data_stream
398
399
400class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
aca7de76 401 def __init__(self, version: int, stream_id: int):
584af91e
PP
402 super().__init__(version)
403 self._stream_id = stream_id
404
405 @property
406 def stream_id(self):
407 return self._stream_id
408
409
aca7de76 410class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
584af91e
PP
411 class Status:
412 OK = 1
413 NO_NEW = 2
414 ERROR = 3
415
aca7de76 416 def __init__(self, status: int, data: bytes):
584af91e
PP
417 self._status = status
418 self._data = data
419
420 @property
421 def status(self):
422 return self._status
423
424 @property
425 def data(self):
426 return self._data
427
428
429class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
aca7de76 430 def __init__(self, version: int, tracing_session_id: int):
584af91e
PP
431 super().__init__(version)
432 self._tracing_session_id = tracing_session_id
433
434 @property
435 def tracing_session_id(self):
436 return self._tracing_session_id
437
438
aca7de76 439class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
584af91e
PP
440 class Status:
441 OK = 1
442 NO_NEW = 2
443 ERROR = 3
444 HUP = 4
445
aca7de76 446 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
584af91e
PP
447 self._status = status
448 self._stream_infos = stream_infos
449
450 @property
451 def status(self):
452 return self._status
453
454 @property
455 def stream_infos(self):
456 return self._stream_infos
457
458
459class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
460 pass
461
462
aca7de76 463class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
584af91e
PP
464 class Status:
465 OK = 1
466 ERROR = 2
467
aca7de76 468 def __init__(self, status: int):
584af91e
PP
469 self._status = status
470
471 @property
472 def status(self):
473 return self._status
474
475
476class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
aca7de76 477 def __init__(self, version: int, tracing_session_id: int):
584af91e
PP
478 super().__init__(version)
479 self._tracing_session_id = tracing_session_id
480
481 @property
482 def tracing_session_id(self):
483 return self._tracing_session_id
484
485
aca7de76 486class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
584af91e
PP
487 class Status:
488 OK = 1
489 UNKNOWN = 2
490 ERROR = 3
491
aca7de76 492 def __init__(self, status: int):
584af91e
PP
493 self._status = status
494
495 @property
496 def status(self):
497 return self._status
498
499
500# An LTTng live protocol codec can convert bytes to command objects and
501# reply objects to bytes.
502class _LttngLiveViewerProtocolCodec:
f5567ea8 503 _COMMAND_HEADER_STRUCT_FMT = "QII"
584af91e
PP
504 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
505
506 def __init__(self):
507 pass
508
aca7de76 509 def _unpack(self, fmt: str, data: bytes, offset: int = 0):
f5567ea8 510 fmt = "!" + fmt
584af91e
PP
511 return struct.unpack_from(fmt, data, offset)
512
aca7de76 513 def _unpack_payload(self, fmt: str, data: bytes):
584af91e
PP
514 return self._unpack(
515 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
516 )
517
aca7de76 518 def decode(self, data: bytes):
584af91e
PP
519 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
520 # Not enough data to read the command header
521 return
522
523 payload_size, cmd_type, version = self._unpack(
524 self._COMMAND_HEADER_STRUCT_FMT, data
525 )
526 logging.info(
f5567ea8 527 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
584af91e
PP
528 payload_size, cmd_type, version
529 )
530 )
531
532 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
533 # Not enough data to read the whole command
534 return
535
536 if cmd_type == 1:
aca7de76 537 viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
584af91e
PP
538 return _LttngLiveViewerConnectCommand(
539 version, viewer_session_id, major, minor
540 )
541 elif cmd_type == 2:
542 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
543 elif cmd_type == 3:
f5567ea8 544 tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
584af91e
PP
545 return _LttngLiveViewerAttachToTracingSessionCommand(
546 version, tracing_session_id, offset, seek_type
547 )
548 elif cmd_type == 4:
f5567ea8 549 (stream_id,) = self._unpack_payload("Q", data)
584af91e
PP
550 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
551 version, stream_id
552 )
553 elif cmd_type == 5:
f5567ea8 554 stream_id, offset, req_length = self._unpack_payload("QQI", data)
584af91e
PP
555 return _LttngLiveViewerGetDataStreamPacketDataCommand(
556 version, stream_id, offset, req_length
557 )
558 elif cmd_type == 6:
f5567ea8 559 (stream_id,) = self._unpack_payload("Q", data)
584af91e
PP
560 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
561 elif cmd_type == 7:
f5567ea8 562 (tracing_session_id,) = self._unpack_payload("Q", data)
584af91e
PP
563 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
564 elif cmd_type == 8:
565 return _LttngLiveViewerCreateViewerSessionCommand(version)
566 elif cmd_type == 9:
f5567ea8 567 (tracing_session_id,) = self._unpack_payload("Q", data)
584af91e
PP
568 return _LttngLiveViewerDetachFromTracingSessionCommand(
569 version, tracing_session_id
570 )
571 else:
f5567ea8 572 raise UnexpectedInput("Unknown command type {}".format(cmd_type))
584af91e 573
aca7de76 574 def _pack(self, fmt: str, *args: Any):
584af91e 575 # Force network byte order
f5567ea8 576 return struct.pack("!" + fmt, *args)
584af91e 577
aca7de76 578 def _encode_zero_padded_str(self, string: str, length: int):
584af91e 579 data = string.encode()
f5567ea8 580 return data.ljust(length, b"\x00")
584af91e 581
aca7de76 582 def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
f5567ea8 583 data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
584af91e
PP
584 data += self._encode_zero_padded_str(info.path, 4096)
585 data += self._encode_zero_padded_str(info.channel_name, 255)
586 return data
587
aca7de76
SM
588 def _get_has_new_stuff_flags(
589 self, has_new_metadata: bool, has_new_data_streams: bool
590 ):
584af91e
PP
591 flags = 0
592
593 if has_new_metadata:
594 flags |= 1
595
596 if has_new_data_streams:
597 flags |= 2
598
599 return flags
600
aca7de76
SM
601 def encode(
602 self,
603 reply: _LttngLiveViewerReply,
604 ) -> bytes:
584af91e
PP
605 if type(reply) is _LttngLiveViewerConnectReply:
606 data = self._pack(
f5567ea8 607 "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
584af91e
PP
608 )
609 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
f5567ea8 610 data = self._pack("I", len(reply.tracing_session_infos))
584af91e
PP
611
612 for info in reply.tracing_session_infos:
613 data += self._pack(
f5567ea8 614 "QIII",
584af91e
PP
615 info.tracing_session_id,
616 info.live_timer_freq,
617 info.client_count,
618 info.stream_count,
619 )
620 data += self._encode_zero_padded_str(info.hostname, 64)
621 data += self._encode_zero_padded_str(info.name, 255)
622 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
f5567ea8 623 data = self._pack("II", reply.status, len(reply.stream_infos))
584af91e
PP
624
625 for info in reply.stream_infos:
626 data += self._encode_stream_info(info)
627 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
f5567ea8 628 index_format = "QQQQQQQII"
584af91e
PP
629 entry = reply.index_entry
630 flags = self._get_has_new_stuff_flags(
631 reply.has_new_metadata, reply.has_new_data_stream
632 )
633
aca7de76 634 if isinstance(entry, _LttngDataStreamIndexEntry):
71f56e5f
JG
635 data = self._pack(
636 index_format,
637 entry.offset_bytes,
638 entry.total_size_bits,
639 entry.content_size_bits,
640 entry.timestamp_begin,
641 entry.timestamp_end,
642 entry.events_discarded,
643 entry.stream_class_id,
644 reply.status,
645 flags,
646 )
647 else:
71f56e5f
JG
648 data = self._pack(
649 index_format,
650 0,
651 0,
652 0,
653 0,
654 entry.timestamp,
655 0,
656 entry.stream_class_id,
657 reply.status,
658 flags,
659 )
584af91e
PP
660 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
661 flags = self._get_has_new_stuff_flags(
662 reply.has_new_metadata, reply.has_new_data_stream
663 )
f5567ea8 664 data = self._pack("III", reply.status, len(reply.data), flags)
584af91e
PP
665 data += reply.data
666 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
f5567ea8 667 data = self._pack("QI", len(reply.data), reply.status)
584af91e
PP
668 data += reply.data
669 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
f5567ea8 670 data = self._pack("II", reply.status, len(reply.stream_infos))
584af91e
PP
671
672 for info in reply.stream_infos:
673 data += self._encode_stream_info(info)
674 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
f5567ea8 675 data = self._pack("I", reply.status)
584af91e 676 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
f5567ea8 677 data = self._pack("I", reply.status)
584af91e
PP
678 else:
679 raise ValueError(
f5567ea8 680 "Unknown reply object with class `{}`".format(reply.__class__.__name__)
584af91e
PP
681 )
682
683 return data
684
685
aca7de76
SM
686def _get_entry_timestamp_begin(
687 entry: _LttngIndexEntryT,
688):
689 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
78169723
FD
690 return entry.timestamp
691 else:
78169723
FD
692 return entry.timestamp_begin
693
694
584af91e 695# The index of an LTTng data stream, a sequence of index entries.
aca7de76
SM
696class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
697 def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
584af91e
PP
698 self._path = path
699 self._build()
71f56e5f
JG
700
701 if beacons:
702 stream_class_id = self._entries[0].stream_class_id
aca7de76
SM
703
704 beacons_list = [] # type: list[_LttngDataStreamBeaconIndexEntry]
705 for ts in beacons.iter(tjson.IntVal):
706 beacons_list.append(
707 _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
708 )
709
710 self._add_beacons(beacons_list)
71f56e5f 711
584af91e
PP
712 logging.info(
713 'Built data stream index entries: path="{}", count={}'.format(
714 path, len(self._entries)
715 )
716 )
717
718 def _build(self):
aca7de76 719 self._entries = [] # type: list[_LttngIndexEntryT]
584af91e
PP
720 assert os.path.isfile(self._path)
721
f5567ea8 722 with open(self._path, "rb") as f:
584af91e 723 # Read header first
f5567ea8 724 fmt = ">IIII"
584af91e
PP
725 size = struct.calcsize(fmt)
726 data = f.read(size)
727 assert len(data) == size
aca7de76 728 magic, _, _, index_entry_length = struct.unpack(fmt, data)
584af91e
PP
729 assert magic == 0xC1F1DCC1
730
731 # Read index entries
f5567ea8 732 fmt = ">QQQQQQQ"
584af91e
PP
733 size = struct.calcsize(fmt)
734
735 while True:
736 logging.debug(
737 'Decoding data stream index entry: path="{}", offset={}'.format(
738 self._path, f.tell()
739 )
740 )
741 data = f.read(size)
742
743 if not data:
744 # Done
745 break
746
747 assert len(data) == size
75882e97
FD
748 (
749 offset_bytes,
750 total_size_bits,
751 content_size_bits,
752 timestamp_begin,
753 timestamp_end,
754 events_discarded,
755 stream_class_id,
756 ) = struct.unpack(fmt, data)
584af91e
PP
757
758 self._entries.append(
759 _LttngDataStreamIndexEntry(
760 offset_bytes,
761 total_size_bits,
762 content_size_bits,
763 timestamp_begin,
764 timestamp_end,
765 events_discarded,
766 stream_class_id,
767 )
768 )
769
770 # Skip anything else before the next entry
771 f.seek(index_entry_length - size, os.SEEK_CUR)
772
aca7de76 773 def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
71f56e5f 774 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
aca7de76
SM
775 def sort_key(
776 entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
777 ) -> int:
778 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
71f56e5f
JG
779 return entry.timestamp
780 else:
781 return entry.timestamp_end
782
783 self._entries += beacons
784 self._entries.sort(key=sort_key)
785
aca7de76
SM
786 @overload
787 def __getitem__(self, index: int) -> _LttngIndexEntryT:
788 ...
789
790 @overload
791 def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]: # noqa: F811
792 ...
793
794 def __getitem__( # noqa: F811
795 self, index: Union[int, slice]
796 ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT],]:
584af91e
PP
797 return self._entries[index]
798
799 def __len__(self):
800 return len(self._entries)
801
802 @property
803 def path(self):
804 return self._path
805
806
807# An LTTng data stream.
808class _LttngDataStream:
aca7de76 809 def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
584af91e
PP
810 self._path = path
811 filename = os.path.basename(path)
f5567ea8 812 match = re.match(r"(.*)_\d+", filename)
b8b97725
SM
813 if not match:
814 raise RuntimeError(
815 "Unexpected data stream file name pattern: {}".format(filename)
816 )
817
584af91e
PP
818 self._channel_name = match.group(1)
819 trace_dir = os.path.dirname(path)
f5567ea8 820 index_path = os.path.join(trace_dir, "index", filename + ".idx")
aca7de76 821 self._index = _LttngDataStreamIndex(index_path, beacons_json)
584af91e 822 assert os.path.isfile(path)
f5567ea8 823 self._file = open(path, "rb")
584af91e
PP
824 logging.info(
825 'Built data stream: path="{}", channel-name="{}"'.format(
826 path, self._channel_name
827 )
828 )
829
830 @property
831 def path(self):
832 return self._path
833
834 @property
835 def channel_name(self):
836 return self._channel_name
837
838 @property
839 def index(self):
840 return self._index
841
aca7de76 842 def get_data(self, offset_bytes: int, len_bytes: int):
584af91e
PP
843 self._file.seek(offset_bytes)
844 return self._file.read(len_bytes)
845
846
78169723 847class _LttngMetadataStreamSection:
aca7de76 848 def __init__(self, timestamp: int, data: Optional[bytes]):
78169723
FD
849 self._timestamp = timestamp
850 if data is None:
851 self._data = bytes()
852 else:
853 self._data = data
854 logging.info(
f5567ea8 855 "Built metadata stream section: ts={}, data-len={}".format(
78169723
FD
856 self._timestamp, len(self._data)
857 )
858 )
859
860 @property
861 def timestamp(self):
862 return self._timestamp
863
864 @property
865 def data(self):
866 return self._data
867
868
584af91e
PP
869# An LTTng metadata stream.
870class _LttngMetadataStream:
aca7de76
SM
871 def __init__(
872 self,
873 metadata_file_path: str,
874 config_sections: Sequence[_LttngMetadataStreamSection],
875 ):
78169723
FD
876 self._path = metadata_file_path
877 self._sections = config_sections
878 logging.info(
f5567ea8 879 "Built metadata stream: path={}, section-len={}".format(
78169723
FD
880 self._path, len(self._sections)
881 )
882 )
584af91e
PP
883
884 @property
885 def path(self):
886 return self._path
887
888 @property
78169723
FD
889 def sections(self):
890 return self._sections
584af91e 891
78169723 892
aca7de76
SM
893class LttngMetadataConfigSection:
894 def __init__(self, line: int, timestamp: int, is_empty: bool):
895 self._line = line
896 self._timestamp = timestamp
897 self._is_empty = is_empty
898
899 @property
900 def line(self):
901 return self._line
78169723 902
aca7de76
SM
903 @property
904 def timestamp(self):
905 return self._timestamp
78169723 906
aca7de76
SM
907 @property
908 def is_empty(self):
909 return self._is_empty
910
911
912def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
913 metadata_sections = [] # type: list[LttngMetadataConfigSection]
78169723
FD
914 append_empty_section = False
915 last_timestamp = 0
916 last_line = 0
917
aca7de76
SM
918 for section in metadata_sections_json:
919 if isinstance(section, tjson.StrVal):
920 if section.val == "empty":
921 # Found a empty section marker. Actually append the section at the
922 # timestamp of the next concrete section.
923 append_empty_section = True
924 else:
925 raise ValueError("Invalid string value at {}.".format(section.path))
926 elif isinstance(section, tjson.ObjVal):
927 line = section.at("line", tjson.IntVal).val
928 ts = section.at("timestamp", tjson.IntVal).val
b8b97725 929
78169723
FD
930 # Sections' timestamps and lines must both be increasing.
931 assert ts > last_timestamp
932 last_timestamp = ts
aca7de76 933
78169723
FD
934 assert line > last_line
935 last_line = line
936
937 if append_empty_section:
aca7de76 938 metadata_sections.append(LttngMetadataConfigSection(line, ts, True))
78169723
FD
939 append_empty_section = False
940
aca7de76
SM
941 metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
942 else:
943 raise TypeError(
944 "`{}`: expecting a string or object value".format(section.path)
945 )
78169723 946
aca7de76 947 return metadata_sections
78169723 948
78169723 949
aca7de76
SM
950def _split_metadata_sections(
951 metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
952):
953 metadata_sections = _parse_metadata_sections_config(metadata_sections_json)
78169723 954
aca7de76 955 sections = [] # type: list[_LttngMetadataStreamSection]
f5567ea8 956 with open(metadata_file_path, "r") as metadata_file:
78169723
FD
957 metadata_lines = [line for line in metadata_file]
958
aca7de76 959 metadata_section_idx = 0
78169723
FD
960 curr_metadata_section = bytearray()
961
962 for idx, line_content in enumerate(metadata_lines):
963 # Add one to the index to convert from the zero-indexing of the
964 # enumerate() function to the one-indexing used by humans when
965 # viewing a text file.
966 curr_line_number = idx + 1
967
968 # If there are no more sections, simply append the line.
aca7de76 969 if metadata_section_idx + 1 >= len(metadata_sections):
f5567ea8 970 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
971 continue
972
aca7de76 973 next_section_line_number = metadata_sections[metadata_section_idx + 1].line
78169723
FD
974
975 # If the next section begins at the current line, create a
976 # section with the metadata we gathered so far.
977 if curr_line_number >= next_section_line_number:
78169723
FD
978 # Flushing the metadata of the current section.
979 sections.append(
980 _LttngMetadataStreamSection(
aca7de76 981 metadata_sections[metadata_section_idx].timestamp,
78169723
FD
982 bytes(curr_metadata_section),
983 )
984 )
985
986 # Move to the next section.
aca7de76 987 metadata_section_idx += 1
78169723
FD
988
989 # Clear old content and append current line for the next section.
990 curr_metadata_section.clear()
f5567ea8 991 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
992
993 # Append any empty sections.
aca7de76 994 while metadata_sections[metadata_section_idx].is_empty:
78169723
FD
995 sections.append(
996 _LttngMetadataStreamSection(
aca7de76 997 metadata_sections[metadata_section_idx].timestamp, None
78169723
FD
998 )
999 )
aca7de76 1000 metadata_section_idx += 1
78169723
FD
1001 else:
1002 # Append line_content to the current metadata section.
f5567ea8 1003 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
1004
1005 # We iterated over all the lines of the metadata file. Close the current section.
1006 sections.append(
1007 _LttngMetadataStreamSection(
aca7de76 1008 metadata_sections[metadata_section_idx].timestamp,
78169723
FD
1009 bytes(curr_metadata_section),
1010 )
1011 )
1012
1013 return sections
584af91e
PP
1014
1015
aca7de76
SM
1016_StreamBeaconsT = Dict[str, Iterable[int]]
1017
1018
584af91e 1019# An LTTng trace, a sequence of LTTng data streams.
aca7de76
SM
1020class LttngTrace(Sequence[_LttngDataStream]):
1021 def __init__(
1022 self,
1023 trace_dir: str,
1024 metadata_sections_json: Optional[tjson.ArrayVal],
1025 beacons_json: Optional[tjson.ObjVal],
1026 ):
584af91e
PP
1027 assert os.path.isdir(trace_dir)
1028 self._path = trace_dir
aca7de76
SM
1029 self._create_metadata_stream(trace_dir, metadata_sections_json)
1030 self._create_data_streams(trace_dir, beacons_json)
584af91e
PP
1031 logging.info('Built trace: path="{}"'.format(trace_dir))
1032
aca7de76
SM
1033 def _create_data_streams(
1034 self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
1035 ):
1036 data_stream_paths = [] # type: list[str]
584af91e
PP
1037
1038 for filename in os.listdir(trace_dir):
1039 path = os.path.join(trace_dir, filename)
1040
1041 if not os.path.isfile(path):
1042 continue
1043
f5567ea8 1044 if filename.startswith("."):
584af91e
PP
1045 continue
1046
f5567ea8 1047 if filename == "metadata":
584af91e
PP
1048 continue
1049
1050 data_stream_paths.append(path)
1051
1052 data_stream_paths.sort()
aca7de76 1053 self._data_streams = [] # type: list[_LttngDataStream]
584af91e
PP
1054
1055 for data_stream_path in data_stream_paths:
71f56e5f 1056 stream_name = os.path.basename(data_stream_path)
aca7de76
SM
1057 this_beacons_json = None
1058 if beacons_json is not None and stream_name in beacons_json:
1059 this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
71f56e5f
JG
1060
1061 self._data_streams.append(
aca7de76 1062 _LttngDataStream(data_stream_path, this_beacons_json)
71f56e5f 1063 )
584af91e 1064
aca7de76
SM
1065 def _create_metadata_stream(
1066 self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
1067 ):
f5567ea8 1068 metadata_path = os.path.join(trace_dir, "metadata")
aca7de76 1069 metadata_sections = [] # type: list[_LttngMetadataStreamSection]
78169723 1070
aca7de76 1071 if metadata_sections_json is None:
f5567ea8 1072 with open(metadata_path, "rb") as metadata_file:
78169723
FD
1073 metadata_sections.append(
1074 _LttngMetadataStreamSection(0, metadata_file.read())
1075 )
1076 else:
1077 metadata_sections = _split_metadata_sections(
aca7de76 1078 metadata_path, metadata_sections_json
78169723
FD
1079 )
1080
1081 self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
1082
584af91e
PP
1083 @property
1084 def path(self):
1085 return self._path
1086
1087 @property
1088 def metadata_stream(self):
1089 return self._metadata_stream
1090
aca7de76
SM
1091 @overload
1092 def __getitem__(self, index: int) -> _LttngDataStream:
1093 ...
1094
1095 @overload
1096 def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]: # noqa: F811
1097 ...
1098
1099 def __getitem__( # noqa: F811
1100 self, index: Union[int, slice]
1101 ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
584af91e
PP
1102 return self._data_streams[index]
1103
1104 def __len__(self):
1105 return len(self._data_streams)
1106
1107
1108# The state of a single data stream.
1109class _LttngLiveViewerSessionDataStreamState:
aca7de76
SM
1110 def __init__(
1111 self,
1112 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1113 info: _LttngLiveViewerStreamInfo,
1114 data_stream: _LttngDataStream,
1115 metadata_stream_id: int,
1116 ):
584af91e
PP
1117 self._ts_state = ts_state
1118 self._info = info
1119 self._data_stream = data_stream
78169723 1120 self._metadata_stream_id = metadata_stream_id
584af91e
PP
1121 self._cur_index_entry_index = 0
1122 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1123 logging.info(
1124 fmt.format(
1125 info.id,
1126 ts_state.tracing_session_descriptor.info.tracing_session_id,
1127 ts_state.tracing_session_descriptor.info.name,
1128 data_stream.path,
1129 )
1130 )
1131
1132 @property
1133 def tracing_session_state(self):
1134 return self._ts_state
1135
1136 @property
1137 def info(self):
1138 return self._info
1139
1140 @property
1141 def data_stream(self):
1142 return self._data_stream
1143
1144 @property
1145 def cur_index_entry(self):
1146 if self._cur_index_entry_index == len(self._data_stream.index):
1147 return
1148
1149 return self._data_stream.index[self._cur_index_entry_index]
1150
aca7de76
SM
1151 @property
1152 def metadata_stream_id(self):
1153 return self._metadata_stream_id
1154
584af91e
PP
1155 def goto_next_index_entry(self):
1156 self._cur_index_entry_index += 1
1157
1158
1159# The state of a single metadata stream.
1160class _LttngLiveViewerSessionMetadataStreamState:
aca7de76
SM
1161 def __init__(
1162 self,
1163 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1164 info: _LttngLiveViewerStreamInfo,
1165 metadata_stream: _LttngMetadataStream,
1166 ):
584af91e
PP
1167 self._ts_state = ts_state
1168 self._info = info
1169 self._metadata_stream = metadata_stream
78169723
FD
1170 self._cur_metadata_stream_section_index = 0
1171 if len(metadata_stream.sections) > 1:
1172 self._next_metadata_stream_section_timestamp = metadata_stream.sections[
1173 1
1174 ].timestamp
1175 else:
1176 self._next_metadata_stream_section_timestamp = None
1177
584af91e
PP
1178 self._is_sent = False
1179 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1180 logging.info(
1181 fmt.format(
1182 info.id,
1183 ts_state.tracing_session_descriptor.info.tracing_session_id,
1184 ts_state.tracing_session_descriptor.info.name,
1185 metadata_stream.path,
1186 )
1187 )
1188
584af91e
PP
1189 @property
1190 def info(self):
1191 return self._info
1192
1193 @property
1194 def metadata_stream(self):
1195 return self._metadata_stream
1196
1197 @property
1198 def is_sent(self):
1199 return self._is_sent
1200
1201 @is_sent.setter
aca7de76 1202 def is_sent(self, value: bool):
584af91e
PP
1203 self._is_sent = value
1204
78169723
FD
1205 @property
1206 def cur_section(self):
1207 fmt = "Get current metadata section: section-idx={}"
1208 logging.info(fmt.format(self._cur_metadata_stream_section_index))
1209 if self._cur_metadata_stream_section_index == len(
1210 self._metadata_stream.sections
1211 ):
1212 return
1213
1214 return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
1215
1216 def goto_next_section(self):
1217 self._cur_metadata_stream_section_index += 1
1218 if self.cur_section:
1219 self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
1220 else:
1221 self._next_metadata_stream_section_timestamp = None
1222
1223 @property
1224 def next_section_timestamp(self):
1225 return self._next_metadata_stream_section_timestamp
1226
584af91e 1227
ee1171e5
SM
1228# A tracing session descriptor.
1229#
1230# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1231# objects).
1232class LttngTracingSessionDescriptor:
1233 def __init__(
aca7de76
SM
1234 self,
1235 name: str,
1236 tracing_session_id: int,
1237 hostname: str,
1238 live_timer_freq: int,
1239 client_count: int,
1240 traces: Iterable[LttngTrace],
ee1171e5
SM
1241 ):
1242 for trace in traces:
1243 if name not in trace.path:
1244 fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1245 raise ValueError(fmt.format(name, trace.path))
1246
1247 self._traces = traces
1248 stream_count = sum([len(t) + 1 for t in traces])
1249 self._info = _LttngLiveViewerTracingSessionInfo(
1250 tracing_session_id,
1251 live_timer_freq,
1252 client_count,
1253 stream_count,
1254 hostname,
1255 name,
1256 )
1257
1258 @property
1259 def traces(self):
1260 return self._traces
1261
1262 @property
1263 def info(self):
1264 return self._info
1265
1266
584af91e
PP
1267# The state of a tracing session.
1268class _LttngLiveViewerSessionTracingSessionState:
aca7de76 1269 def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
584af91e 1270 self._tc_descr = tc_descr
aca7de76
SM
1271 self._stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
1272 self._ds_states = {} # type: dict[int, _LttngLiveViewerSessionDataStreamState]
1273 self._ms_states = (
1274 {}
1275 ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
584af91e
PP
1276 stream_id = base_stream_id
1277
1278 for trace in tc_descr.traces:
1279 trace_id = stream_id * 1000
1280
78169723
FD
1281 # Metadata stream -> stream info and metadata stream state
1282 info = _LttngLiveViewerStreamInfo(
f5567ea8 1283 stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
78169723
FD
1284 )
1285 self._stream_infos.append(info)
1286 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
1287 self, info, trace.metadata_stream
1288 )
1289 metadata_stream_id = stream_id
1290 stream_id += 1
1291
584af91e
PP
1292 # Data streams -> stream infos and data stream states
1293 for data_stream in trace:
1294 info = _LttngLiveViewerStreamInfo(
1295 stream_id,
1296 trace_id,
1297 False,
1298 data_stream.path,
1299 data_stream.channel_name,
1300 )
1301 self._stream_infos.append(info)
1302 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
78169723 1303 self, info, data_stream, metadata_stream_id
584af91e
PP
1304 )
1305 stream_id += 1
1306
584af91e
PP
1307 self._is_attached = False
1308 fmt = 'Built tracing session state: id={}, name="{}"'
1309 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
1310
1311 @property
1312 def tracing_session_descriptor(self):
1313 return self._tc_descr
1314
1315 @property
1316 def data_stream_states(self):
1317 return self._ds_states
1318
1319 @property
1320 def metadata_stream_states(self):
1321 return self._ms_states
1322
1323 @property
1324 def stream_infos(self):
1325 return self._stream_infos
1326
1327 @property
1328 def has_new_metadata(self):
1329 return any([not ms.is_sent for ms in self._ms_states.values()])
1330
1331 @property
1332 def is_attached(self):
1333 return self._is_attached
1334
1335 @is_attached.setter
aca7de76 1336 def is_attached(self, value: bool):
584af91e
PP
1337 self._is_attached = value
1338
1339
aca7de76
SM
1340def needs_new_metadata_section(
1341 metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
1342 latest_timestamp: int,
1343):
78169723
FD
1344 if metadata_stream_state.next_section_timestamp is None:
1345 return False
1346
1347 if latest_timestamp >= metadata_stream_state.next_section_timestamp:
1348 return True
1349 else:
1350 return False
1351
1352
584af91e
PP
1353# An LTTng live viewer session manages a view on tracing sessions
1354# and replies to commands accordingly.
1355class _LttngLiveViewerSession:
1356 def __init__(
1357 self,
aca7de76
SM
1358 viewer_session_id: int,
1359 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1360 max_query_data_response_size: Optional[int],
584af91e
PP
1361 ):
1362 self._viewer_session_id = viewer_session_id
aca7de76
SM
1363 self._ts_states = (
1364 {}
1365 ) # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
1366 self._stream_states = (
1367 {}
1368 ) # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
584af91e
PP
1369 self._max_query_data_response_size = max_query_data_response_size
1370 total_stream_infos = 0
1371
1372 for ts_descr in tracing_session_descriptors:
1373 ts_state = _LttngLiveViewerSessionTracingSessionState(
1374 ts_descr, total_stream_infos
1375 )
1376 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1377 self._ts_states[ts_id] = ts_state
1378 total_stream_infos += len(ts_state.stream_infos)
1379
1380 # Update session's stream states to have the new states
1381 self._stream_states.update(ts_state.data_stream_states)
1382 self._stream_states.update(ts_state.metadata_stream_states)
1383
1384 self._command_handlers = {
1385 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1386 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1387 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1388 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1389 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1390 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1391 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1392 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
aca7de76 1393 } # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]
584af91e
PP
1394
1395 @property
1396 def viewer_session_id(self):
1397 return self._viewer_session_id
1398
aca7de76 1399 def _get_tracing_session_state(self, tracing_session_id: int):
584af91e
PP
1400 if tracing_session_id not in self._ts_states:
1401 raise UnexpectedInput(
f5567ea8 1402 "Unknown tracing session ID {}".format(tracing_session_id)
584af91e
PP
1403 )
1404
1405 return self._ts_states[tracing_session_id]
1406
aca7de76 1407 def _get_data_stream_state(self, stream_id: int):
584af91e 1408 if stream_id not in self._stream_states:
aca7de76 1409 RuntimeError("Unknown stream ID {}".format(stream_id))
584af91e 1410
aca7de76
SM
1411 stream = self._stream_states[stream_id]
1412 if type(stream) is not _LttngLiveViewerSessionDataStreamState:
1413 raise RuntimeError("Stream is not a data stream")
584af91e 1414
aca7de76
SM
1415 return stream
1416
1417 def _get_metadata_stream_state(self, stream_id: int):
1418 if stream_id not in self._stream_states:
1419 RuntimeError("Unknown stream ID {}".format(stream_id))
1420
1421 stream = self._stream_states[stream_id]
1422 if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
1423 raise RuntimeError("Stream is not a metadata stream")
1424
1425 return stream
1426
1427 def handle_command(self, cmd: _LttngLiveViewerCommand):
584af91e 1428 logging.info(
f5567ea8 1429 "Handling command in viewer session: cmd-cls-name={}".format(
584af91e
PP
1430 cmd.__class__.__name__
1431 )
1432 )
1433 cmd_type = type(cmd)
1434
1435 if cmd_type not in self._command_handlers:
1436 raise UnexpectedInput(
f5567ea8 1437 "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
584af91e
PP
1438 )
1439
1440 return self._command_handlers[cmd_type](cmd)
1441
aca7de76
SM
1442 def _handle_attach_to_tracing_session_command(
1443 self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
1444 ):
584af91e
PP
1445 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1446 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1447 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1448 info = ts_state.tracing_session_descriptor.info
1449
1450 if ts_state.is_attached:
1451 raise UnexpectedInput(
f5567ea8 1452 "Cannot attach to tracing session `{}`: viewer is already attached".format(
584af91e
PP
1453 info.name
1454 )
1455 )
1456
1457 ts_state.is_attached = True
1458 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1459 return _LttngLiveViewerAttachToTracingSessionReply(
1460 status, ts_state.stream_infos
1461 )
1462
aca7de76
SM
1463 def _handle_detach_from_tracing_session_command(
1464 self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
1465 ):
584af91e
PP
1466 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1467 logging.info(fmt.format(cmd.tracing_session_id))
1468 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1469 info = ts_state.tracing_session_descriptor.info
1470
1471 if not ts_state.is_attached:
1472 raise UnexpectedInput(
f5567ea8 1473 "Cannot detach to tracing session `{}`: viewer is not attached".format(
584af91e
PP
1474 info.name
1475 )
1476 )
1477
1478 ts_state.is_attached = False
1479 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1480 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1481
aca7de76
SM
1482 def _handle_get_next_data_stream_index_entry_command(
1483 self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
1484 ):
584af91e
PP
1485 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1486 logging.info(fmt.format(cmd.stream_id))
aca7de76
SM
1487 stream_state = self._get_data_stream_state(cmd.stream_id)
1488 metadata_stream_state = self._get_metadata_stream_state(
1489 stream_state.metadata_stream_id
1490 )
584af91e
PP
1491
1492 if stream_state.cur_index_entry is None:
1493 # The viewer is done reading this stream
1494 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1495
1496 # Dummy data stream index entry to use with the `HUP` status
1497 # (the reply needs one, but the viewer ignores it)
1498 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1499
1500 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1501 status, index_entry, False, False
1502 )
1503
78169723
FD
1504 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1505
1506 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
1507 metadata_stream_state.is_sent = False
1508 metadata_stream_state.goto_next_section()
1509
584af91e
PP
1510 # The viewer only checks the `has_new_metadata` flag if the
1511 # reply's status is `OK`, so we need to provide an index here
1512 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
aca7de76 1513 if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
71f56e5f
JG
1514 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1515 else:
71f56e5f
JG
1516 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1517
584af91e
PP
1518 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1519 status, stream_state.cur_index_entry, has_new_metadata, False
1520 )
1521 stream_state.goto_next_index_entry()
1522 return reply
1523
aca7de76
SM
1524 def _handle_get_data_stream_packet_data_command(
1525 self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
1526 ):
584af91e
PP
1527 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1528 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
aca7de76 1529 stream_state = self._get_data_stream_state(cmd.stream_id)
584af91e
PP
1530 data_response_length = cmd.req_length
1531
584af91e
PP
1532 if stream_state.tracing_session_state.has_new_metadata:
1533 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1534 return _LttngLiveViewerGetDataStreamPacketDataReply(
1535 status, bytes(), True, False
1536 )
1537
1538 if self._max_query_data_response_size:
1539 # Enforce a server side limit on the query requested length.
1540 # To ensure that the transaction terminate take the minimum of both
1541 # value.
1542 data_response_length = min(
1543 cmd.req_length, self._max_query_data_response_size
1544 )
1545 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1546 logging.info(fmt.format(cmd.req_length, data_response_length))
1547
1548 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1549 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1550 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1551
aca7de76
SM
1552 def _handle_get_metadata_stream_data_command(
1553 self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
1554 ):
584af91e
PP
1555 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1556 logging.info(fmt.format(cmd.stream_id))
aca7de76 1557 metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
584af91e 1558
78169723 1559 if metadata_stream_state.is_sent:
584af91e
PP
1560 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1561 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1562
78169723 1563 metadata_stream_state.is_sent = True
584af91e 1564 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
78169723 1565 metadata_section = metadata_stream_state.cur_section
b8b97725 1566 assert metadata_section is not None
78169723
FD
1567
1568 # If we are sending an empty section, ready the next one right away.
1569 if len(metadata_section.data) == 0:
1570 metadata_stream_state.is_sent = False
1571 metadata_stream_state.goto_next_section()
1572
1573 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1574 logging.info(fmt.format(len(metadata_section.data)))
584af91e 1575 return _LttngLiveViewerGetMetadataStreamDataContentReply(
78169723 1576 status, metadata_section.data
584af91e
PP
1577 )
1578
aca7de76
SM
1579 def _handle_get_new_stream_infos_command(
1580 self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
1581 ):
584af91e
PP
1582 fmt = 'Handling "get new stream infos" command: ts-id={}'
1583 logging.info(fmt.format(cmd.tracing_session_id))
1584
1585 # As of this version, all the tracing session's stream infos are
1586 # always given to the viewer when sending the "attach to tracing
1587 # session" reply, so there's nothing new here. Return the `HUP`
1588 # status as, if we're handling this command, the viewer consumed
1589 # all the existing data streams.
1590 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1591 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1592
aca7de76
SM
1593 def _handle_get_tracing_session_infos_command(
1594 self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
1595 ):
584af91e
PP
1596 logging.info('Handling "get tracing session infos" command.')
1597 infos = [
1598 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1599 ]
1600 infos.sort(key=lambda info: info.name)
1601 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1602
aca7de76
SM
1603 def _handle_create_viewer_session_command(
1604 self, cmd: _LttngLiveViewerCreateViewerSessionCommand
1605 ):
584af91e
PP
1606 logging.info('Handling "create viewer session" command.')
1607 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1608
1609 # This does nothing here. In the LTTng relay daemon, it
1610 # allocates the viewer session's state.
1611 return _LttngLiveViewerCreateViewerSessionReply(status)
1612
1613
1614# An LTTng live TCP server.
1615#
e51141d3
SM
1616# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
1617# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
1618# to a temporary port file. It renames the temporary port file to
1619# `port_filename`.
584af91e
PP
1620#
1621# `tracing_session_descriptors` is a list of tracing session descriptors
1622# (`LttngTracingSessionDescriptor`) to serve.
1623#
1624# This server accepts a single viewer (client).
1625#
1626# When the viewer closes the connection, the server's constructor
1627# returns.
1628class LttngLiveServer:
1629 def __init__(
e51141d3 1630 self,
aca7de76
SM
1631 port: Optional[int],
1632 port_filename: str,
1633 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1634 max_query_data_response_size: Optional[int],
584af91e 1635 ):
f5567ea8 1636 logging.info("Server configuration:")
584af91e 1637
f5567ea8 1638 logging.info(" Port file name: `{}`".format(port_filename))
584af91e
PP
1639
1640 if max_query_data_response_size is not None:
1641 logging.info(
f5567ea8 1642 " Maximum response data query size: `{}`".format(
584af91e
PP
1643 max_query_data_response_size
1644 )
1645 )
1646
1647 for ts_descr in tracing_session_descriptors:
1648 info = ts_descr.info
1649 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1650 logging.info(
1651 fmt.format(
1652 info.name,
1653 info.tracing_session_id,
1654 info.hostname,
1655 info.live_timer_freq,
1656 info.client_count,
1657 info.stream_count,
1658 )
1659 )
1660
1661 for trace in ts_descr.traces:
1662 logging.info(' Trace: path="{}"'.format(trace.path))
1663
1664 self._ts_descriptors = tracing_session_descriptors
1665 self._max_query_data_response_size = max_query_data_response_size
1666 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1667 self._codec = _LttngLiveViewerProtocolCodec()
1668
1669 # Port 0: OS assigns an unused port
e51141d3 1670 serv_addr = ("localhost", port if port is not None else 0)
584af91e
PP
1671 self._sock.bind(serv_addr)
1672 self._write_port_to_file(port_filename)
1673
1674 try:
1675 self._listen()
1676 finally:
1677 self._sock.close()
f5567ea8 1678 logging.info("Closed connection and socket.")
584af91e
PP
1679
1680 @property
1681 def _server_port(self):
1682 return self._sock.getsockname()[1]
1683
1684 def _recv_command(self):
1685 data = bytes()
1686
1687 while True:
f5567ea8 1688 logging.info("Waiting for viewer command.")
584af91e
PP
1689 buf = self._conn.recv(128)
1690
1691 if not buf:
f5567ea8 1692 logging.info("Client closed connection.")
584af91e
PP
1693
1694 if data:
1695 raise UnexpectedInput(
f5567ea8 1696 "Client closed connection after having sent {} command bytes.".format(
584af91e
PP
1697 len(data)
1698 )
1699 )
1700
1701 return
1702
f5567ea8 1703 logging.info("Received data from viewer: length={}".format(len(buf)))
584af91e
PP
1704
1705 data += buf
1706
1707 try:
1708 cmd = self._codec.decode(data)
1709 except struct.error as exc:
f5567ea8 1710 raise UnexpectedInput("Malformed command: {}".format(exc)) from exc
584af91e
PP
1711
1712 if cmd is not None:
1713 logging.info(
f5567ea8 1714 "Received command from viewer: cmd-cls-name={}".format(
584af91e
PP
1715 cmd.__class__.__name__
1716 )
1717 )
1718 return cmd
1719
aca7de76 1720 def _send_reply(self, reply: _LttngLiveViewerReply):
584af91e
PP
1721 data = self._codec.encode(reply)
1722 logging.info(
f5567ea8 1723 "Sending reply to viewer: reply-cls-name={}, length={}".format(
584af91e
PP
1724 reply.__class__.__name__, len(data)
1725 )
1726 )
1727 self._conn.sendall(data)
1728
1729 def _handle_connection(self):
1730 # First command must be "connect"
1731 cmd = self._recv_command()
1732
1733 if type(cmd) is not _LttngLiveViewerConnectCommand:
1734 raise UnexpectedInput(
1735 'First command is not "connect": cmd-cls-name={}'.format(
1736 cmd.__class__.__name__
1737 )
1738 )
1739
1740 # Create viewer session (arbitrary ID 23)
1741 logging.info(
f5567ea8 1742 "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
584af91e
PP
1743 )
1744 viewer_session = _LttngLiveViewerSession(
1745 23, self._ts_descriptors, self._max_query_data_response_size
1746 )
1747
1748 # Send "connect" reply
1749 self._send_reply(
1750 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1751 )
1752
1753 # Make the viewer session handle the remaining commands
1754 while True:
1755 cmd = self._recv_command()
1756
1757 if cmd is None:
1758 # Connection closed (at an expected location within the
1759 # conversation)
1760 return
1761
1762 self._send_reply(viewer_session.handle_command(cmd))
1763
1764 def _listen(self):
f5567ea8 1765 logging.info("Listening: port={}".format(self._server_port))
1726ac08
JR
1766 # Backlog must be present for Python version < 3.5.
1767 # 128 is an arbitrary number since we expect only 1 connection anyway.
1768 self._sock.listen(128)
584af91e
PP
1769 self._conn, viewer_addr = self._sock.accept()
1770 logging.info(
f5567ea8 1771 "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
584af91e
PP
1772 )
1773
1774 try:
1775 self._handle_connection()
1776 finally:
1777 self._conn.close()
1778
aca7de76 1779 def _write_port_to_file(self, port_filename: str):
584af91e 1780 # Write the port number to a temporary file.
f5567ea8
FD
1781 with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
1782 print(self._server_port, end="", file=tmp_port_file)
584af91e
PP
1783
1784 # Rename temporary file to real file
9c878ece 1785 os.replace(tmp_port_file.name, port_filename)
584af91e
PP
1786 logging.info(
1787 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1788 tmp_port_file.name, port_filename
1789 )
1790 )
1791
1792
aca7de76
SM
1793def _session_descriptors_from_path(
1794 sessions_filename: str, trace_path_prefix: Optional[str]
1795):
2b763e29
JG
1796 # File format is:
1797 #
1798 # [
1799 # {
1800 # "name": "my-session",
1801 # "id": 17,
1802 # "hostname": "myhost",
1803 # "live-timer-freq": 1000000,
1804 # "client-count": 23,
1805 # "traces": [
1806 # {
1807 # "path": "lol"
1808 # },
1809 # {
71f56e5f
JG
1810 # "path": "meow/mix",
1811 # "beacons": {
1812 # "my_stream": [ 5235787, 728375283 ]
78169723
FD
1813 # },
1814 # "metadata-sections": [
1815 # {
1816 # "line": 1,
1817 # "timestamp": 0
1818 # }
1819 # ]
2b763e29
JG
1820 # }
1821 # ]
1822 # }
1823 # ]
f5567ea8 1824 with open(sessions_filename, "r") as sessions_file:
aca7de76
SM
1825 sessions_json = tjson.load(sessions_file, tjson.ArrayVal)
1826
1827 sessions = [] # type: list[LttngTracingSessionDescriptor]
2b763e29 1828
aca7de76
SM
1829 for session_json in sessions_json.iter(tjson.ObjVal):
1830 name = session_json.at("name", tjson.StrVal).val
1831 tracing_session_id = session_json.at("id", tjson.IntVal).val
1832 hostname = session_json.at("hostname", tjson.StrVal).val
1833 live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
1834 client_count = session_json.at("client-count", tjson.IntVal).val
1835 traces_json = session_json.at("traces", tjson.ArrayVal)
2b763e29 1836
aca7de76 1837 traces = [] # type: list[LttngTrace]
2b763e29 1838
aca7de76
SM
1839 for trace_json in traces_json.iter(tjson.ObjVal):
1840 metadata_sections = (
1841 trace_json.at("metadata-sections", tjson.ArrayVal)
1842 if "metadata-sections" in trace_json
1843 else None
1844 )
1845 beacons = (
1846 trace_json.at("beacons", tjson.ObjVal)
1847 if "beacons" in trace_json
1848 else None
1849 )
1850 path = trace_json.at("path", tjson.StrVal).val
2b763e29 1851
aca7de76 1852 if not os.path.isabs(path) and trace_path_prefix:
2b763e29
JG
1853 path = os.path.join(trace_path_prefix, path)
1854
aca7de76
SM
1855 traces.append(
1856 LttngTrace(
1857 path,
1858 metadata_sections,
1859 beacons,
1860 )
1861 )
2b763e29
JG
1862
1863 sessions.append(
1864 LttngTracingSessionDescriptor(
1865 name,
1866 tracing_session_id,
1867 hostname,
1868 live_timer_freq,
1869 client_count,
1870 traces,
1871 )
1872 )
1873
1874 return sessions
584af91e
PP
1875
1876
aca7de76 1877def _loglevel_parser(string: str):
f5567ea8 1878 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
584af91e
PP
1879 if string not in loglevels:
1880 msg = "{} is not a valid loglevel".format(string)
1881 raise argparse.ArgumentTypeError(msg)
1882 return loglevels[string]
1883
1884
f5567ea8
FD
1885if __name__ == "__main__":
1886 logging.basicConfig(format="# %(asctime)-25s%(message)s")
584af91e 1887 parser = argparse.ArgumentParser(
f5567ea8 1888 description="LTTng-live protocol mocker", add_help=False
584af91e
PP
1889 )
1890 parser.add_argument(
f5567ea8
FD
1891 "--log-level",
1892 default="warning",
1893 choices=["info", "warning"],
1894 help="The loglevel to be used.",
584af91e
PP
1895 )
1896
1897 loglevel_namespace, remaining_args = parser.parse_known_args()
1898 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1899
e51141d3
SM
1900 parser.add_argument(
1901 "--port",
1902 help="The port to bind to. If missing, use an OS-assigned port..",
1903 type=int,
1904 )
584af91e 1905 parser.add_argument(
f5567ea8
FD
1906 "--port-filename",
1907 help="The final port file. This file is present when the server is ready to receive connection.",
584af91e
PP
1908 required=True,
1909 )
1910 parser.add_argument(
f5567ea8 1911 "--max-query-data-response-size",
584af91e 1912 type=int,
f5567ea8 1913 help="The maximum size of control data response in bytes",
584af91e
PP
1914 )
1915 parser.add_argument(
f5567ea8 1916 "--trace-path-prefix",
2b763e29 1917 type=str,
f5567ea8 1918 help="Prefix to prepend to the trace paths of session configurations",
2b763e29
JG
1919 )
1920 parser.add_argument(
f5567ea8 1921 "--sessions-filename",
776a2a25 1922 type=str,
f5567ea8 1923 help="Path to a session configuration file",
584af91e
PP
1924 )
1925 parser.add_argument(
f5567ea8
FD
1926 "-h",
1927 "--help",
1928 action="help",
584af91e 1929 default=argparse.SUPPRESS,
f5567ea8 1930 help="Show this help message and exit.",
584af91e
PP
1931 )
1932
1933 args = parser.parse_args(args=remaining_args)
1934 try:
aca7de76
SM
1935 sessions_filename = args.sessions_filename # type: str
1936 trace_path_prefix = args.trace_path_prefix # type: str | None
2b763e29 1937 sessions = _session_descriptors_from_path(
aca7de76
SM
1938 sessions_filename,
1939 trace_path_prefix,
e51141d3 1940 )
aca7de76
SM
1941
1942 port = args.port # type: int | None
1943 port_filename = args.port_filename # type: str
1944 max_query_data_response_size = (
1945 args.max_query_data_response_size
1946 ) # type: int | None
1947 LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
584af91e
PP
1948 except UnexpectedInput as exc:
1949 logging.error(str(exc))
1950 print(exc, file=sys.stderr)
1951 sys.exit(1)
This page took 0.153749 seconds and 4 git commands to generate.