bt2c::Logger: remove unused cLevel() method
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
584af91e 2#
0235b0db 3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
584af91e 4#
584af91e 5
aca7de76
SM
6# pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
7
584af91e 8import os
584af91e 9import re
6362d281 10import time
584af91e
PP
11import socket
12import struct
5995b304
SM
13import logging
14import os.path
15import argparse
584af91e 16import tempfile
3c22c122 17from abc import ABC, abstractmethod
aca7de76
SM
18from typing import Dict, Union, Iterable, Optional, Sequence, overload
19
20import tjson
21
22# isort: off
23from typing import Any, Callable # noqa: F401
24
25# isort: on
584af91e
PP
26
27
ee1171e5
SM
28# An entry within the index of an LTTng data stream.
29class _LttngDataStreamIndexEntry:
30 def __init__(
31 self,
aca7de76
SM
32 offset_bytes: int,
33 total_size_bits: int,
34 content_size_bits: int,
35 timestamp_begin: int,
36 timestamp_end: int,
37 events_discarded: int,
38 stream_class_id: int,
ee1171e5
SM
39 ):
40 self._offset_bytes = offset_bytes
41 self._total_size_bits = total_size_bits
42 self._content_size_bits = content_size_bits
43 self._timestamp_begin = timestamp_begin
44 self._timestamp_end = timestamp_end
45 self._events_discarded = events_discarded
46 self._stream_class_id = stream_class_id
47
48 @property
49 def offset_bytes(self):
50 return self._offset_bytes
51
52 @property
53 def total_size_bits(self):
54 return self._total_size_bits
55
56 @property
57 def total_size_bytes(self):
58 return self._total_size_bits // 8
59
60 @property
61 def content_size_bits(self):
62 return self._content_size_bits
63
64 @property
65 def content_size_bytes(self):
66 return self._content_size_bits // 8
67
68 @property
69 def timestamp_begin(self):
70 return self._timestamp_begin
71
72 @property
73 def timestamp_end(self):
74 return self._timestamp_end
75
76 @property
77 def events_discarded(self):
78 return self._events_discarded
79
80 @property
81 def stream_class_id(self):
82 return self._stream_class_id
83
84
85# An entry within the index of an LTTng data stream. While a stream beacon entry
86# is conceptually unrelated to an index, it is sent as a reply to a
87# LttngLiveViewerGetNextDataStreamIndexEntryCommand
aca7de76
SM
88class _LttngDataStreamBeaconIndexEntry:
89 def __init__(self, stream_class_id: int, timestamp: int):
ee1171e5
SM
90 self._stream_class_id = stream_class_id
91 self._timestamp = timestamp
92
93 @property
94 def timestamp(self):
95 return self._timestamp
96
97 @property
98 def stream_class_id(self):
99 return self._stream_class_id
100
101
aca7de76
SM
102_LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
103
104
584af91e 105class _LttngLiveViewerCommand:
aca7de76 106 def __init__(self, version: int):
584af91e
PP
107 self._version = version
108
109 @property
110 def version(self):
111 return self._version
112
113
114class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
aca7de76 115 def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
584af91e
PP
116 super().__init__(version)
117 self._viewer_session_id = viewer_session_id
118 self._major = major
119 self._minor = minor
120
121 @property
122 def viewer_session_id(self):
123 return self._viewer_session_id
124
125 @property
126 def major(self):
127 return self._major
128
129 @property
130 def minor(self):
131 return self._minor
132
133
aca7de76
SM
134class _LttngLiveViewerReply:
135 pass
136
137
138class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
139 def __init__(self, viewer_session_id: int, major: int, minor: int):
584af91e
PP
140 self._viewer_session_id = viewer_session_id
141 self._major = major
142 self._minor = minor
143
144 @property
145 def viewer_session_id(self):
146 return self._viewer_session_id
147
148 @property
149 def major(self):
150 return self._major
151
152 @property
153 def minor(self):
154 return self._minor
155
156
157class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
158 pass
159
160
161class _LttngLiveViewerTracingSessionInfo:
162 def __init__(
163 self,
aca7de76
SM
164 tracing_session_id: int,
165 live_timer_freq: int,
166 client_count: int,
167 stream_count: int,
168 hostname: str,
169 name: str,
584af91e
PP
170 ):
171 self._tracing_session_id = tracing_session_id
172 self._live_timer_freq = live_timer_freq
173 self._client_count = client_count
174 self._stream_count = stream_count
175 self._hostname = hostname
176 self._name = name
177
178 @property
179 def tracing_session_id(self):
180 return self._tracing_session_id
181
182 @property
183 def live_timer_freq(self):
184 return self._live_timer_freq
185
186 @property
187 def client_count(self):
188 return self._client_count
189
190 @property
191 def stream_count(self):
192 return self._stream_count
193
194 @property
195 def hostname(self):
196 return self._hostname
197
198 @property
199 def name(self):
200 return self._name
201
202
aca7de76
SM
203class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
204 def __init__(
205 self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
206 ):
584af91e
PP
207 self._tracing_session_infos = tracing_session_infos
208
209 @property
210 def tracing_session_infos(self):
211 return self._tracing_session_infos
212
213
214class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
215 class SeekType:
216 BEGINNING = 1
217 LAST = 2
218
aca7de76
SM
219 def __init__(
220 self, version: int, tracing_session_id: int, offset: int, seek_type: int
221 ):
584af91e
PP
222 super().__init__(version)
223 self._tracing_session_id = tracing_session_id
224 self._offset = offset
225 self._seek_type = seek_type
226
227 @property
228 def tracing_session_id(self):
229 return self._tracing_session_id
230
231 @property
232 def offset(self):
233 return self._offset
234
235 @property
236 def seek_type(self):
237 return self._seek_type
238
239
240class _LttngLiveViewerStreamInfo:
aca7de76
SM
241 def __init__(
242 self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
243 ):
584af91e
PP
244 self._id = id
245 self._trace_id = trace_id
246 self._is_metadata = is_metadata
247 self._path = path
248 self._channel_name = channel_name
249
250 @property
251 def id(self):
252 return self._id
253
254 @property
255 def trace_id(self):
256 return self._trace_id
257
258 @property
259 def is_metadata(self):
260 return self._is_metadata
261
262 @property
263 def path(self):
264 return self._path
265
266 @property
267 def channel_name(self):
268 return self._channel_name
269
270
aca7de76 271class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
584af91e
PP
272 class Status:
273 OK = 1
274 ALREADY = 2
275 UNKNOWN = 3
276 NOT_LIVE = 4
277 SEEK_ERROR = 5
278 NO_SESSION = 6
279
aca7de76 280 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
584af91e
PP
281 self._status = status
282 self._stream_infos = stream_infos
283
284 @property
285 def status(self):
286 return self._status
287
288 @property
289 def stream_infos(self):
290 return self._stream_infos
291
292
293class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
aca7de76 294 def __init__(self, version: int, stream_id: int):
584af91e
PP
295 super().__init__(version)
296 self._stream_id = stream_id
297
298 @property
299 def stream_id(self):
300 return self._stream_id
301
302
aca7de76 303class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
584af91e
PP
304 class Status:
305 OK = 1
306 RETRY = 2
307 HUP = 3
308 ERROR = 4
309 INACTIVE = 5
310 EOF = 6
311
aca7de76
SM
312 def __init__(
313 self,
314 status: int,
315 index_entry: _LttngIndexEntryT,
316 has_new_metadata: bool,
317 has_new_data_stream: bool,
318 ):
584af91e
PP
319 self._status = status
320 self._index_entry = index_entry
321 self._has_new_metadata = has_new_metadata
322 self._has_new_data_stream = has_new_data_stream
323
324 @property
325 def status(self):
326 return self._status
327
328 @property
329 def index_entry(self):
330 return self._index_entry
331
332 @property
333 def has_new_metadata(self):
334 return self._has_new_metadata
335
336 @property
337 def has_new_data_stream(self):
338 return self._has_new_data_stream
339
340
341class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
aca7de76 342 def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
584af91e
PP
343 super().__init__(version)
344 self._stream_id = stream_id
345 self._offset = offset
346 self._req_length = req_length
347
348 @property
349 def stream_id(self):
350 return self._stream_id
351
352 @property
353 def offset(self):
354 return self._offset
355
356 @property
357 def req_length(self):
358 return self._req_length
359
360
aca7de76 361class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
584af91e
PP
362 class Status:
363 OK = 1
364 RETRY = 2
365 ERROR = 3
366 EOF = 4
367
aca7de76
SM
368 def __init__(
369 self,
370 status: int,
371 data: bytes,
372 has_new_metadata: bool,
373 has_new_data_stream: bool,
374 ):
584af91e
PP
375 self._status = status
376 self._data = data
377 self._has_new_metadata = has_new_metadata
378 self._has_new_data_stream = has_new_data_stream
379
380 @property
381 def status(self):
382 return self._status
383
384 @property
385 def data(self):
386 return self._data
387
388 @property
389 def has_new_metadata(self):
390 return self._has_new_metadata
391
392 @property
393 def has_new_data_stream(self):
394 return self._has_new_data_stream
395
396
397class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
aca7de76 398 def __init__(self, version: int, stream_id: int):
584af91e
PP
399 super().__init__(version)
400 self._stream_id = stream_id
401
402 @property
403 def stream_id(self):
404 return self._stream_id
405
406
aca7de76 407class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
584af91e
PP
408 class Status:
409 OK = 1
410 NO_NEW = 2
411 ERROR = 3
412
aca7de76 413 def __init__(self, status: int, data: bytes):
584af91e
PP
414 self._status = status
415 self._data = data
416
417 @property
418 def status(self):
419 return self._status
420
421 @property
422 def data(self):
423 return self._data
424
425
426class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
aca7de76 427 def __init__(self, version: int, tracing_session_id: int):
584af91e
PP
428 super().__init__(version)
429 self._tracing_session_id = tracing_session_id
430
431 @property
432 def tracing_session_id(self):
433 return self._tracing_session_id
434
435
aca7de76 436class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
584af91e
PP
437 class Status:
438 OK = 1
439 NO_NEW = 2
440 ERROR = 3
441 HUP = 4
442
aca7de76 443 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
584af91e
PP
444 self._status = status
445 self._stream_infos = stream_infos
446
447 @property
448 def status(self):
449 return self._status
450
451 @property
452 def stream_infos(self):
453 return self._stream_infos
454
455
456class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
457 pass
458
459
aca7de76 460class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
584af91e
PP
461 class Status:
462 OK = 1
463 ERROR = 2
464
aca7de76 465 def __init__(self, status: int):
584af91e
PP
466 self._status = status
467
468 @property
469 def status(self):
470 return self._status
471
472
473class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
aca7de76 474 def __init__(self, version: int, tracing_session_id: int):
584af91e
PP
475 super().__init__(version)
476 self._tracing_session_id = tracing_session_id
477
478 @property
479 def tracing_session_id(self):
480 return self._tracing_session_id
481
482
aca7de76 483class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
584af91e
PP
484 class Status:
485 OK = 1
486 UNKNOWN = 2
487 ERROR = 3
488
aca7de76 489 def __init__(self, status: int):
584af91e
PP
490 self._status = status
491
492 @property
493 def status(self):
494 return self._status
495
496
497# An LTTng live protocol codec can convert bytes to command objects and
498# reply objects to bytes.
499class _LttngLiveViewerProtocolCodec:
f5567ea8 500 _COMMAND_HEADER_STRUCT_FMT = "QII"
584af91e
PP
501 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
502
503 def __init__(self):
504 pass
505
aca7de76 506 def _unpack(self, fmt: str, data: bytes, offset: int = 0):
f5567ea8 507 fmt = "!" + fmt
584af91e
PP
508 return struct.unpack_from(fmt, data, offset)
509
aca7de76 510 def _unpack_payload(self, fmt: str, data: bytes):
584af91e
PP
511 return self._unpack(
512 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
513 )
514
aca7de76 515 def decode(self, data: bytes):
584af91e
PP
516 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
517 # Not enough data to read the command header
518 return
519
520 payload_size, cmd_type, version = self._unpack(
521 self._COMMAND_HEADER_STRUCT_FMT, data
522 )
523 logging.info(
f5567ea8 524 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
584af91e
PP
525 payload_size, cmd_type, version
526 )
527 )
528
529 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
530 # Not enough data to read the whole command
531 return
532
533 if cmd_type == 1:
aca7de76 534 viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
584af91e
PP
535 return _LttngLiveViewerConnectCommand(
536 version, viewer_session_id, major, minor
537 )
538 elif cmd_type == 2:
539 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
540 elif cmd_type == 3:
f5567ea8 541 tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
584af91e
PP
542 return _LttngLiveViewerAttachToTracingSessionCommand(
543 version, tracing_session_id, offset, seek_type
544 )
545 elif cmd_type == 4:
f5567ea8 546 (stream_id,) = self._unpack_payload("Q", data)
584af91e
PP
547 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
548 version, stream_id
549 )
550 elif cmd_type == 5:
f5567ea8 551 stream_id, offset, req_length = self._unpack_payload("QQI", data)
584af91e
PP
552 return _LttngLiveViewerGetDataStreamPacketDataCommand(
553 version, stream_id, offset, req_length
554 )
555 elif cmd_type == 6:
f5567ea8 556 (stream_id,) = self._unpack_payload("Q", data)
584af91e
PP
557 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
558 elif cmd_type == 7:
f5567ea8 559 (tracing_session_id,) = self._unpack_payload("Q", data)
584af91e
PP
560 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
561 elif cmd_type == 8:
562 return _LttngLiveViewerCreateViewerSessionCommand(version)
563 elif cmd_type == 9:
f5567ea8 564 (tracing_session_id,) = self._unpack_payload("Q", data)
584af91e
PP
565 return _LttngLiveViewerDetachFromTracingSessionCommand(
566 version, tracing_session_id
567 )
568 else:
f2958352 569 raise RuntimeError("Unknown command type {}".format(cmd_type))
584af91e 570
aca7de76 571 def _pack(self, fmt: str, *args: Any):
584af91e 572 # Force network byte order
f5567ea8 573 return struct.pack("!" + fmt, *args)
584af91e 574
aca7de76 575 def _encode_zero_padded_str(self, string: str, length: int):
584af91e 576 data = string.encode()
f5567ea8 577 return data.ljust(length, b"\x00")
584af91e 578
aca7de76 579 def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
f5567ea8 580 data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
584af91e
PP
581 data += self._encode_zero_padded_str(info.path, 4096)
582 data += self._encode_zero_padded_str(info.channel_name, 255)
583 return data
584
aca7de76
SM
585 def _get_has_new_stuff_flags(
586 self, has_new_metadata: bool, has_new_data_streams: bool
587 ):
584af91e
PP
588 flags = 0
589
590 if has_new_metadata:
591 flags |= 1
592
593 if has_new_data_streams:
594 flags |= 2
595
596 return flags
597
aca7de76
SM
598 def encode(
599 self,
600 reply: _LttngLiveViewerReply,
601 ) -> bytes:
584af91e
PP
602 if type(reply) is _LttngLiveViewerConnectReply:
603 data = self._pack(
f5567ea8 604 "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
584af91e
PP
605 )
606 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
f5567ea8 607 data = self._pack("I", len(reply.tracing_session_infos))
584af91e
PP
608
609 for info in reply.tracing_session_infos:
610 data += self._pack(
f5567ea8 611 "QIII",
584af91e
PP
612 info.tracing_session_id,
613 info.live_timer_freq,
614 info.client_count,
615 info.stream_count,
616 )
617 data += self._encode_zero_padded_str(info.hostname, 64)
618 data += self._encode_zero_padded_str(info.name, 255)
619 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
f5567ea8 620 data = self._pack("II", reply.status, len(reply.stream_infos))
584af91e
PP
621
622 for info in reply.stream_infos:
623 data += self._encode_stream_info(info)
624 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
f5567ea8 625 index_format = "QQQQQQQII"
584af91e
PP
626 entry = reply.index_entry
627 flags = self._get_has_new_stuff_flags(
628 reply.has_new_metadata, reply.has_new_data_stream
629 )
630
aca7de76 631 if isinstance(entry, _LttngDataStreamIndexEntry):
71f56e5f
JG
632 data = self._pack(
633 index_format,
634 entry.offset_bytes,
635 entry.total_size_bits,
636 entry.content_size_bits,
637 entry.timestamp_begin,
638 entry.timestamp_end,
639 entry.events_discarded,
640 entry.stream_class_id,
641 reply.status,
642 flags,
643 )
644 else:
71f56e5f
JG
645 data = self._pack(
646 index_format,
647 0,
648 0,
649 0,
650 0,
651 entry.timestamp,
652 0,
653 entry.stream_class_id,
654 reply.status,
655 flags,
656 )
584af91e
PP
657 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
658 flags = self._get_has_new_stuff_flags(
659 reply.has_new_metadata, reply.has_new_data_stream
660 )
f5567ea8 661 data = self._pack("III", reply.status, len(reply.data), flags)
584af91e
PP
662 data += reply.data
663 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
f5567ea8 664 data = self._pack("QI", len(reply.data), reply.status)
584af91e
PP
665 data += reply.data
666 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
f5567ea8 667 data = self._pack("II", reply.status, len(reply.stream_infos))
584af91e
PP
668
669 for info in reply.stream_infos:
670 data += self._encode_stream_info(info)
671 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
f5567ea8 672 data = self._pack("I", reply.status)
584af91e 673 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
f5567ea8 674 data = self._pack("I", reply.status)
584af91e
PP
675 else:
676 raise ValueError(
f5567ea8 677 "Unknown reply object with class `{}`".format(reply.__class__.__name__)
584af91e
PP
678 )
679
680 return data
681
682
aca7de76
SM
683def _get_entry_timestamp_begin(
684 entry: _LttngIndexEntryT,
685):
686 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
78169723
FD
687 return entry.timestamp
688 else:
78169723
FD
689 return entry.timestamp_begin
690
691
584af91e 692# The index of an LTTng data stream, a sequence of index entries.
aca7de76
SM
693class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
694 def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
584af91e
PP
695 self._path = path
696 self._build()
71f56e5f
JG
697
698 if beacons:
699 stream_class_id = self._entries[0].stream_class_id
aca7de76
SM
700
701 beacons_list = [] # type: list[_LttngDataStreamBeaconIndexEntry]
702 for ts in beacons.iter(tjson.IntVal):
703 beacons_list.append(
704 _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
705 )
706
707 self._add_beacons(beacons_list)
71f56e5f 708
584af91e
PP
709 logging.info(
710 'Built data stream index entries: path="{}", count={}'.format(
711 path, len(self._entries)
712 )
713 )
714
715 def _build(self):
aca7de76 716 self._entries = [] # type: list[_LttngIndexEntryT]
584af91e 717
f5567ea8 718 with open(self._path, "rb") as f:
584af91e 719 # Read header first
f5567ea8 720 fmt = ">IIII"
584af91e
PP
721 size = struct.calcsize(fmt)
722 data = f.read(size)
723 assert len(data) == size
aca7de76 724 magic, _, _, index_entry_length = struct.unpack(fmt, data)
584af91e
PP
725 assert magic == 0xC1F1DCC1
726
727 # Read index entries
f5567ea8 728 fmt = ">QQQQQQQ"
584af91e
PP
729 size = struct.calcsize(fmt)
730
731 while True:
732 logging.debug(
733 'Decoding data stream index entry: path="{}", offset={}'.format(
734 self._path, f.tell()
735 )
736 )
737 data = f.read(size)
738
739 if not data:
740 # Done
741 break
742
743 assert len(data) == size
75882e97
FD
744 (
745 offset_bytes,
746 total_size_bits,
747 content_size_bits,
748 timestamp_begin,
749 timestamp_end,
750 events_discarded,
751 stream_class_id,
752 ) = struct.unpack(fmt, data)
584af91e
PP
753
754 self._entries.append(
755 _LttngDataStreamIndexEntry(
756 offset_bytes,
757 total_size_bits,
758 content_size_bits,
759 timestamp_begin,
760 timestamp_end,
761 events_discarded,
762 stream_class_id,
763 )
764 )
765
766 # Skip anything else before the next entry
767 f.seek(index_entry_length - size, os.SEEK_CUR)
768
aca7de76 769 def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
71f56e5f 770 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
aca7de76
SM
771 def sort_key(
772 entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
773 ) -> int:
774 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
71f56e5f
JG
775 return entry.timestamp
776 else:
777 return entry.timestamp_end
778
779 self._entries += beacons
780 self._entries.sort(key=sort_key)
781
aca7de76
SM
782 @overload
783 def __getitem__(self, index: int) -> _LttngIndexEntryT:
784 ...
785
786 @overload
787 def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]: # noqa: F811
788 ...
789
790 def __getitem__( # noqa: F811
791 self, index: Union[int, slice]
792 ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT],]:
584af91e
PP
793 return self._entries[index]
794
795 def __len__(self):
796 return len(self._entries)
797
798 @property
799 def path(self):
800 return self._path
801
802
3c22c122
JG
803# Any LTTng stream (metadata or data).
804class _LttngStream(ABC):
805 @abstractmethod
806 def __init__(self, creation_timestamp: int):
807 self._creation_timestamp: int = creation_timestamp
808
809 @property
810 def creation_timestamp(self):
811 return self._creation_timestamp
812
813
584af91e 814# An LTTng data stream.
3c22c122
JG
815class _LttngDataStream(_LttngStream):
816 def __init__(
817 self, path: str, beacons_json: Optional[tjson.ArrayVal], creation_timestamp: int
818 ):
819 super().__init__(creation_timestamp)
584af91e
PP
820 self._path = path
821 filename = os.path.basename(path)
f5567ea8 822 match = re.match(r"(.*)_\d+", filename)
b8b97725
SM
823 if not match:
824 raise RuntimeError(
825 "Unexpected data stream file name pattern: {}".format(filename)
826 )
827
584af91e
PP
828 self._channel_name = match.group(1)
829 trace_dir = os.path.dirname(path)
f5567ea8 830 index_path = os.path.join(trace_dir, "index", filename + ".idx")
aca7de76 831 self._index = _LttngDataStreamIndex(index_path, beacons_json)
584af91e 832 assert os.path.isfile(path)
f5567ea8 833 self._file = open(path, "rb")
584af91e
PP
834 logging.info(
835 'Built data stream: path="{}", channel-name="{}"'.format(
836 path, self._channel_name
837 )
838 )
839
840 @property
841 def path(self):
842 return self._path
843
844 @property
845 def channel_name(self):
846 return self._channel_name
847
848 @property
849 def index(self):
850 return self._index
851
aca7de76 852 def get_data(self, offset_bytes: int, len_bytes: int):
584af91e
PP
853 self._file.seek(offset_bytes)
854 return self._file.read(len_bytes)
855
856
78169723 857class _LttngMetadataStreamSection:
aca7de76 858 def __init__(self, timestamp: int, data: Optional[bytes]):
78169723
FD
859 self._timestamp = timestamp
860 if data is None:
861 self._data = bytes()
862 else:
863 self._data = data
864 logging.info(
f5567ea8 865 "Built metadata stream section: ts={}, data-len={}".format(
78169723
FD
866 self._timestamp, len(self._data)
867 )
868 )
869
870 @property
871 def timestamp(self):
872 return self._timestamp
873
874 @property
875 def data(self):
876 return self._data
877
878
584af91e 879# An LTTng metadata stream.
3c22c122 880class _LttngMetadataStream(_LttngStream):
aca7de76
SM
881 def __init__(
882 self,
883 metadata_file_path: str,
884 config_sections: Sequence[_LttngMetadataStreamSection],
3c22c122 885 creation_timestamp: int,
aca7de76 886 ):
3c22c122 887 super().__init__(creation_timestamp)
78169723
FD
888 self._path = metadata_file_path
889 self._sections = config_sections
890 logging.info(
f5567ea8 891 "Built metadata stream: path={}, section-len={}".format(
78169723
FD
892 self._path, len(self._sections)
893 )
894 )
584af91e
PP
895
896 @property
897 def path(self):
898 return self._path
899
900 @property
78169723
FD
901 def sections(self):
902 return self._sections
584af91e 903
78169723 904
aca7de76
SM
905class LttngMetadataConfigSection:
906 def __init__(self, line: int, timestamp: int, is_empty: bool):
907 self._line = line
908 self._timestamp = timestamp
909 self._is_empty = is_empty
910
911 @property
912 def line(self):
913 return self._line
78169723 914
aca7de76
SM
915 @property
916 def timestamp(self):
917 return self._timestamp
78169723 918
aca7de76
SM
919 @property
920 def is_empty(self):
921 return self._is_empty
922
923
924def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
925 metadata_sections = [] # type: list[LttngMetadataConfigSection]
78169723
FD
926 append_empty_section = False
927 last_timestamp = 0
928 last_line = 0
929
aca7de76
SM
930 for section in metadata_sections_json:
931 if isinstance(section, tjson.StrVal):
932 if section.val == "empty":
3c22c122
JG
933 # Found an empty section marker. Actually append the
934 # section at the timestamp of the next concrete section.
aca7de76
SM
935 append_empty_section = True
936 else:
937 raise ValueError("Invalid string value at {}.".format(section.path))
938 elif isinstance(section, tjson.ObjVal):
939 line = section.at("line", tjson.IntVal).val
940 ts = section.at("timestamp", tjson.IntVal).val
b8b97725 941
78169723
FD
942 # Sections' timestamps and lines must both be increasing.
943 assert ts > last_timestamp
944 last_timestamp = ts
aca7de76 945
78169723
FD
946 assert line > last_line
947 last_line = line
948
949 if append_empty_section:
aca7de76 950 metadata_sections.append(LttngMetadataConfigSection(line, ts, True))
78169723
FD
951 append_empty_section = False
952
aca7de76
SM
953 metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
954 else:
955 raise TypeError(
956 "`{}`: expecting a string or object value".format(section.path)
957 )
78169723 958
aca7de76 959 return metadata_sections
78169723 960
78169723 961
aca7de76
SM
962def _split_metadata_sections(
963 metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
964):
965 metadata_sections = _parse_metadata_sections_config(metadata_sections_json)
78169723 966
aca7de76 967 sections = [] # type: list[_LttngMetadataStreamSection]
f5567ea8 968 with open(metadata_file_path, "r") as metadata_file:
78169723
FD
969 metadata_lines = [line for line in metadata_file]
970
aca7de76 971 metadata_section_idx = 0
78169723
FD
972 curr_metadata_section = bytearray()
973
974 for idx, line_content in enumerate(metadata_lines):
975 # Add one to the index to convert from the zero-indexing of the
976 # enumerate() function to the one-indexing used by humans when
977 # viewing a text file.
978 curr_line_number = idx + 1
979
980 # If there are no more sections, simply append the line.
aca7de76 981 if metadata_section_idx + 1 >= len(metadata_sections):
f5567ea8 982 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
983 continue
984
aca7de76 985 next_section_line_number = metadata_sections[metadata_section_idx + 1].line
78169723
FD
986
987 # If the next section begins at the current line, create a
988 # section with the metadata we gathered so far.
989 if curr_line_number >= next_section_line_number:
78169723
FD
990 # Flushing the metadata of the current section.
991 sections.append(
992 _LttngMetadataStreamSection(
aca7de76 993 metadata_sections[metadata_section_idx].timestamp,
78169723
FD
994 bytes(curr_metadata_section),
995 )
996 )
997
998 # Move to the next section.
aca7de76 999 metadata_section_idx += 1
78169723
FD
1000
1001 # Clear old content and append current line for the next section.
1002 curr_metadata_section.clear()
f5567ea8 1003 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
1004
1005 # Append any empty sections.
aca7de76 1006 while metadata_sections[metadata_section_idx].is_empty:
78169723
FD
1007 sections.append(
1008 _LttngMetadataStreamSection(
aca7de76 1009 metadata_sections[metadata_section_idx].timestamp, None
78169723
FD
1010 )
1011 )
aca7de76 1012 metadata_section_idx += 1
78169723
FD
1013 else:
1014 # Append line_content to the current metadata section.
f5567ea8 1015 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
1016
1017 # We iterated over all the lines of the metadata file. Close the current section.
1018 sections.append(
1019 _LttngMetadataStreamSection(
aca7de76 1020 metadata_sections[metadata_section_idx].timestamp,
78169723
FD
1021 bytes(curr_metadata_section),
1022 )
1023 )
1024
1025 return sections
584af91e
PP
1026
1027
aca7de76
SM
1028_StreamBeaconsT = Dict[str, Iterable[int]]
1029
1030
584af91e 1031# An LTTng trace, a sequence of LTTng data streams.
aca7de76
SM
1032class LttngTrace(Sequence[_LttngDataStream]):
1033 def __init__(
1034 self,
1035 trace_dir: str,
1036 metadata_sections_json: Optional[tjson.ArrayVal],
1037 beacons_json: Optional[tjson.ObjVal],
3c22c122 1038 creation_timestamp: int,
aca7de76 1039 ):
584af91e 1040 self._path = trace_dir
3c22c122 1041 self._creation_timestamp = creation_timestamp
aca7de76
SM
1042 self._create_metadata_stream(trace_dir, metadata_sections_json)
1043 self._create_data_streams(trace_dir, beacons_json)
584af91e
PP
1044 logging.info('Built trace: path="{}"'.format(trace_dir))
1045
aca7de76
SM
1046 def _create_data_streams(
1047 self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
1048 ):
1049 data_stream_paths = [] # type: list[str]
584af91e
PP
1050
1051 for filename in os.listdir(trace_dir):
1052 path = os.path.join(trace_dir, filename)
1053
1054 if not os.path.isfile(path):
1055 continue
1056
f5567ea8 1057 if filename.startswith("."):
584af91e
PP
1058 continue
1059
f5567ea8 1060 if filename == "metadata":
584af91e
PP
1061 continue
1062
1063 data_stream_paths.append(path)
1064
1065 data_stream_paths.sort()
aca7de76 1066 self._data_streams = [] # type: list[_LttngDataStream]
584af91e
PP
1067
1068 for data_stream_path in data_stream_paths:
71f56e5f 1069 stream_name = os.path.basename(data_stream_path)
aca7de76
SM
1070 this_beacons_json = None
1071 if beacons_json is not None and stream_name in beacons_json:
1072 this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
71f56e5f
JG
1073
1074 self._data_streams.append(
3c22c122
JG
1075 _LttngDataStream(
1076 data_stream_path, this_beacons_json, self._creation_timestamp
1077 )
71f56e5f 1078 )
584af91e 1079
aca7de76
SM
1080 def _create_metadata_stream(
1081 self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
1082 ):
f5567ea8 1083 metadata_path = os.path.join(trace_dir, "metadata")
aca7de76 1084 metadata_sections = [] # type: list[_LttngMetadataStreamSection]
78169723 1085
aca7de76 1086 if metadata_sections_json is None:
f5567ea8 1087 with open(metadata_path, "rb") as metadata_file:
78169723
FD
1088 metadata_sections.append(
1089 _LttngMetadataStreamSection(0, metadata_file.read())
1090 )
1091 else:
1092 metadata_sections = _split_metadata_sections(
aca7de76 1093 metadata_path, metadata_sections_json
78169723
FD
1094 )
1095
3c22c122
JG
1096 self._metadata_stream = _LttngMetadataStream(
1097 metadata_path, metadata_sections, self.creation_timestamp
1098 )
78169723 1099
584af91e
PP
1100 @property
1101 def path(self):
1102 return self._path
1103
1104 @property
1105 def metadata_stream(self):
1106 return self._metadata_stream
1107
3c22c122
JG
1108 @property
1109 def creation_timestamp(self):
1110 return self._creation_timestamp
1111
aca7de76
SM
1112 @overload
1113 def __getitem__(self, index: int) -> _LttngDataStream:
1114 ...
1115
1116 @overload
1117 def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]: # noqa: F811
1118 ...
1119
1120 def __getitem__( # noqa: F811
1121 self, index: Union[int, slice]
1122 ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
584af91e
PP
1123 return self._data_streams[index]
1124
1125 def __len__(self):
1126 return len(self._data_streams)
1127
1128
3c22c122
JG
1129# Stream (metadata or data) state specific to the LTTng live protocol.
1130class _LttngLiveViewerSessionStreamState:
1131 @abstractmethod
1132 def __init__(self):
1133 # A stream is considered "announced" when it has been returned
1134 # to the LTTng live client in response to a "get new stream
1135 # infos" (`_LttngLiveViewerGetNewStreamInfosCommand`) command.
1136 self._announced = False # type: bool
1137 pass
1138
1139 @property
1140 def is_announced(self):
1141 return self._announced
1142
1143 def mark_as_announced(self):
1144 self._announced = True
1145
1146 @property
1147 @abstractmethod
1148 def stream(self) -> _LttngStream:
1149 pass
1150
1151
584af91e 1152# The state of a single data stream.
3c22c122 1153class _LttngLiveViewerSessionDataStreamState(_LttngLiveViewerSessionStreamState):
aca7de76
SM
1154 def __init__(
1155 self,
1156 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1157 info: _LttngLiveViewerStreamInfo,
1158 data_stream: _LttngDataStream,
1159 metadata_stream_id: int,
1160 ):
3c22c122 1161 super().__init__()
584af91e
PP
1162 self._ts_state = ts_state
1163 self._info = info
1164 self._data_stream = data_stream
78169723 1165 self._metadata_stream_id = metadata_stream_id
584af91e
PP
1166 self._cur_index_entry_index = 0
1167 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1168 logging.info(
1169 fmt.format(
1170 info.id,
1171 ts_state.tracing_session_descriptor.info.tracing_session_id,
1172 ts_state.tracing_session_descriptor.info.name,
1173 data_stream.path,
1174 )
1175 )
1176
1177 @property
1178 def tracing_session_state(self):
1179 return self._ts_state
1180
1181 @property
1182 def info(self):
1183 return self._info
1184
1185 @property
3c22c122 1186 def stream(self):
584af91e
PP
1187 return self._data_stream
1188
1189 @property
1190 def cur_index_entry(self):
1191 if self._cur_index_entry_index == len(self._data_stream.index):
1192 return
1193
1194 return self._data_stream.index[self._cur_index_entry_index]
1195
aca7de76
SM
1196 @property
1197 def metadata_stream_id(self):
1198 return self._metadata_stream_id
1199
584af91e
PP
1200 def goto_next_index_entry(self):
1201 self._cur_index_entry_index += 1
1202
1203
1204# The state of a single metadata stream.
3c22c122 1205class _LttngLiveViewerSessionMetadataStreamState(_LttngLiveViewerSessionStreamState):
aca7de76
SM
1206 def __init__(
1207 self,
1208 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1209 info: _LttngLiveViewerStreamInfo,
1210 metadata_stream: _LttngMetadataStream,
1211 ):
3c22c122 1212 super().__init__()
584af91e
PP
1213 self._ts_state = ts_state
1214 self._info = info
1215 self._metadata_stream = metadata_stream
78169723
FD
1216 self._cur_metadata_stream_section_index = 0
1217 if len(metadata_stream.sections) > 1:
1218 self._next_metadata_stream_section_timestamp = metadata_stream.sections[
1219 1
1220 ].timestamp
1221 else:
1222 self._next_metadata_stream_section_timestamp = None
1223
3c22c122 1224 self._all_data_is_sent = False
584af91e
PP
1225 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1226 logging.info(
1227 fmt.format(
1228 info.id,
1229 ts_state.tracing_session_descriptor.info.tracing_session_id,
1230 ts_state.tracing_session_descriptor.info.name,
1231 metadata_stream.path,
1232 )
1233 )
1234
584af91e
PP
1235 @property
1236 def info(self):
1237 return self._info
1238
1239 @property
3c22c122 1240 def stream(self):
584af91e
PP
1241 return self._metadata_stream
1242
1243 @property
3c22c122
JG
1244 def all_data_is_sent(self):
1245 return self._all_data_is_sent
584af91e 1246
3c22c122
JG
1247 @all_data_is_sent.setter
1248 def all_data_is_sent(self, value: bool):
1249 self._all_data_is_sent = value
584af91e 1250
78169723
FD
1251 @property
1252 def cur_section(self):
1253 fmt = "Get current metadata section: section-idx={}"
1254 logging.info(fmt.format(self._cur_metadata_stream_section_index))
1255 if self._cur_metadata_stream_section_index == len(
1256 self._metadata_stream.sections
1257 ):
1258 return
1259
1260 return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
1261
1262 def goto_next_section(self):
1263 self._cur_metadata_stream_section_index += 1
1264 if self.cur_section:
1265 self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
1266 else:
1267 self._next_metadata_stream_section_timestamp = None
1268
1269 @property
1270 def next_section_timestamp(self):
1271 return self._next_metadata_stream_section_timestamp
1272
584af91e 1273
ee1171e5
SM
1274# A tracing session descriptor.
1275#
1276# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1277# objects).
1278class LttngTracingSessionDescriptor:
1279 def __init__(
aca7de76
SM
1280 self,
1281 name: str,
1282 tracing_session_id: int,
1283 hostname: str,
1284 live_timer_freq: int,
1285 client_count: int,
1286 traces: Iterable[LttngTrace],
ee1171e5
SM
1287 ):
1288 for trace in traces:
1289 if name not in trace.path:
1290 fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1291 raise ValueError(fmt.format(name, trace.path))
1292
1293 self._traces = traces
1294 stream_count = sum([len(t) + 1 for t in traces])
1295 self._info = _LttngLiveViewerTracingSessionInfo(
1296 tracing_session_id,
1297 live_timer_freq,
1298 client_count,
1299 stream_count,
1300 hostname,
1301 name,
1302 )
1303
1304 @property
1305 def traces(self):
1306 return self._traces
1307
1308 @property
1309 def info(self):
1310 return self._info
1311
1312
584af91e
PP
1313# The state of a tracing session.
1314class _LttngLiveViewerSessionTracingSessionState:
aca7de76 1315 def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
584af91e 1316 self._tc_descr = tc_descr
3c22c122 1317 self._client_visible_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
aca7de76
SM
1318 self._ds_states = {} # type: dict[int, _LttngLiveViewerSessionDataStreamState]
1319 self._ms_states = (
1320 {}
1321 ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
3c22c122
JG
1322 self._last_delivered_index_timestamp = 0
1323 self._last_allocated_stream_id = base_stream_id
584af91e
PP
1324
1325 for trace in tc_descr.traces:
3c22c122
JG
1326 trace_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
1327 trace_id = self._last_allocated_stream_id * 1000
584af91e 1328
78169723
FD
1329 # Metadata stream -> stream info and metadata stream state
1330 info = _LttngLiveViewerStreamInfo(
3c22c122
JG
1331 self._last_allocated_stream_id,
1332 trace_id,
1333 True,
1334 trace.metadata_stream.path,
1335 "metadata",
78169723 1336 )
3c22c122
JG
1337
1338 trace_stream_infos.append(info)
1339 self._ms_states[
1340 self._last_allocated_stream_id
1341 ] = _LttngLiveViewerSessionMetadataStreamState(
78169723
FD
1342 self, info, trace.metadata_stream
1343 )
3c22c122
JG
1344 metadata_stream_id = self._last_allocated_stream_id
1345 self._last_allocated_stream_id += 1
78169723 1346
584af91e
PP
1347 # Data streams -> stream infos and data stream states
1348 for data_stream in trace:
1349 info = _LttngLiveViewerStreamInfo(
3c22c122 1350 self._last_allocated_stream_id,
584af91e
PP
1351 trace_id,
1352 False,
1353 data_stream.path,
1354 data_stream.channel_name,
1355 )
3c22c122
JG
1356 trace_stream_infos.append(info)
1357 self._ds_states[
1358 self._last_allocated_stream_id
1359 ] = _LttngLiveViewerSessionDataStreamState(
78169723 1360 self, info, data_stream, metadata_stream_id
584af91e 1361 )
3c22c122
JG
1362 self._last_allocated_stream_id += 1
1363
1364 if trace.creation_timestamp == 0:
1365 # Only announce streams for traces that are created at
1366 # the origin.
1367 #
1368 # The rest of the streams will be discovered by the
1369 # client as indexes are received with a "has new data
1370 # streams" flag set in the reply.
1371 self._client_visible_stream_infos.extend(trace_stream_infos)
584af91e 1372
584af91e
PP
1373 self._is_attached = False
1374 fmt = 'Built tracing session state: id={}, name="{}"'
1375 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
1376
1377 @property
1378 def tracing_session_descriptor(self):
1379 return self._tc_descr
1380
1381 @property
1382 def data_stream_states(self):
1383 return self._ds_states
1384
1385 @property
1386 def metadata_stream_states(self):
1387 return self._ms_states
1388
1389 @property
3c22c122
JG
1390 def client_visible_stream_infos(self):
1391 return self._client_visible_stream_infos
584af91e
PP
1392
1393 @property
1394 def has_new_metadata(self):
3c22c122
JG
1395 return any(
1396 [
1397 ms.is_announced and not ms.all_data_is_sent
1398 for ms in self._ms_states.values()
1399 ]
1400 )
584af91e
PP
1401
1402 @property
1403 def is_attached(self):
1404 return self._is_attached
1405
1406 @is_attached.setter
aca7de76 1407 def is_attached(self, value: bool):
584af91e
PP
1408 self._is_attached = value
1409
1410
aca7de76
SM
1411def needs_new_metadata_section(
1412 metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
1413 latest_timestamp: int,
1414):
78169723
FD
1415 if metadata_stream_state.next_section_timestamp is None:
1416 return False
1417
1418 if latest_timestamp >= metadata_stream_state.next_section_timestamp:
1419 return True
1420 else:
1421 return False
1422
1423
584af91e
PP
1424# An LTTng live viewer session manages a view on tracing sessions
1425# and replies to commands accordingly.
1426class _LttngLiveViewerSession:
1427 def __init__(
1428 self,
aca7de76
SM
1429 viewer_session_id: int,
1430 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1431 max_query_data_response_size: Optional[int],
584af91e
PP
1432 ):
1433 self._viewer_session_id = viewer_session_id
aca7de76
SM
1434 self._ts_states = (
1435 {}
1436 ) # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
1437 self._stream_states = (
1438 {}
1439 ) # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
584af91e
PP
1440 self._max_query_data_response_size = max_query_data_response_size
1441 total_stream_infos = 0
1442
1443 for ts_descr in tracing_session_descriptors:
1444 ts_state = _LttngLiveViewerSessionTracingSessionState(
1445 ts_descr, total_stream_infos
1446 )
1447 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1448 self._ts_states[ts_id] = ts_state
3c22c122 1449 total_stream_infos += len(ts_state.client_visible_stream_infos)
584af91e
PP
1450
1451 # Update session's stream states to have the new states
1452 self._stream_states.update(ts_state.data_stream_states)
1453 self._stream_states.update(ts_state.metadata_stream_states)
1454
1455 self._command_handlers = {
1456 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1457 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1458 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1459 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1460 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1461 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1462 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1463 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
aca7de76 1464 } # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]
584af91e
PP
1465
1466 @property
1467 def viewer_session_id(self):
1468 return self._viewer_session_id
1469
aca7de76 1470 def _get_tracing_session_state(self, tracing_session_id: int):
584af91e 1471 if tracing_session_id not in self._ts_states:
f2958352 1472 raise RuntimeError(
f5567ea8 1473 "Unknown tracing session ID {}".format(tracing_session_id)
584af91e
PP
1474 )
1475
1476 return self._ts_states[tracing_session_id]
1477
aca7de76 1478 def _get_data_stream_state(self, stream_id: int):
584af91e 1479 if stream_id not in self._stream_states:
aca7de76 1480 RuntimeError("Unknown stream ID {}".format(stream_id))
584af91e 1481
aca7de76
SM
1482 stream = self._stream_states[stream_id]
1483 if type(stream) is not _LttngLiveViewerSessionDataStreamState:
1484 raise RuntimeError("Stream is not a data stream")
584af91e 1485
aca7de76
SM
1486 return stream
1487
1488 def _get_metadata_stream_state(self, stream_id: int):
1489 if stream_id not in self._stream_states:
1490 RuntimeError("Unknown stream ID {}".format(stream_id))
1491
1492 stream = self._stream_states[stream_id]
1493 if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
1494 raise RuntimeError("Stream is not a metadata stream")
1495
1496 return stream
1497
1498 def handle_command(self, cmd: _LttngLiveViewerCommand):
584af91e 1499 logging.info(
f5567ea8 1500 "Handling command in viewer session: cmd-cls-name={}".format(
584af91e
PP
1501 cmd.__class__.__name__
1502 )
1503 )
1504 cmd_type = type(cmd)
1505
1506 if cmd_type not in self._command_handlers:
f2958352 1507 raise RuntimeError(
f5567ea8 1508 "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
584af91e
PP
1509 )
1510
1511 return self._command_handlers[cmd_type](cmd)
1512
aca7de76
SM
1513 def _handle_attach_to_tracing_session_command(
1514 self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
1515 ):
584af91e
PP
1516 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1517 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1518 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1519 info = ts_state.tracing_session_descriptor.info
1520
1521 if ts_state.is_attached:
f2958352 1522 raise RuntimeError(
f5567ea8 1523 "Cannot attach to tracing session `{}`: viewer is already attached".format(
584af91e
PP
1524 info.name
1525 )
1526 )
1527
1528 ts_state.is_attached = True
1529 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
3c22c122
JG
1530 stream_infos_to_announce = ts_state.client_visible_stream_infos
1531
1532 # Mark stream infos transmitted as part of the reply as
1533 # announced.
1534 for si in stream_infos_to_announce:
1535 if si.is_metadata:
1536 self._get_metadata_stream_state(si.id).mark_as_announced()
1537 else:
1538 self._get_data_stream_state(si.id).mark_as_announced()
1539
584af91e 1540 return _LttngLiveViewerAttachToTracingSessionReply(
3c22c122 1541 status, stream_infos_to_announce
584af91e
PP
1542 )
1543
aca7de76
SM
1544 def _handle_detach_from_tracing_session_command(
1545 self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
1546 ):
584af91e
PP
1547 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1548 logging.info(fmt.format(cmd.tracing_session_id))
1549 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1550 info = ts_state.tracing_session_descriptor.info
1551
1552 if not ts_state.is_attached:
f2958352 1553 raise RuntimeError(
f5567ea8 1554 "Cannot detach to tracing session `{}`: viewer is not attached".format(
584af91e
PP
1555 info.name
1556 )
1557 )
1558
1559 ts_state.is_attached = False
1560 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1561 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1562
3c22c122
JG
1563 @staticmethod
1564 def _stream_is_ready(
1565 stream_state: _LttngLiveViewerSessionStreamState, creation_timestamp: int
1566 ):
1567 return (
1568 not stream_state.is_announced
1569 and stream_state.stream.creation_timestamp <= creation_timestamp
1570 )
1571
1572 def _needs_new_streams(self, current_timestamp: int):
1573 return any(
1574 self._stream_is_ready(ss, current_timestamp)
1575 for ss in self._stream_states.values()
1576 )
1577
aca7de76
SM
1578 def _handle_get_next_data_stream_index_entry_command(
1579 self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
1580 ):
584af91e
PP
1581 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1582 logging.info(fmt.format(cmd.stream_id))
aca7de76
SM
1583 stream_state = self._get_data_stream_state(cmd.stream_id)
1584 metadata_stream_state = self._get_metadata_stream_state(
1585 stream_state.metadata_stream_id
1586 )
584af91e
PP
1587
1588 if stream_state.cur_index_entry is None:
1589 # The viewer is done reading this stream
1590 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1591
1592 # Dummy data stream index entry to use with the `HUP` status
1593 # (the reply needs one, but the viewer ignores it)
1594 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1595
1596 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1597 status, index_entry, False, False
1598 )
1599
78169723
FD
1600 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1601
1602 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
3c22c122 1603 metadata_stream_state.all_data_is_sent = False
78169723
FD
1604 metadata_stream_state.goto_next_section()
1605
584af91e
PP
1606 # The viewer only checks the `has_new_metadata` flag if the
1607 # reply's status is `OK`, so we need to provide an index here
1608 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
aca7de76 1609 if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
71f56e5f
JG
1610 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1611 else:
71f56e5f
JG
1612 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1613
584af91e 1614 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
3c22c122
JG
1615 status,
1616 stream_state.cur_index_entry,
1617 has_new_metadata,
1618 self._needs_new_streams(timestamp_begin),
584af91e 1619 )
3c22c122 1620 self._last_delivered_index_timestamp_begin = timestamp_begin
584af91e
PP
1621 stream_state.goto_next_index_entry()
1622 return reply
1623
aca7de76
SM
1624 def _handle_get_data_stream_packet_data_command(
1625 self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
1626 ):
584af91e
PP
1627 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1628 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
aca7de76 1629 stream_state = self._get_data_stream_state(cmd.stream_id)
584af91e
PP
1630 data_response_length = cmd.req_length
1631
584af91e
PP
1632 if stream_state.tracing_session_state.has_new_metadata:
1633 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1634 return _LttngLiveViewerGetDataStreamPacketDataReply(
1635 status, bytes(), True, False
1636 )
1637
1638 if self._max_query_data_response_size:
1639 # Enforce a server side limit on the query requested length.
1640 # To ensure that the transaction terminate take the minimum of both
1641 # value.
1642 data_response_length = min(
1643 cmd.req_length, self._max_query_data_response_size
1644 )
1645 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1646 logging.info(fmt.format(cmd.req_length, data_response_length))
1647
3c22c122 1648 data = stream_state.stream.get_data(cmd.offset, data_response_length)
584af91e
PP
1649 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1650 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1651
aca7de76
SM
1652 def _handle_get_metadata_stream_data_command(
1653 self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
1654 ):
584af91e
PP
1655 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1656 logging.info(fmt.format(cmd.stream_id))
aca7de76 1657 metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
584af91e 1658
3c22c122 1659 if metadata_stream_state.all_data_is_sent:
584af91e
PP
1660 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1661 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1662
3c22c122 1663 metadata_stream_state.all_data_is_sent = True
584af91e 1664 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
78169723 1665 metadata_section = metadata_stream_state.cur_section
b8b97725 1666 assert metadata_section is not None
78169723
FD
1667
1668 # If we are sending an empty section, ready the next one right away.
1669 if len(metadata_section.data) == 0:
3c22c122 1670 metadata_stream_state.all_data_is_sent = False
78169723
FD
1671 metadata_stream_state.goto_next_section()
1672
1673 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1674 logging.info(fmt.format(len(metadata_section.data)))
584af91e 1675 return _LttngLiveViewerGetMetadataStreamDataContentReply(
78169723 1676 status, metadata_section.data
584af91e
PP
1677 )
1678
3c22c122
JG
1679 def _get_stream_infos_ready_for_announcement(self):
1680 ready_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
1681
1682 for ss in self._stream_states.values():
1683 if self._stream_is_ready(ss, ss.stream.creation_timestamp):
1684 ready_stream_infos.append(ss.info)
1685
1686 return ready_stream_infos
1687
1688 # A stream is considered finished if it has been announced and all
1689 # of its index entries have been provided to the client.
1690 def _all_streams_finished(self):
1691 return all(
1692 isinstance(stream_state, _LttngLiveViewerSessionMetadataStreamState)
1693 or (stream_state.cur_index_entry is None and stream_state.is_announced)
1694 for stream_state in self._stream_states.values()
1695 )
1696
aca7de76
SM
1697 def _handle_get_new_stream_infos_command(
1698 self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
1699 ):
584af91e
PP
1700 fmt = 'Handling "get new stream infos" command: ts-id={}'
1701 logging.info(fmt.format(cmd.tracing_session_id))
3c22c122 1702 newly_announced_stream_infos = self._get_stream_infos_ready_for_announcement()
584af91e 1703
3c22c122
JG
1704 # Mark stream infos transmitted as part of the reply as
1705 # announced.
1706 for si in newly_announced_stream_infos:
1707 if si.is_metadata:
1708 self._get_metadata_stream_state(si.id).mark_as_announced()
1709 else:
1710 self._get_data_stream_state(si.id).mark_as_announced()
1711
1712 status = _LttngLiveViewerGetNewStreamInfosReply.Status.OK
1713
1714 if len(newly_announced_stream_infos) == 0:
1715 # If all streams have been transmitted and no new traces are
1716 # scheduled for creation, hang up to signal that the tracing
1717 # session is "done".
1718 status = (
1719 _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1720 if self._all_streams_finished()
1721 else _LttngLiveViewerGetNewStreamInfosReply.Status.NO_NEW
1722 )
1723
1724 return _LttngLiveViewerGetNewStreamInfosReply(
1725 status, newly_announced_stream_infos
1726 )
584af91e 1727
aca7de76
SM
1728 def _handle_get_tracing_session_infos_command(
1729 self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
1730 ):
584af91e
PP
1731 logging.info('Handling "get tracing session infos" command.')
1732 infos = [
1733 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1734 ]
1735 infos.sort(key=lambda info: info.name)
1736 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1737
aca7de76
SM
1738 def _handle_create_viewer_session_command(
1739 self, cmd: _LttngLiveViewerCreateViewerSessionCommand
1740 ):
584af91e
PP
1741 logging.info('Handling "create viewer session" command.')
1742 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1743
1744 # This does nothing here. In the LTTng relay daemon, it
1745 # allocates the viewer session's state.
1746 return _LttngLiveViewerCreateViewerSessionReply(status)
1747
1748
1749# An LTTng live TCP server.
1750#
e51141d3
SM
1751# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
1752# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
1753# to a temporary port file. It renames the temporary port file to
1754# `port_filename`.
584af91e
PP
1755#
1756# `tracing_session_descriptors` is a list of tracing session descriptors
1757# (`LttngTracingSessionDescriptor`) to serve.
1758#
1759# This server accepts a single viewer (client).
1760#
1761# When the viewer closes the connection, the server's constructor
1762# returns.
1763class LttngLiveServer:
1764 def __init__(
e51141d3 1765 self,
aca7de76 1766 port: Optional[int],
d11f45ef 1767 port_filename: Optional[str],
aca7de76
SM
1768 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1769 max_query_data_response_size: Optional[int],
584af91e 1770 ):
f5567ea8 1771 logging.info("Server configuration:")
584af91e 1772
f5567ea8 1773 logging.info(" Port file name: `{}`".format(port_filename))
584af91e
PP
1774
1775 if max_query_data_response_size is not None:
1776 logging.info(
f5567ea8 1777 " Maximum response data query size: `{}`".format(
584af91e
PP
1778 max_query_data_response_size
1779 )
1780 )
1781
1782 for ts_descr in tracing_session_descriptors:
1783 info = ts_descr.info
1784 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1785 logging.info(
1786 fmt.format(
1787 info.name,
1788 info.tracing_session_id,
1789 info.hostname,
1790 info.live_timer_freq,
1791 info.client_count,
1792 info.stream_count,
1793 )
1794 )
1795
1796 for trace in ts_descr.traces:
1797 logging.info(' Trace: path="{}"'.format(trace.path))
1798
1799 self._ts_descriptors = tracing_session_descriptors
1800 self._max_query_data_response_size = max_query_data_response_size
1801 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1802 self._codec = _LttngLiveViewerProtocolCodec()
1803
1804 # Port 0: OS assigns an unused port
e51141d3 1805 serv_addr = ("localhost", port if port is not None else 0)
584af91e 1806 self._sock.bind(serv_addr)
d11f45ef
SM
1807
1808 if port_filename is not None:
1809 self._write_port_to_file(port_filename)
1810
1811 print("Listening on port {}".format(self._server_port))
584af91e 1812
2439d12b
SM
1813 for ts_descr in tracing_session_descriptors:
1814 info = ts_descr.info
1815 print(
1816 "net://localhost:{}/host/{}/{}".format(
1817 self._server_port, info.hostname, info.name
1818 )
1819 )
1820
584af91e
PP
1821 try:
1822 self._listen()
1823 finally:
1824 self._sock.close()
f5567ea8 1825 logging.info("Closed connection and socket.")
584af91e
PP
1826
1827 @property
1828 def _server_port(self):
1829 return self._sock.getsockname()[1]
1830
1831 def _recv_command(self):
1832 data = bytes()
1833
1834 while True:
f5567ea8 1835 logging.info("Waiting for viewer command.")
584af91e
PP
1836 buf = self._conn.recv(128)
1837
1838 if not buf:
f5567ea8 1839 logging.info("Client closed connection.")
584af91e
PP
1840
1841 if data:
f2958352 1842 raise RuntimeError(
f5567ea8 1843 "Client closed connection after having sent {} command bytes.".format(
584af91e
PP
1844 len(data)
1845 )
1846 )
1847
1848 return
1849
f5567ea8 1850 logging.info("Received data from viewer: length={}".format(len(buf)))
584af91e
PP
1851
1852 data += buf
1853
1854 try:
1855 cmd = self._codec.decode(data)
1856 except struct.error as exc:
f2958352 1857 raise RuntimeError("Malformed command: {}".format(exc)) from exc
584af91e
PP
1858
1859 if cmd is not None:
1860 logging.info(
f5567ea8 1861 "Received command from viewer: cmd-cls-name={}".format(
584af91e
PP
1862 cmd.__class__.__name__
1863 )
1864 )
1865 return cmd
1866
aca7de76 1867 def _send_reply(self, reply: _LttngLiveViewerReply):
584af91e
PP
1868 data = self._codec.encode(reply)
1869 logging.info(
f5567ea8 1870 "Sending reply to viewer: reply-cls-name={}, length={}".format(
584af91e
PP
1871 reply.__class__.__name__, len(data)
1872 )
1873 )
1874 self._conn.sendall(data)
1875
1876 def _handle_connection(self):
1877 # First command must be "connect"
1878 cmd = self._recv_command()
1879
1880 if type(cmd) is not _LttngLiveViewerConnectCommand:
f2958352 1881 raise RuntimeError(
584af91e
PP
1882 'First command is not "connect": cmd-cls-name={}'.format(
1883 cmd.__class__.__name__
1884 )
1885 )
1886
1887 # Create viewer session (arbitrary ID 23)
1888 logging.info(
f5567ea8 1889 "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
584af91e
PP
1890 )
1891 viewer_session = _LttngLiveViewerSession(
1892 23, self._ts_descriptors, self._max_query_data_response_size
1893 )
1894
1895 # Send "connect" reply
1896 self._send_reply(
1897 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1898 )
1899
1900 # Make the viewer session handle the remaining commands
1901 while True:
1902 cmd = self._recv_command()
1903
1904 if cmd is None:
1905 # Connection closed (at an expected location within the
1906 # conversation)
1907 return
1908
1909 self._send_reply(viewer_session.handle_command(cmd))
1910
1911 def _listen(self):
f5567ea8 1912 logging.info("Listening: port={}".format(self._server_port))
1726ac08
JR
1913 # Backlog must be present for Python version < 3.5.
1914 # 128 is an arbitrary number since we expect only 1 connection anyway.
1915 self._sock.listen(128)
584af91e
PP
1916 self._conn, viewer_addr = self._sock.accept()
1917 logging.info(
f5567ea8 1918 "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
584af91e
PP
1919 )
1920
1921 try:
1922 self._handle_connection()
1923 finally:
1924 self._conn.close()
1925
aca7de76 1926 def _write_port_to_file(self, port_filename: str):
584af91e 1927 # Write the port number to a temporary file.
a0a92e79
SM
1928 with tempfile.NamedTemporaryFile(
1929 mode="w", delete=False, dir=os.path.dirname(port_filename)
1930 ) as tmp_port_file:
f5567ea8 1931 print(self._server_port, end="", file=tmp_port_file)
584af91e 1932
6362d281
SM
1933 # Rename temporary file to real file.
1934 #
1935 # For unknown reasons, on Windows, moving the port file from its
1936 # temporary location to its final location (where the user of
1937 # the server expects it to appear) may raise a `PermissionError`
1938 # exception.
1939 #
1940 # We suppose it's possible that something in the Windows kernel
1941 # hasn't completely finished using the file when we try to move
1942 # it.
1943 #
1944 # Use a wait-and-retry scheme as a (bad) workaround.
1945 num_attempts = 5
1946 retry_delay_s = 1
1947
1948 for attempt in reversed(range(num_attempts)):
1949 try:
1950 os.replace(tmp_port_file.name, port_filename)
1951 logging.info(
1952 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1953 tmp_port_file.name, port_filename
1954 )
1955 )
1956 return
1957 except PermissionError:
1958 logging.info(
1959 'Permission error while attempting to rename port file; retrying in {} second: src-path="{}", dst-path="{}"'.format(
1960 retry_delay_s, tmp_port_file.name, port_filename
1961 )
1962 )
1963
1964 if attempt == 0:
1965 raise
1966
1967 time.sleep(retry_delay_s)
584af91e
PP
1968
1969
aca7de76
SM
1970def _session_descriptors_from_path(
1971 sessions_filename: str, trace_path_prefix: Optional[str]
1972):
2b763e29
JG
1973 # File format is:
1974 #
1975 # [
1976 # {
1977 # "name": "my-session",
1978 # "id": 17,
1979 # "hostname": "myhost",
1980 # "live-timer-freq": 1000000,
1981 # "client-count": 23,
1982 # "traces": [
1983 # {
3c22c122
JG
1984 # "path": "lol",
1985 # creation-timestamp: 12948
2b763e29
JG
1986 # },
1987 # {
71f56e5f
JG
1988 # "path": "meow/mix",
1989 # "beacons": {
1990 # "my_stream": [ 5235787, 728375283 ]
78169723
FD
1991 # },
1992 # "metadata-sections": [
1993 # {
1994 # "line": 1,
1995 # "timestamp": 0
1996 # }
1997 # ]
2b763e29
JG
1998 # }
1999 # ]
2000 # }
2001 # ]
f5567ea8 2002 with open(sessions_filename, "r") as sessions_file:
aca7de76
SM
2003 sessions_json = tjson.load(sessions_file, tjson.ArrayVal)
2004
2005 sessions = [] # type: list[LttngTracingSessionDescriptor]
2b763e29 2006
aca7de76
SM
2007 for session_json in sessions_json.iter(tjson.ObjVal):
2008 name = session_json.at("name", tjson.StrVal).val
2009 tracing_session_id = session_json.at("id", tjson.IntVal).val
2010 hostname = session_json.at("hostname", tjson.StrVal).val
2011 live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
2012 client_count = session_json.at("client-count", tjson.IntVal).val
2013 traces_json = session_json.at("traces", tjson.ArrayVal)
2b763e29 2014
aca7de76 2015 traces = [] # type: list[LttngTrace]
2b763e29 2016
aca7de76
SM
2017 for trace_json in traces_json.iter(tjson.ObjVal):
2018 metadata_sections = (
2019 trace_json.at("metadata-sections", tjson.ArrayVal)
2020 if "metadata-sections" in trace_json
2021 else None
2022 )
2023 beacons = (
2024 trace_json.at("beacons", tjson.ObjVal)
2025 if "beacons" in trace_json
2026 else None
2027 )
2028 path = trace_json.at("path", tjson.StrVal).val
3c22c122
JG
2029 creation_timestamp = (
2030 trace_json.at("creation-timestamp", tjson.IntVal).val
2031 if "creation-timestamp" in trace_json
2032 else 0
2033 ) # type: int
2b763e29 2034
aca7de76 2035 if not os.path.isabs(path) and trace_path_prefix:
2b763e29
JG
2036 path = os.path.join(trace_path_prefix, path)
2037
aca7de76 2038 traces.append(
3c22c122 2039 LttngTrace(path, metadata_sections, beacons, creation_timestamp)
aca7de76 2040 )
2b763e29
JG
2041
2042 sessions.append(
2043 LttngTracingSessionDescriptor(
2044 name,
2045 tracing_session_id,
2046 hostname,
2047 live_timer_freq,
2048 client_count,
2049 traces,
2050 )
2051 )
2052
2053 return sessions
584af91e
PP
2054
2055
aca7de76 2056def _loglevel_parser(string: str):
f5567ea8 2057 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
584af91e
PP
2058 if string not in loglevels:
2059 msg = "{} is not a valid loglevel".format(string)
2060 raise argparse.ArgumentTypeError(msg)
2061 return loglevels[string]
2062
2063
f5567ea8
FD
2064if __name__ == "__main__":
2065 logging.basicConfig(format="# %(asctime)-25s%(message)s")
584af91e 2066 parser = argparse.ArgumentParser(
f5567ea8 2067 description="LTTng-live protocol mocker", add_help=False
584af91e
PP
2068 )
2069 parser.add_argument(
f5567ea8
FD
2070 "--log-level",
2071 default="warning",
2072 choices=["info", "warning"],
2073 help="The loglevel to be used.",
584af91e
PP
2074 )
2075
2076 loglevel_namespace, remaining_args = parser.parse_known_args()
2077 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
2078
e51141d3
SM
2079 parser.add_argument(
2080 "--port",
2081 help="The port to bind to. If missing, use an OS-assigned port..",
2082 type=int,
2083 )
584af91e 2084 parser.add_argument(
f5567ea8
FD
2085 "--port-filename",
2086 help="The final port file. This file is present when the server is ready to receive connection.",
584af91e
PP
2087 )
2088 parser.add_argument(
f5567ea8 2089 "--max-query-data-response-size",
584af91e 2090 type=int,
f5567ea8 2091 help="The maximum size of control data response in bytes",
584af91e
PP
2092 )
2093 parser.add_argument(
f5567ea8 2094 "--trace-path-prefix",
2b763e29 2095 type=str,
f5567ea8 2096 help="Prefix to prepend to the trace paths of session configurations",
2b763e29
JG
2097 )
2098 parser.add_argument(
e46b8462 2099 "sessions_filename",
776a2a25 2100 type=str,
f5567ea8 2101 help="Path to a session configuration file",
e46b8462 2102 metavar="sessions-filename",
584af91e
PP
2103 )
2104 parser.add_argument(
f5567ea8
FD
2105 "-h",
2106 "--help",
2107 action="help",
584af91e 2108 default=argparse.SUPPRESS,
f5567ea8 2109 help="Show this help message and exit.",
584af91e
PP
2110 )
2111
2112 args = parser.parse_args(args=remaining_args)
f2958352
SM
2113 sessions_filename = args.sessions_filename # type: str
2114 trace_path_prefix = args.trace_path_prefix # type: str | None
2115 sessions = _session_descriptors_from_path(
2116 sessions_filename,
2117 trace_path_prefix,
2118 )
aca7de76 2119
f2958352 2120 port = args.port # type: int | None
d11f45ef 2121 port_filename = args.port_filename # type: str | None
f2958352
SM
2122 max_query_data_response_size = args.max_query_data_response_size # type: int | None
2123 LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
This page took 0.156062 seconds and 5 git commands to generate.