tests: retry os.rename on PermissionError failure in lttng_live_server.py
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
584af91e 2#
0235b0db 3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
584af91e 4#
584af91e 5
aca7de76
SM
6# pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
7
584af91e 8import os
584af91e 9import re
6362d281 10import time
584af91e
PP
11import socket
12import struct
5995b304
SM
13import logging
14import os.path
15import argparse
584af91e 16import tempfile
aca7de76
SM
17from typing import Dict, Union, Iterable, Optional, Sequence, overload
18
19import tjson
20
21# isort: off
22from typing import Any, Callable # noqa: F401
23
24# isort: on
584af91e
PP
25
26
ee1171e5
SM
27# An entry within the index of an LTTng data stream.
28class _LttngDataStreamIndexEntry:
29 def __init__(
30 self,
aca7de76
SM
31 offset_bytes: int,
32 total_size_bits: int,
33 content_size_bits: int,
34 timestamp_begin: int,
35 timestamp_end: int,
36 events_discarded: int,
37 stream_class_id: int,
ee1171e5
SM
38 ):
39 self._offset_bytes = offset_bytes
40 self._total_size_bits = total_size_bits
41 self._content_size_bits = content_size_bits
42 self._timestamp_begin = timestamp_begin
43 self._timestamp_end = timestamp_end
44 self._events_discarded = events_discarded
45 self._stream_class_id = stream_class_id
46
47 @property
48 def offset_bytes(self):
49 return self._offset_bytes
50
51 @property
52 def total_size_bits(self):
53 return self._total_size_bits
54
55 @property
56 def total_size_bytes(self):
57 return self._total_size_bits // 8
58
59 @property
60 def content_size_bits(self):
61 return self._content_size_bits
62
63 @property
64 def content_size_bytes(self):
65 return self._content_size_bits // 8
66
67 @property
68 def timestamp_begin(self):
69 return self._timestamp_begin
70
71 @property
72 def timestamp_end(self):
73 return self._timestamp_end
74
75 @property
76 def events_discarded(self):
77 return self._events_discarded
78
79 @property
80 def stream_class_id(self):
81 return self._stream_class_id
82
83
84# An entry within the index of an LTTng data stream. While a stream beacon entry
85# is conceptually unrelated to an index, it is sent as a reply to a
86# LttngLiveViewerGetNextDataStreamIndexEntryCommand
aca7de76
SM
87class _LttngDataStreamBeaconIndexEntry:
88 def __init__(self, stream_class_id: int, timestamp: int):
ee1171e5
SM
89 self._stream_class_id = stream_class_id
90 self._timestamp = timestamp
91
92 @property
93 def timestamp(self):
94 return self._timestamp
95
96 @property
97 def stream_class_id(self):
98 return self._stream_class_id
99
100
aca7de76
SM
101_LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
102
103
584af91e 104class _LttngLiveViewerCommand:
aca7de76 105 def __init__(self, version: int):
584af91e
PP
106 self._version = version
107
108 @property
109 def version(self):
110 return self._version
111
112
113class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
aca7de76 114 def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
584af91e
PP
115 super().__init__(version)
116 self._viewer_session_id = viewer_session_id
117 self._major = major
118 self._minor = minor
119
120 @property
121 def viewer_session_id(self):
122 return self._viewer_session_id
123
124 @property
125 def major(self):
126 return self._major
127
128 @property
129 def minor(self):
130 return self._minor
131
132
aca7de76
SM
133class _LttngLiveViewerReply:
134 pass
135
136
137class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
138 def __init__(self, viewer_session_id: int, major: int, minor: int):
584af91e
PP
139 self._viewer_session_id = viewer_session_id
140 self._major = major
141 self._minor = minor
142
143 @property
144 def viewer_session_id(self):
145 return self._viewer_session_id
146
147 @property
148 def major(self):
149 return self._major
150
151 @property
152 def minor(self):
153 return self._minor
154
155
156class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
157 pass
158
159
160class _LttngLiveViewerTracingSessionInfo:
161 def __init__(
162 self,
aca7de76
SM
163 tracing_session_id: int,
164 live_timer_freq: int,
165 client_count: int,
166 stream_count: int,
167 hostname: str,
168 name: str,
584af91e
PP
169 ):
170 self._tracing_session_id = tracing_session_id
171 self._live_timer_freq = live_timer_freq
172 self._client_count = client_count
173 self._stream_count = stream_count
174 self._hostname = hostname
175 self._name = name
176
177 @property
178 def tracing_session_id(self):
179 return self._tracing_session_id
180
181 @property
182 def live_timer_freq(self):
183 return self._live_timer_freq
184
185 @property
186 def client_count(self):
187 return self._client_count
188
189 @property
190 def stream_count(self):
191 return self._stream_count
192
193 @property
194 def hostname(self):
195 return self._hostname
196
197 @property
198 def name(self):
199 return self._name
200
201
aca7de76
SM
202class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
203 def __init__(
204 self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
205 ):
584af91e
PP
206 self._tracing_session_infos = tracing_session_infos
207
208 @property
209 def tracing_session_infos(self):
210 return self._tracing_session_infos
211
212
213class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
214 class SeekType:
215 BEGINNING = 1
216 LAST = 2
217
aca7de76
SM
218 def __init__(
219 self, version: int, tracing_session_id: int, offset: int, seek_type: int
220 ):
584af91e
PP
221 super().__init__(version)
222 self._tracing_session_id = tracing_session_id
223 self._offset = offset
224 self._seek_type = seek_type
225
226 @property
227 def tracing_session_id(self):
228 return self._tracing_session_id
229
230 @property
231 def offset(self):
232 return self._offset
233
234 @property
235 def seek_type(self):
236 return self._seek_type
237
238
239class _LttngLiveViewerStreamInfo:
aca7de76
SM
240 def __init__(
241 self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
242 ):
584af91e
PP
243 self._id = id
244 self._trace_id = trace_id
245 self._is_metadata = is_metadata
246 self._path = path
247 self._channel_name = channel_name
248
249 @property
250 def id(self):
251 return self._id
252
253 @property
254 def trace_id(self):
255 return self._trace_id
256
257 @property
258 def is_metadata(self):
259 return self._is_metadata
260
261 @property
262 def path(self):
263 return self._path
264
265 @property
266 def channel_name(self):
267 return self._channel_name
268
269
aca7de76 270class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
584af91e
PP
271 class Status:
272 OK = 1
273 ALREADY = 2
274 UNKNOWN = 3
275 NOT_LIVE = 4
276 SEEK_ERROR = 5
277 NO_SESSION = 6
278
aca7de76 279 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
584af91e
PP
280 self._status = status
281 self._stream_infos = stream_infos
282
283 @property
284 def status(self):
285 return self._status
286
287 @property
288 def stream_infos(self):
289 return self._stream_infos
290
291
292class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
aca7de76 293 def __init__(self, version: int, stream_id: int):
584af91e
PP
294 super().__init__(version)
295 self._stream_id = stream_id
296
297 @property
298 def stream_id(self):
299 return self._stream_id
300
301
aca7de76 302class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
584af91e
PP
303 class Status:
304 OK = 1
305 RETRY = 2
306 HUP = 3
307 ERROR = 4
308 INACTIVE = 5
309 EOF = 6
310
aca7de76
SM
311 def __init__(
312 self,
313 status: int,
314 index_entry: _LttngIndexEntryT,
315 has_new_metadata: bool,
316 has_new_data_stream: bool,
317 ):
584af91e
PP
318 self._status = status
319 self._index_entry = index_entry
320 self._has_new_metadata = has_new_metadata
321 self._has_new_data_stream = has_new_data_stream
322
323 @property
324 def status(self):
325 return self._status
326
327 @property
328 def index_entry(self):
329 return self._index_entry
330
331 @property
332 def has_new_metadata(self):
333 return self._has_new_metadata
334
335 @property
336 def has_new_data_stream(self):
337 return self._has_new_data_stream
338
339
340class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
aca7de76 341 def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
584af91e
PP
342 super().__init__(version)
343 self._stream_id = stream_id
344 self._offset = offset
345 self._req_length = req_length
346
347 @property
348 def stream_id(self):
349 return self._stream_id
350
351 @property
352 def offset(self):
353 return self._offset
354
355 @property
356 def req_length(self):
357 return self._req_length
358
359
aca7de76 360class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
584af91e
PP
361 class Status:
362 OK = 1
363 RETRY = 2
364 ERROR = 3
365 EOF = 4
366
aca7de76
SM
367 def __init__(
368 self,
369 status: int,
370 data: bytes,
371 has_new_metadata: bool,
372 has_new_data_stream: bool,
373 ):
584af91e
PP
374 self._status = status
375 self._data = data
376 self._has_new_metadata = has_new_metadata
377 self._has_new_data_stream = has_new_data_stream
378
379 @property
380 def status(self):
381 return self._status
382
383 @property
384 def data(self):
385 return self._data
386
387 @property
388 def has_new_metadata(self):
389 return self._has_new_metadata
390
391 @property
392 def has_new_data_stream(self):
393 return self._has_new_data_stream
394
395
396class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
aca7de76 397 def __init__(self, version: int, stream_id: int):
584af91e
PP
398 super().__init__(version)
399 self._stream_id = stream_id
400
401 @property
402 def stream_id(self):
403 return self._stream_id
404
405
aca7de76 406class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
584af91e
PP
407 class Status:
408 OK = 1
409 NO_NEW = 2
410 ERROR = 3
411
aca7de76 412 def __init__(self, status: int, data: bytes):
584af91e
PP
413 self._status = status
414 self._data = data
415
416 @property
417 def status(self):
418 return self._status
419
420 @property
421 def data(self):
422 return self._data
423
424
425class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
aca7de76 426 def __init__(self, version: int, tracing_session_id: int):
584af91e
PP
427 super().__init__(version)
428 self._tracing_session_id = tracing_session_id
429
430 @property
431 def tracing_session_id(self):
432 return self._tracing_session_id
433
434
aca7de76 435class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
584af91e
PP
436 class Status:
437 OK = 1
438 NO_NEW = 2
439 ERROR = 3
440 HUP = 4
441
aca7de76 442 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
584af91e
PP
443 self._status = status
444 self._stream_infos = stream_infos
445
446 @property
447 def status(self):
448 return self._status
449
450 @property
451 def stream_infos(self):
452 return self._stream_infos
453
454
455class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
456 pass
457
458
aca7de76 459class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
584af91e
PP
460 class Status:
461 OK = 1
462 ERROR = 2
463
aca7de76 464 def __init__(self, status: int):
584af91e
PP
465 self._status = status
466
467 @property
468 def status(self):
469 return self._status
470
471
472class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
aca7de76 473 def __init__(self, version: int, tracing_session_id: int):
584af91e
PP
474 super().__init__(version)
475 self._tracing_session_id = tracing_session_id
476
477 @property
478 def tracing_session_id(self):
479 return self._tracing_session_id
480
481
aca7de76 482class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
584af91e
PP
483 class Status:
484 OK = 1
485 UNKNOWN = 2
486 ERROR = 3
487
aca7de76 488 def __init__(self, status: int):
584af91e
PP
489 self._status = status
490
491 @property
492 def status(self):
493 return self._status
494
495
496# An LTTng live protocol codec can convert bytes to command objects and
497# reply objects to bytes.
498class _LttngLiveViewerProtocolCodec:
f5567ea8 499 _COMMAND_HEADER_STRUCT_FMT = "QII"
584af91e
PP
500 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
501
502 def __init__(self):
503 pass
504
aca7de76 505 def _unpack(self, fmt: str, data: bytes, offset: int = 0):
f5567ea8 506 fmt = "!" + fmt
584af91e
PP
507 return struct.unpack_from(fmt, data, offset)
508
aca7de76 509 def _unpack_payload(self, fmt: str, data: bytes):
584af91e
PP
510 return self._unpack(
511 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
512 )
513
aca7de76 514 def decode(self, data: bytes):
584af91e
PP
515 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
516 # Not enough data to read the command header
517 return
518
519 payload_size, cmd_type, version = self._unpack(
520 self._COMMAND_HEADER_STRUCT_FMT, data
521 )
522 logging.info(
f5567ea8 523 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
584af91e
PP
524 payload_size, cmd_type, version
525 )
526 )
527
528 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
529 # Not enough data to read the whole command
530 return
531
532 if cmd_type == 1:
aca7de76 533 viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
584af91e
PP
534 return _LttngLiveViewerConnectCommand(
535 version, viewer_session_id, major, minor
536 )
537 elif cmd_type == 2:
538 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
539 elif cmd_type == 3:
f5567ea8 540 tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
584af91e
PP
541 return _LttngLiveViewerAttachToTracingSessionCommand(
542 version, tracing_session_id, offset, seek_type
543 )
544 elif cmd_type == 4:
f5567ea8 545 (stream_id,) = self._unpack_payload("Q", data)
584af91e
PP
546 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
547 version, stream_id
548 )
549 elif cmd_type == 5:
f5567ea8 550 stream_id, offset, req_length = self._unpack_payload("QQI", data)
584af91e
PP
551 return _LttngLiveViewerGetDataStreamPacketDataCommand(
552 version, stream_id, offset, req_length
553 )
554 elif cmd_type == 6:
f5567ea8 555 (stream_id,) = self._unpack_payload("Q", data)
584af91e
PP
556 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
557 elif cmd_type == 7:
f5567ea8 558 (tracing_session_id,) = self._unpack_payload("Q", data)
584af91e
PP
559 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
560 elif cmd_type == 8:
561 return _LttngLiveViewerCreateViewerSessionCommand(version)
562 elif cmd_type == 9:
f5567ea8 563 (tracing_session_id,) = self._unpack_payload("Q", data)
584af91e
PP
564 return _LttngLiveViewerDetachFromTracingSessionCommand(
565 version, tracing_session_id
566 )
567 else:
f2958352 568 raise RuntimeError("Unknown command type {}".format(cmd_type))
584af91e 569
aca7de76 570 def _pack(self, fmt: str, *args: Any):
584af91e 571 # Force network byte order
f5567ea8 572 return struct.pack("!" + fmt, *args)
584af91e 573
aca7de76 574 def _encode_zero_padded_str(self, string: str, length: int):
584af91e 575 data = string.encode()
f5567ea8 576 return data.ljust(length, b"\x00")
584af91e 577
aca7de76 578 def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
f5567ea8 579 data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
584af91e
PP
580 data += self._encode_zero_padded_str(info.path, 4096)
581 data += self._encode_zero_padded_str(info.channel_name, 255)
582 return data
583
aca7de76
SM
584 def _get_has_new_stuff_flags(
585 self, has_new_metadata: bool, has_new_data_streams: bool
586 ):
584af91e
PP
587 flags = 0
588
589 if has_new_metadata:
590 flags |= 1
591
592 if has_new_data_streams:
593 flags |= 2
594
595 return flags
596
aca7de76
SM
597 def encode(
598 self,
599 reply: _LttngLiveViewerReply,
600 ) -> bytes:
584af91e
PP
601 if type(reply) is _LttngLiveViewerConnectReply:
602 data = self._pack(
f5567ea8 603 "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
584af91e
PP
604 )
605 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
f5567ea8 606 data = self._pack("I", len(reply.tracing_session_infos))
584af91e
PP
607
608 for info in reply.tracing_session_infos:
609 data += self._pack(
f5567ea8 610 "QIII",
584af91e
PP
611 info.tracing_session_id,
612 info.live_timer_freq,
613 info.client_count,
614 info.stream_count,
615 )
616 data += self._encode_zero_padded_str(info.hostname, 64)
617 data += self._encode_zero_padded_str(info.name, 255)
618 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
f5567ea8 619 data = self._pack("II", reply.status, len(reply.stream_infos))
584af91e
PP
620
621 for info in reply.stream_infos:
622 data += self._encode_stream_info(info)
623 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
f5567ea8 624 index_format = "QQQQQQQII"
584af91e
PP
625 entry = reply.index_entry
626 flags = self._get_has_new_stuff_flags(
627 reply.has_new_metadata, reply.has_new_data_stream
628 )
629
aca7de76 630 if isinstance(entry, _LttngDataStreamIndexEntry):
71f56e5f
JG
631 data = self._pack(
632 index_format,
633 entry.offset_bytes,
634 entry.total_size_bits,
635 entry.content_size_bits,
636 entry.timestamp_begin,
637 entry.timestamp_end,
638 entry.events_discarded,
639 entry.stream_class_id,
640 reply.status,
641 flags,
642 )
643 else:
71f56e5f
JG
644 data = self._pack(
645 index_format,
646 0,
647 0,
648 0,
649 0,
650 entry.timestamp,
651 0,
652 entry.stream_class_id,
653 reply.status,
654 flags,
655 )
584af91e
PP
656 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
657 flags = self._get_has_new_stuff_flags(
658 reply.has_new_metadata, reply.has_new_data_stream
659 )
f5567ea8 660 data = self._pack("III", reply.status, len(reply.data), flags)
584af91e
PP
661 data += reply.data
662 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
f5567ea8 663 data = self._pack("QI", len(reply.data), reply.status)
584af91e
PP
664 data += reply.data
665 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
f5567ea8 666 data = self._pack("II", reply.status, len(reply.stream_infos))
584af91e
PP
667
668 for info in reply.stream_infos:
669 data += self._encode_stream_info(info)
670 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
f5567ea8 671 data = self._pack("I", reply.status)
584af91e 672 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
f5567ea8 673 data = self._pack("I", reply.status)
584af91e
PP
674 else:
675 raise ValueError(
f5567ea8 676 "Unknown reply object with class `{}`".format(reply.__class__.__name__)
584af91e
PP
677 )
678
679 return data
680
681
aca7de76
SM
682def _get_entry_timestamp_begin(
683 entry: _LttngIndexEntryT,
684):
685 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
78169723
FD
686 return entry.timestamp
687 else:
78169723
FD
688 return entry.timestamp_begin
689
690
584af91e 691# The index of an LTTng data stream, a sequence of index entries.
aca7de76
SM
692class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
693 def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
584af91e
PP
694 self._path = path
695 self._build()
71f56e5f
JG
696
697 if beacons:
698 stream_class_id = self._entries[0].stream_class_id
aca7de76
SM
699
700 beacons_list = [] # type: list[_LttngDataStreamBeaconIndexEntry]
701 for ts in beacons.iter(tjson.IntVal):
702 beacons_list.append(
703 _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
704 )
705
706 self._add_beacons(beacons_list)
71f56e5f 707
584af91e
PP
708 logging.info(
709 'Built data stream index entries: path="{}", count={}'.format(
710 path, len(self._entries)
711 )
712 )
713
714 def _build(self):
aca7de76 715 self._entries = [] # type: list[_LttngIndexEntryT]
584af91e 716
f5567ea8 717 with open(self._path, "rb") as f:
584af91e 718 # Read header first
f5567ea8 719 fmt = ">IIII"
584af91e
PP
720 size = struct.calcsize(fmt)
721 data = f.read(size)
722 assert len(data) == size
aca7de76 723 magic, _, _, index_entry_length = struct.unpack(fmt, data)
584af91e
PP
724 assert magic == 0xC1F1DCC1
725
726 # Read index entries
f5567ea8 727 fmt = ">QQQQQQQ"
584af91e
PP
728 size = struct.calcsize(fmt)
729
730 while True:
731 logging.debug(
732 'Decoding data stream index entry: path="{}", offset={}'.format(
733 self._path, f.tell()
734 )
735 )
736 data = f.read(size)
737
738 if not data:
739 # Done
740 break
741
742 assert len(data) == size
75882e97
FD
743 (
744 offset_bytes,
745 total_size_bits,
746 content_size_bits,
747 timestamp_begin,
748 timestamp_end,
749 events_discarded,
750 stream_class_id,
751 ) = struct.unpack(fmt, data)
584af91e
PP
752
753 self._entries.append(
754 _LttngDataStreamIndexEntry(
755 offset_bytes,
756 total_size_bits,
757 content_size_bits,
758 timestamp_begin,
759 timestamp_end,
760 events_discarded,
761 stream_class_id,
762 )
763 )
764
765 # Skip anything else before the next entry
766 f.seek(index_entry_length - size, os.SEEK_CUR)
767
aca7de76 768 def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
71f56e5f 769 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
aca7de76
SM
770 def sort_key(
771 entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
772 ) -> int:
773 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
71f56e5f
JG
774 return entry.timestamp
775 else:
776 return entry.timestamp_end
777
778 self._entries += beacons
779 self._entries.sort(key=sort_key)
780
aca7de76
SM
781 @overload
782 def __getitem__(self, index: int) -> _LttngIndexEntryT:
783 ...
784
785 @overload
786 def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]: # noqa: F811
787 ...
788
789 def __getitem__( # noqa: F811
790 self, index: Union[int, slice]
791 ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT],]:
584af91e
PP
792 return self._entries[index]
793
794 def __len__(self):
795 return len(self._entries)
796
797 @property
798 def path(self):
799 return self._path
800
801
802# An LTTng data stream.
803class _LttngDataStream:
aca7de76 804 def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
584af91e
PP
805 self._path = path
806 filename = os.path.basename(path)
f5567ea8 807 match = re.match(r"(.*)_\d+", filename)
b8b97725
SM
808 if not match:
809 raise RuntimeError(
810 "Unexpected data stream file name pattern: {}".format(filename)
811 )
812
584af91e
PP
813 self._channel_name = match.group(1)
814 trace_dir = os.path.dirname(path)
f5567ea8 815 index_path = os.path.join(trace_dir, "index", filename + ".idx")
aca7de76 816 self._index = _LttngDataStreamIndex(index_path, beacons_json)
584af91e 817 assert os.path.isfile(path)
f5567ea8 818 self._file = open(path, "rb")
584af91e
PP
819 logging.info(
820 'Built data stream: path="{}", channel-name="{}"'.format(
821 path, self._channel_name
822 )
823 )
824
825 @property
826 def path(self):
827 return self._path
828
829 @property
830 def channel_name(self):
831 return self._channel_name
832
833 @property
834 def index(self):
835 return self._index
836
aca7de76 837 def get_data(self, offset_bytes: int, len_bytes: int):
584af91e
PP
838 self._file.seek(offset_bytes)
839 return self._file.read(len_bytes)
840
841
78169723 842class _LttngMetadataStreamSection:
aca7de76 843 def __init__(self, timestamp: int, data: Optional[bytes]):
78169723
FD
844 self._timestamp = timestamp
845 if data is None:
846 self._data = bytes()
847 else:
848 self._data = data
849 logging.info(
f5567ea8 850 "Built metadata stream section: ts={}, data-len={}".format(
78169723
FD
851 self._timestamp, len(self._data)
852 )
853 )
854
855 @property
856 def timestamp(self):
857 return self._timestamp
858
859 @property
860 def data(self):
861 return self._data
862
863
584af91e
PP
864# An LTTng metadata stream.
865class _LttngMetadataStream:
aca7de76
SM
866 def __init__(
867 self,
868 metadata_file_path: str,
869 config_sections: Sequence[_LttngMetadataStreamSection],
870 ):
78169723
FD
871 self._path = metadata_file_path
872 self._sections = config_sections
873 logging.info(
f5567ea8 874 "Built metadata stream: path={}, section-len={}".format(
78169723
FD
875 self._path, len(self._sections)
876 )
877 )
584af91e
PP
878
879 @property
880 def path(self):
881 return self._path
882
883 @property
78169723
FD
884 def sections(self):
885 return self._sections
584af91e 886
78169723 887
aca7de76
SM
888class LttngMetadataConfigSection:
889 def __init__(self, line: int, timestamp: int, is_empty: bool):
890 self._line = line
891 self._timestamp = timestamp
892 self._is_empty = is_empty
893
894 @property
895 def line(self):
896 return self._line
78169723 897
aca7de76
SM
898 @property
899 def timestamp(self):
900 return self._timestamp
78169723 901
aca7de76
SM
902 @property
903 def is_empty(self):
904 return self._is_empty
905
906
907def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
908 metadata_sections = [] # type: list[LttngMetadataConfigSection]
78169723
FD
909 append_empty_section = False
910 last_timestamp = 0
911 last_line = 0
912
aca7de76
SM
913 for section in metadata_sections_json:
914 if isinstance(section, tjson.StrVal):
915 if section.val == "empty":
916 # Found a empty section marker. Actually append the section at the
917 # timestamp of the next concrete section.
918 append_empty_section = True
919 else:
920 raise ValueError("Invalid string value at {}.".format(section.path))
921 elif isinstance(section, tjson.ObjVal):
922 line = section.at("line", tjson.IntVal).val
923 ts = section.at("timestamp", tjson.IntVal).val
b8b97725 924
78169723
FD
925 # Sections' timestamps and lines must both be increasing.
926 assert ts > last_timestamp
927 last_timestamp = ts
aca7de76 928
78169723
FD
929 assert line > last_line
930 last_line = line
931
932 if append_empty_section:
aca7de76 933 metadata_sections.append(LttngMetadataConfigSection(line, ts, True))
78169723
FD
934 append_empty_section = False
935
aca7de76
SM
936 metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
937 else:
938 raise TypeError(
939 "`{}`: expecting a string or object value".format(section.path)
940 )
78169723 941
aca7de76 942 return metadata_sections
78169723 943
78169723 944
aca7de76
SM
945def _split_metadata_sections(
946 metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
947):
948 metadata_sections = _parse_metadata_sections_config(metadata_sections_json)
78169723 949
aca7de76 950 sections = [] # type: list[_LttngMetadataStreamSection]
f5567ea8 951 with open(metadata_file_path, "r") as metadata_file:
78169723
FD
952 metadata_lines = [line for line in metadata_file]
953
aca7de76 954 metadata_section_idx = 0
78169723
FD
955 curr_metadata_section = bytearray()
956
957 for idx, line_content in enumerate(metadata_lines):
958 # Add one to the index to convert from the zero-indexing of the
959 # enumerate() function to the one-indexing used by humans when
960 # viewing a text file.
961 curr_line_number = idx + 1
962
963 # If there are no more sections, simply append the line.
aca7de76 964 if metadata_section_idx + 1 >= len(metadata_sections):
f5567ea8 965 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
966 continue
967
aca7de76 968 next_section_line_number = metadata_sections[metadata_section_idx + 1].line
78169723
FD
969
970 # If the next section begins at the current line, create a
971 # section with the metadata we gathered so far.
972 if curr_line_number >= next_section_line_number:
78169723
FD
973 # Flushing the metadata of the current section.
974 sections.append(
975 _LttngMetadataStreamSection(
aca7de76 976 metadata_sections[metadata_section_idx].timestamp,
78169723
FD
977 bytes(curr_metadata_section),
978 )
979 )
980
981 # Move to the next section.
aca7de76 982 metadata_section_idx += 1
78169723
FD
983
984 # Clear old content and append current line for the next section.
985 curr_metadata_section.clear()
f5567ea8 986 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
987
988 # Append any empty sections.
aca7de76 989 while metadata_sections[metadata_section_idx].is_empty:
78169723
FD
990 sections.append(
991 _LttngMetadataStreamSection(
aca7de76 992 metadata_sections[metadata_section_idx].timestamp, None
78169723
FD
993 )
994 )
aca7de76 995 metadata_section_idx += 1
78169723
FD
996 else:
997 # Append line_content to the current metadata section.
f5567ea8 998 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
999
1000 # We iterated over all the lines of the metadata file. Close the current section.
1001 sections.append(
1002 _LttngMetadataStreamSection(
aca7de76 1003 metadata_sections[metadata_section_idx].timestamp,
78169723
FD
1004 bytes(curr_metadata_section),
1005 )
1006 )
1007
1008 return sections
584af91e
PP
1009
1010
aca7de76
SM
1011_StreamBeaconsT = Dict[str, Iterable[int]]
1012
1013
584af91e 1014# An LTTng trace, a sequence of LTTng data streams.
aca7de76
SM
1015class LttngTrace(Sequence[_LttngDataStream]):
1016 def __init__(
1017 self,
1018 trace_dir: str,
1019 metadata_sections_json: Optional[tjson.ArrayVal],
1020 beacons_json: Optional[tjson.ObjVal],
1021 ):
584af91e 1022 self._path = trace_dir
aca7de76
SM
1023 self._create_metadata_stream(trace_dir, metadata_sections_json)
1024 self._create_data_streams(trace_dir, beacons_json)
584af91e
PP
1025 logging.info('Built trace: path="{}"'.format(trace_dir))
1026
aca7de76
SM
1027 def _create_data_streams(
1028 self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
1029 ):
1030 data_stream_paths = [] # type: list[str]
584af91e
PP
1031
1032 for filename in os.listdir(trace_dir):
1033 path = os.path.join(trace_dir, filename)
1034
1035 if not os.path.isfile(path):
1036 continue
1037
f5567ea8 1038 if filename.startswith("."):
584af91e
PP
1039 continue
1040
f5567ea8 1041 if filename == "metadata":
584af91e
PP
1042 continue
1043
1044 data_stream_paths.append(path)
1045
1046 data_stream_paths.sort()
aca7de76 1047 self._data_streams = [] # type: list[_LttngDataStream]
584af91e
PP
1048
1049 for data_stream_path in data_stream_paths:
71f56e5f 1050 stream_name = os.path.basename(data_stream_path)
aca7de76
SM
1051 this_beacons_json = None
1052 if beacons_json is not None and stream_name in beacons_json:
1053 this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
71f56e5f
JG
1054
1055 self._data_streams.append(
aca7de76 1056 _LttngDataStream(data_stream_path, this_beacons_json)
71f56e5f 1057 )
584af91e 1058
aca7de76
SM
1059 def _create_metadata_stream(
1060 self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
1061 ):
f5567ea8 1062 metadata_path = os.path.join(trace_dir, "metadata")
aca7de76 1063 metadata_sections = [] # type: list[_LttngMetadataStreamSection]
78169723 1064
aca7de76 1065 if metadata_sections_json is None:
f5567ea8 1066 with open(metadata_path, "rb") as metadata_file:
78169723
FD
1067 metadata_sections.append(
1068 _LttngMetadataStreamSection(0, metadata_file.read())
1069 )
1070 else:
1071 metadata_sections = _split_metadata_sections(
aca7de76 1072 metadata_path, metadata_sections_json
78169723
FD
1073 )
1074
1075 self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
1076
584af91e
PP
1077 @property
1078 def path(self):
1079 return self._path
1080
1081 @property
1082 def metadata_stream(self):
1083 return self._metadata_stream
1084
aca7de76
SM
1085 @overload
1086 def __getitem__(self, index: int) -> _LttngDataStream:
1087 ...
1088
1089 @overload
1090 def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]: # noqa: F811
1091 ...
1092
1093 def __getitem__( # noqa: F811
1094 self, index: Union[int, slice]
1095 ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
584af91e
PP
1096 return self._data_streams[index]
1097
1098 def __len__(self):
1099 return len(self._data_streams)
1100
1101
1102# The state of a single data stream.
1103class _LttngLiveViewerSessionDataStreamState:
aca7de76
SM
1104 def __init__(
1105 self,
1106 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1107 info: _LttngLiveViewerStreamInfo,
1108 data_stream: _LttngDataStream,
1109 metadata_stream_id: int,
1110 ):
584af91e
PP
1111 self._ts_state = ts_state
1112 self._info = info
1113 self._data_stream = data_stream
78169723 1114 self._metadata_stream_id = metadata_stream_id
584af91e
PP
1115 self._cur_index_entry_index = 0
1116 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1117 logging.info(
1118 fmt.format(
1119 info.id,
1120 ts_state.tracing_session_descriptor.info.tracing_session_id,
1121 ts_state.tracing_session_descriptor.info.name,
1122 data_stream.path,
1123 )
1124 )
1125
1126 @property
1127 def tracing_session_state(self):
1128 return self._ts_state
1129
1130 @property
1131 def info(self):
1132 return self._info
1133
1134 @property
1135 def data_stream(self):
1136 return self._data_stream
1137
1138 @property
1139 def cur_index_entry(self):
1140 if self._cur_index_entry_index == len(self._data_stream.index):
1141 return
1142
1143 return self._data_stream.index[self._cur_index_entry_index]
1144
aca7de76
SM
1145 @property
1146 def metadata_stream_id(self):
1147 return self._metadata_stream_id
1148
584af91e
PP
1149 def goto_next_index_entry(self):
1150 self._cur_index_entry_index += 1
1151
1152
1153# The state of a single metadata stream.
1154class _LttngLiveViewerSessionMetadataStreamState:
aca7de76
SM
1155 def __init__(
1156 self,
1157 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1158 info: _LttngLiveViewerStreamInfo,
1159 metadata_stream: _LttngMetadataStream,
1160 ):
584af91e
PP
1161 self._ts_state = ts_state
1162 self._info = info
1163 self._metadata_stream = metadata_stream
78169723
FD
1164 self._cur_metadata_stream_section_index = 0
1165 if len(metadata_stream.sections) > 1:
1166 self._next_metadata_stream_section_timestamp = metadata_stream.sections[
1167 1
1168 ].timestamp
1169 else:
1170 self._next_metadata_stream_section_timestamp = None
1171
584af91e
PP
1172 self._is_sent = False
1173 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1174 logging.info(
1175 fmt.format(
1176 info.id,
1177 ts_state.tracing_session_descriptor.info.tracing_session_id,
1178 ts_state.tracing_session_descriptor.info.name,
1179 metadata_stream.path,
1180 )
1181 )
1182
584af91e
PP
1183 @property
1184 def info(self):
1185 return self._info
1186
1187 @property
1188 def metadata_stream(self):
1189 return self._metadata_stream
1190
1191 @property
1192 def is_sent(self):
1193 return self._is_sent
1194
1195 @is_sent.setter
aca7de76 1196 def is_sent(self, value: bool):
584af91e
PP
1197 self._is_sent = value
1198
78169723
FD
1199 @property
1200 def cur_section(self):
1201 fmt = "Get current metadata section: section-idx={}"
1202 logging.info(fmt.format(self._cur_metadata_stream_section_index))
1203 if self._cur_metadata_stream_section_index == len(
1204 self._metadata_stream.sections
1205 ):
1206 return
1207
1208 return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
1209
1210 def goto_next_section(self):
1211 self._cur_metadata_stream_section_index += 1
1212 if self.cur_section:
1213 self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
1214 else:
1215 self._next_metadata_stream_section_timestamp = None
1216
1217 @property
1218 def next_section_timestamp(self):
1219 return self._next_metadata_stream_section_timestamp
1220
584af91e 1221
ee1171e5
SM
1222# A tracing session descriptor.
1223#
1224# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1225# objects).
1226class LttngTracingSessionDescriptor:
1227 def __init__(
aca7de76
SM
1228 self,
1229 name: str,
1230 tracing_session_id: int,
1231 hostname: str,
1232 live_timer_freq: int,
1233 client_count: int,
1234 traces: Iterable[LttngTrace],
ee1171e5
SM
1235 ):
1236 for trace in traces:
1237 if name not in trace.path:
1238 fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1239 raise ValueError(fmt.format(name, trace.path))
1240
1241 self._traces = traces
1242 stream_count = sum([len(t) + 1 for t in traces])
1243 self._info = _LttngLiveViewerTracingSessionInfo(
1244 tracing_session_id,
1245 live_timer_freq,
1246 client_count,
1247 stream_count,
1248 hostname,
1249 name,
1250 )
1251
1252 @property
1253 def traces(self):
1254 return self._traces
1255
1256 @property
1257 def info(self):
1258 return self._info
1259
1260
584af91e
PP
1261# The state of a tracing session.
1262class _LttngLiveViewerSessionTracingSessionState:
aca7de76 1263 def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
584af91e 1264 self._tc_descr = tc_descr
aca7de76
SM
1265 self._stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
1266 self._ds_states = {} # type: dict[int, _LttngLiveViewerSessionDataStreamState]
1267 self._ms_states = (
1268 {}
1269 ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
584af91e
PP
1270 stream_id = base_stream_id
1271
1272 for trace in tc_descr.traces:
1273 trace_id = stream_id * 1000
1274
78169723
FD
1275 # Metadata stream -> stream info and metadata stream state
1276 info = _LttngLiveViewerStreamInfo(
f5567ea8 1277 stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
78169723
FD
1278 )
1279 self._stream_infos.append(info)
1280 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
1281 self, info, trace.metadata_stream
1282 )
1283 metadata_stream_id = stream_id
1284 stream_id += 1
1285
584af91e
PP
1286 # Data streams -> stream infos and data stream states
1287 for data_stream in trace:
1288 info = _LttngLiveViewerStreamInfo(
1289 stream_id,
1290 trace_id,
1291 False,
1292 data_stream.path,
1293 data_stream.channel_name,
1294 )
1295 self._stream_infos.append(info)
1296 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
78169723 1297 self, info, data_stream, metadata_stream_id
584af91e
PP
1298 )
1299 stream_id += 1
1300
584af91e
PP
1301 self._is_attached = False
1302 fmt = 'Built tracing session state: id={}, name="{}"'
1303 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
1304
1305 @property
1306 def tracing_session_descriptor(self):
1307 return self._tc_descr
1308
1309 @property
1310 def data_stream_states(self):
1311 return self._ds_states
1312
1313 @property
1314 def metadata_stream_states(self):
1315 return self._ms_states
1316
1317 @property
1318 def stream_infos(self):
1319 return self._stream_infos
1320
1321 @property
1322 def has_new_metadata(self):
1323 return any([not ms.is_sent for ms in self._ms_states.values()])
1324
1325 @property
1326 def is_attached(self):
1327 return self._is_attached
1328
1329 @is_attached.setter
aca7de76 1330 def is_attached(self, value: bool):
584af91e
PP
1331 self._is_attached = value
1332
1333
aca7de76
SM
1334def needs_new_metadata_section(
1335 metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
1336 latest_timestamp: int,
1337):
78169723
FD
1338 if metadata_stream_state.next_section_timestamp is None:
1339 return False
1340
1341 if latest_timestamp >= metadata_stream_state.next_section_timestamp:
1342 return True
1343 else:
1344 return False
1345
1346
584af91e
PP
1347# An LTTng live viewer session manages a view on tracing sessions
1348# and replies to commands accordingly.
1349class _LttngLiveViewerSession:
1350 def __init__(
1351 self,
aca7de76
SM
1352 viewer_session_id: int,
1353 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1354 max_query_data_response_size: Optional[int],
584af91e
PP
1355 ):
1356 self._viewer_session_id = viewer_session_id
aca7de76
SM
1357 self._ts_states = (
1358 {}
1359 ) # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
1360 self._stream_states = (
1361 {}
1362 ) # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
584af91e
PP
1363 self._max_query_data_response_size = max_query_data_response_size
1364 total_stream_infos = 0
1365
1366 for ts_descr in tracing_session_descriptors:
1367 ts_state = _LttngLiveViewerSessionTracingSessionState(
1368 ts_descr, total_stream_infos
1369 )
1370 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1371 self._ts_states[ts_id] = ts_state
1372 total_stream_infos += len(ts_state.stream_infos)
1373
1374 # Update session's stream states to have the new states
1375 self._stream_states.update(ts_state.data_stream_states)
1376 self._stream_states.update(ts_state.metadata_stream_states)
1377
1378 self._command_handlers = {
1379 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1380 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1381 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1382 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1383 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1384 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1385 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1386 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
aca7de76 1387 } # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]
584af91e
PP
1388
1389 @property
1390 def viewer_session_id(self):
1391 return self._viewer_session_id
1392
aca7de76 1393 def _get_tracing_session_state(self, tracing_session_id: int):
584af91e 1394 if tracing_session_id not in self._ts_states:
f2958352 1395 raise RuntimeError(
f5567ea8 1396 "Unknown tracing session ID {}".format(tracing_session_id)
584af91e
PP
1397 )
1398
1399 return self._ts_states[tracing_session_id]
1400
aca7de76 1401 def _get_data_stream_state(self, stream_id: int):
584af91e 1402 if stream_id not in self._stream_states:
aca7de76 1403 RuntimeError("Unknown stream ID {}".format(stream_id))
584af91e 1404
aca7de76
SM
1405 stream = self._stream_states[stream_id]
1406 if type(stream) is not _LttngLiveViewerSessionDataStreamState:
1407 raise RuntimeError("Stream is not a data stream")
584af91e 1408
aca7de76
SM
1409 return stream
1410
1411 def _get_metadata_stream_state(self, stream_id: int):
1412 if stream_id not in self._stream_states:
1413 RuntimeError("Unknown stream ID {}".format(stream_id))
1414
1415 stream = self._stream_states[stream_id]
1416 if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
1417 raise RuntimeError("Stream is not a metadata stream")
1418
1419 return stream
1420
1421 def handle_command(self, cmd: _LttngLiveViewerCommand):
584af91e 1422 logging.info(
f5567ea8 1423 "Handling command in viewer session: cmd-cls-name={}".format(
584af91e
PP
1424 cmd.__class__.__name__
1425 )
1426 )
1427 cmd_type = type(cmd)
1428
1429 if cmd_type not in self._command_handlers:
f2958352 1430 raise RuntimeError(
f5567ea8 1431 "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
584af91e
PP
1432 )
1433
1434 return self._command_handlers[cmd_type](cmd)
1435
aca7de76
SM
1436 def _handle_attach_to_tracing_session_command(
1437 self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
1438 ):
584af91e
PP
1439 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1440 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1441 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1442 info = ts_state.tracing_session_descriptor.info
1443
1444 if ts_state.is_attached:
f2958352 1445 raise RuntimeError(
f5567ea8 1446 "Cannot attach to tracing session `{}`: viewer is already attached".format(
584af91e
PP
1447 info.name
1448 )
1449 )
1450
1451 ts_state.is_attached = True
1452 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1453 return _LttngLiveViewerAttachToTracingSessionReply(
1454 status, ts_state.stream_infos
1455 )
1456
aca7de76
SM
1457 def _handle_detach_from_tracing_session_command(
1458 self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
1459 ):
584af91e
PP
1460 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1461 logging.info(fmt.format(cmd.tracing_session_id))
1462 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1463 info = ts_state.tracing_session_descriptor.info
1464
1465 if not ts_state.is_attached:
f2958352 1466 raise RuntimeError(
f5567ea8 1467 "Cannot detach to tracing session `{}`: viewer is not attached".format(
584af91e
PP
1468 info.name
1469 )
1470 )
1471
1472 ts_state.is_attached = False
1473 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1474 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1475
aca7de76
SM
1476 def _handle_get_next_data_stream_index_entry_command(
1477 self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
1478 ):
584af91e
PP
1479 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1480 logging.info(fmt.format(cmd.stream_id))
aca7de76
SM
1481 stream_state = self._get_data_stream_state(cmd.stream_id)
1482 metadata_stream_state = self._get_metadata_stream_state(
1483 stream_state.metadata_stream_id
1484 )
584af91e
PP
1485
1486 if stream_state.cur_index_entry is None:
1487 # The viewer is done reading this stream
1488 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1489
1490 # Dummy data stream index entry to use with the `HUP` status
1491 # (the reply needs one, but the viewer ignores it)
1492 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1493
1494 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1495 status, index_entry, False, False
1496 )
1497
78169723
FD
1498 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1499
1500 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
1501 metadata_stream_state.is_sent = False
1502 metadata_stream_state.goto_next_section()
1503
584af91e
PP
1504 # The viewer only checks the `has_new_metadata` flag if the
1505 # reply's status is `OK`, so we need to provide an index here
1506 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
aca7de76 1507 if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
71f56e5f
JG
1508 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1509 else:
71f56e5f
JG
1510 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1511
584af91e
PP
1512 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1513 status, stream_state.cur_index_entry, has_new_metadata, False
1514 )
1515 stream_state.goto_next_index_entry()
1516 return reply
1517
aca7de76
SM
1518 def _handle_get_data_stream_packet_data_command(
1519 self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
1520 ):
584af91e
PP
1521 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1522 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
aca7de76 1523 stream_state = self._get_data_stream_state(cmd.stream_id)
584af91e
PP
1524 data_response_length = cmd.req_length
1525
584af91e
PP
1526 if stream_state.tracing_session_state.has_new_metadata:
1527 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1528 return _LttngLiveViewerGetDataStreamPacketDataReply(
1529 status, bytes(), True, False
1530 )
1531
1532 if self._max_query_data_response_size:
1533 # Enforce a server side limit on the query requested length.
1534 # To ensure that the transaction terminate take the minimum of both
1535 # value.
1536 data_response_length = min(
1537 cmd.req_length, self._max_query_data_response_size
1538 )
1539 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1540 logging.info(fmt.format(cmd.req_length, data_response_length))
1541
1542 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1543 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1544 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1545
aca7de76
SM
1546 def _handle_get_metadata_stream_data_command(
1547 self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
1548 ):
584af91e
PP
1549 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1550 logging.info(fmt.format(cmd.stream_id))
aca7de76 1551 metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
584af91e 1552
78169723 1553 if metadata_stream_state.is_sent:
584af91e
PP
1554 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1555 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1556
78169723 1557 metadata_stream_state.is_sent = True
584af91e 1558 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
78169723 1559 metadata_section = metadata_stream_state.cur_section
b8b97725 1560 assert metadata_section is not None
78169723
FD
1561
1562 # If we are sending an empty section, ready the next one right away.
1563 if len(metadata_section.data) == 0:
1564 metadata_stream_state.is_sent = False
1565 metadata_stream_state.goto_next_section()
1566
1567 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1568 logging.info(fmt.format(len(metadata_section.data)))
584af91e 1569 return _LttngLiveViewerGetMetadataStreamDataContentReply(
78169723 1570 status, metadata_section.data
584af91e
PP
1571 )
1572
aca7de76
SM
1573 def _handle_get_new_stream_infos_command(
1574 self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
1575 ):
584af91e
PP
1576 fmt = 'Handling "get new stream infos" command: ts-id={}'
1577 logging.info(fmt.format(cmd.tracing_session_id))
1578
1579 # As of this version, all the tracing session's stream infos are
1580 # always given to the viewer when sending the "attach to tracing
1581 # session" reply, so there's nothing new here. Return the `HUP`
1582 # status as, if we're handling this command, the viewer consumed
1583 # all the existing data streams.
1584 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1585 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1586
aca7de76
SM
1587 def _handle_get_tracing_session_infos_command(
1588 self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
1589 ):
584af91e
PP
1590 logging.info('Handling "get tracing session infos" command.')
1591 infos = [
1592 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1593 ]
1594 infos.sort(key=lambda info: info.name)
1595 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1596
aca7de76
SM
1597 def _handle_create_viewer_session_command(
1598 self, cmd: _LttngLiveViewerCreateViewerSessionCommand
1599 ):
584af91e
PP
1600 logging.info('Handling "create viewer session" command.')
1601 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1602
1603 # This does nothing here. In the LTTng relay daemon, it
1604 # allocates the viewer session's state.
1605 return _LttngLiveViewerCreateViewerSessionReply(status)
1606
1607
1608# An LTTng live TCP server.
1609#
e51141d3
SM
1610# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
1611# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
1612# to a temporary port file. It renames the temporary port file to
1613# `port_filename`.
584af91e
PP
1614#
1615# `tracing_session_descriptors` is a list of tracing session descriptors
1616# (`LttngTracingSessionDescriptor`) to serve.
1617#
1618# This server accepts a single viewer (client).
1619#
1620# When the viewer closes the connection, the server's constructor
1621# returns.
1622class LttngLiveServer:
1623 def __init__(
e51141d3 1624 self,
aca7de76 1625 port: Optional[int],
d11f45ef 1626 port_filename: Optional[str],
aca7de76
SM
1627 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1628 max_query_data_response_size: Optional[int],
584af91e 1629 ):
f5567ea8 1630 logging.info("Server configuration:")
584af91e 1631
f5567ea8 1632 logging.info(" Port file name: `{}`".format(port_filename))
584af91e
PP
1633
1634 if max_query_data_response_size is not None:
1635 logging.info(
f5567ea8 1636 " Maximum response data query size: `{}`".format(
584af91e
PP
1637 max_query_data_response_size
1638 )
1639 )
1640
1641 for ts_descr in tracing_session_descriptors:
1642 info = ts_descr.info
1643 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1644 logging.info(
1645 fmt.format(
1646 info.name,
1647 info.tracing_session_id,
1648 info.hostname,
1649 info.live_timer_freq,
1650 info.client_count,
1651 info.stream_count,
1652 )
1653 )
1654
1655 for trace in ts_descr.traces:
1656 logging.info(' Trace: path="{}"'.format(trace.path))
1657
1658 self._ts_descriptors = tracing_session_descriptors
1659 self._max_query_data_response_size = max_query_data_response_size
1660 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1661 self._codec = _LttngLiveViewerProtocolCodec()
1662
1663 # Port 0: OS assigns an unused port
e51141d3 1664 serv_addr = ("localhost", port if port is not None else 0)
584af91e 1665 self._sock.bind(serv_addr)
d11f45ef
SM
1666
1667 if port_filename is not None:
1668 self._write_port_to_file(port_filename)
1669
1670 print("Listening on port {}".format(self._server_port))
584af91e 1671
2439d12b
SM
1672 for ts_descr in tracing_session_descriptors:
1673 info = ts_descr.info
1674 print(
1675 "net://localhost:{}/host/{}/{}".format(
1676 self._server_port, info.hostname, info.name
1677 )
1678 )
1679
584af91e
PP
1680 try:
1681 self._listen()
1682 finally:
1683 self._sock.close()
f5567ea8 1684 logging.info("Closed connection and socket.")
584af91e
PP
1685
1686 @property
1687 def _server_port(self):
1688 return self._sock.getsockname()[1]
1689
1690 def _recv_command(self):
1691 data = bytes()
1692
1693 while True:
f5567ea8 1694 logging.info("Waiting for viewer command.")
584af91e
PP
1695 buf = self._conn.recv(128)
1696
1697 if not buf:
f5567ea8 1698 logging.info("Client closed connection.")
584af91e
PP
1699
1700 if data:
f2958352 1701 raise RuntimeError(
f5567ea8 1702 "Client closed connection after having sent {} command bytes.".format(
584af91e
PP
1703 len(data)
1704 )
1705 )
1706
1707 return
1708
f5567ea8 1709 logging.info("Received data from viewer: length={}".format(len(buf)))
584af91e
PP
1710
1711 data += buf
1712
1713 try:
1714 cmd = self._codec.decode(data)
1715 except struct.error as exc:
f2958352 1716 raise RuntimeError("Malformed command: {}".format(exc)) from exc
584af91e
PP
1717
1718 if cmd is not None:
1719 logging.info(
f5567ea8 1720 "Received command from viewer: cmd-cls-name={}".format(
584af91e
PP
1721 cmd.__class__.__name__
1722 )
1723 )
1724 return cmd
1725
aca7de76 1726 def _send_reply(self, reply: _LttngLiveViewerReply):
584af91e
PP
1727 data = self._codec.encode(reply)
1728 logging.info(
f5567ea8 1729 "Sending reply to viewer: reply-cls-name={}, length={}".format(
584af91e
PP
1730 reply.__class__.__name__, len(data)
1731 )
1732 )
1733 self._conn.sendall(data)
1734
1735 def _handle_connection(self):
1736 # First command must be "connect"
1737 cmd = self._recv_command()
1738
1739 if type(cmd) is not _LttngLiveViewerConnectCommand:
f2958352 1740 raise RuntimeError(
584af91e
PP
1741 'First command is not "connect": cmd-cls-name={}'.format(
1742 cmd.__class__.__name__
1743 )
1744 )
1745
1746 # Create viewer session (arbitrary ID 23)
1747 logging.info(
f5567ea8 1748 "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
584af91e
PP
1749 )
1750 viewer_session = _LttngLiveViewerSession(
1751 23, self._ts_descriptors, self._max_query_data_response_size
1752 )
1753
1754 # Send "connect" reply
1755 self._send_reply(
1756 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1757 )
1758
1759 # Make the viewer session handle the remaining commands
1760 while True:
1761 cmd = self._recv_command()
1762
1763 if cmd is None:
1764 # Connection closed (at an expected location within the
1765 # conversation)
1766 return
1767
1768 self._send_reply(viewer_session.handle_command(cmd))
1769
1770 def _listen(self):
f5567ea8 1771 logging.info("Listening: port={}".format(self._server_port))
1726ac08
JR
1772 # Backlog must be present for Python version < 3.5.
1773 # 128 is an arbitrary number since we expect only 1 connection anyway.
1774 self._sock.listen(128)
584af91e
PP
1775 self._conn, viewer_addr = self._sock.accept()
1776 logging.info(
f5567ea8 1777 "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
584af91e
PP
1778 )
1779
1780 try:
1781 self._handle_connection()
1782 finally:
1783 self._conn.close()
1784
aca7de76 1785 def _write_port_to_file(self, port_filename: str):
584af91e 1786 # Write the port number to a temporary file.
a0a92e79
SM
1787 with tempfile.NamedTemporaryFile(
1788 mode="w", delete=False, dir=os.path.dirname(port_filename)
1789 ) as tmp_port_file:
f5567ea8 1790 print(self._server_port, end="", file=tmp_port_file)
584af91e 1791
6362d281
SM
1792 # Rename temporary file to real file.
1793 #
1794 # For unknown reasons, on Windows, moving the port file from its
1795 # temporary location to its final location (where the user of
1796 # the server expects it to appear) may raise a `PermissionError`
1797 # exception.
1798 #
1799 # We suppose it's possible that something in the Windows kernel
1800 # hasn't completely finished using the file when we try to move
1801 # it.
1802 #
1803 # Use a wait-and-retry scheme as a (bad) workaround.
1804 num_attempts = 5
1805 retry_delay_s = 1
1806
1807 for attempt in reversed(range(num_attempts)):
1808 try:
1809 os.replace(tmp_port_file.name, port_filename)
1810 logging.info(
1811 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1812 tmp_port_file.name, port_filename
1813 )
1814 )
1815 return
1816 except PermissionError:
1817 logging.info(
1818 'Permission error while attempting to rename port file; retrying in {} second: src-path="{}", dst-path="{}"'.format(
1819 retry_delay_s, tmp_port_file.name, port_filename
1820 )
1821 )
1822
1823 if attempt == 0:
1824 raise
1825
1826 time.sleep(retry_delay_s)
584af91e
PP
1827
1828
aca7de76
SM
1829def _session_descriptors_from_path(
1830 sessions_filename: str, trace_path_prefix: Optional[str]
1831):
2b763e29
JG
1832 # File format is:
1833 #
1834 # [
1835 # {
1836 # "name": "my-session",
1837 # "id": 17,
1838 # "hostname": "myhost",
1839 # "live-timer-freq": 1000000,
1840 # "client-count": 23,
1841 # "traces": [
1842 # {
1843 # "path": "lol"
1844 # },
1845 # {
71f56e5f
JG
1846 # "path": "meow/mix",
1847 # "beacons": {
1848 # "my_stream": [ 5235787, 728375283 ]
78169723
FD
1849 # },
1850 # "metadata-sections": [
1851 # {
1852 # "line": 1,
1853 # "timestamp": 0
1854 # }
1855 # ]
2b763e29
JG
1856 # }
1857 # ]
1858 # }
1859 # ]
f5567ea8 1860 with open(sessions_filename, "r") as sessions_file:
aca7de76
SM
1861 sessions_json = tjson.load(sessions_file, tjson.ArrayVal)
1862
1863 sessions = [] # type: list[LttngTracingSessionDescriptor]
2b763e29 1864
aca7de76
SM
1865 for session_json in sessions_json.iter(tjson.ObjVal):
1866 name = session_json.at("name", tjson.StrVal).val
1867 tracing_session_id = session_json.at("id", tjson.IntVal).val
1868 hostname = session_json.at("hostname", tjson.StrVal).val
1869 live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
1870 client_count = session_json.at("client-count", tjson.IntVal).val
1871 traces_json = session_json.at("traces", tjson.ArrayVal)
2b763e29 1872
aca7de76 1873 traces = [] # type: list[LttngTrace]
2b763e29 1874
aca7de76
SM
1875 for trace_json in traces_json.iter(tjson.ObjVal):
1876 metadata_sections = (
1877 trace_json.at("metadata-sections", tjson.ArrayVal)
1878 if "metadata-sections" in trace_json
1879 else None
1880 )
1881 beacons = (
1882 trace_json.at("beacons", tjson.ObjVal)
1883 if "beacons" in trace_json
1884 else None
1885 )
1886 path = trace_json.at("path", tjson.StrVal).val
2b763e29 1887
aca7de76 1888 if not os.path.isabs(path) and trace_path_prefix:
2b763e29
JG
1889 path = os.path.join(trace_path_prefix, path)
1890
aca7de76
SM
1891 traces.append(
1892 LttngTrace(
1893 path,
1894 metadata_sections,
1895 beacons,
1896 )
1897 )
2b763e29
JG
1898
1899 sessions.append(
1900 LttngTracingSessionDescriptor(
1901 name,
1902 tracing_session_id,
1903 hostname,
1904 live_timer_freq,
1905 client_count,
1906 traces,
1907 )
1908 )
1909
1910 return sessions
584af91e
PP
1911
1912
aca7de76 1913def _loglevel_parser(string: str):
f5567ea8 1914 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
584af91e
PP
1915 if string not in loglevels:
1916 msg = "{} is not a valid loglevel".format(string)
1917 raise argparse.ArgumentTypeError(msg)
1918 return loglevels[string]
1919
1920
f5567ea8
FD
1921if __name__ == "__main__":
1922 logging.basicConfig(format="# %(asctime)-25s%(message)s")
584af91e 1923 parser = argparse.ArgumentParser(
f5567ea8 1924 description="LTTng-live protocol mocker", add_help=False
584af91e
PP
1925 )
1926 parser.add_argument(
f5567ea8
FD
1927 "--log-level",
1928 default="warning",
1929 choices=["info", "warning"],
1930 help="The loglevel to be used.",
584af91e
PP
1931 )
1932
1933 loglevel_namespace, remaining_args = parser.parse_known_args()
1934 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1935
e51141d3
SM
1936 parser.add_argument(
1937 "--port",
1938 help="The port to bind to. If missing, use an OS-assigned port..",
1939 type=int,
1940 )
584af91e 1941 parser.add_argument(
f5567ea8
FD
1942 "--port-filename",
1943 help="The final port file. This file is present when the server is ready to receive connection.",
584af91e
PP
1944 )
1945 parser.add_argument(
f5567ea8 1946 "--max-query-data-response-size",
584af91e 1947 type=int,
f5567ea8 1948 help="The maximum size of control data response in bytes",
584af91e
PP
1949 )
1950 parser.add_argument(
f5567ea8 1951 "--trace-path-prefix",
2b763e29 1952 type=str,
f5567ea8 1953 help="Prefix to prepend to the trace paths of session configurations",
2b763e29
JG
1954 )
1955 parser.add_argument(
e46b8462 1956 "sessions_filename",
776a2a25 1957 type=str,
f5567ea8 1958 help="Path to a session configuration file",
e46b8462 1959 metavar="sessions-filename",
584af91e
PP
1960 )
1961 parser.add_argument(
f5567ea8
FD
1962 "-h",
1963 "--help",
1964 action="help",
584af91e 1965 default=argparse.SUPPRESS,
f5567ea8 1966 help="Show this help message and exit.",
584af91e
PP
1967 )
1968
1969 args = parser.parse_args(args=remaining_args)
f2958352
SM
1970 sessions_filename = args.sessions_filename # type: str
1971 trace_path_prefix = args.trace_path_prefix # type: str | None
1972 sessions = _session_descriptors_from_path(
1973 sessions_filename,
1974 trace_path_prefix,
1975 )
aca7de76 1976
f2958352 1977 port = args.port # type: int | None
d11f45ef 1978 port_filename = args.port_filename # type: str | None
f2958352
SM
1979 max_query_data_response_size = args.max_query_data_response_size # type: int | None
1980 LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
This page took 0.143171 seconds and 4 git commands to generate.