tests: remove isdir/isfile asserts from lttng_live_server.py
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
584af91e 2#
0235b0db 3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
584af91e 4#
584af91e 5
aca7de76
SM
6# pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
7
584af91e 8import os
584af91e
PP
9import re
10import socket
11import struct
5995b304
SM
12import logging
13import os.path
14import argparse
584af91e 15import tempfile
aca7de76
SM
16from typing import Dict, Union, Iterable, Optional, Sequence, overload
17
18import tjson
19
20# isort: off
21from typing import Any, Callable # noqa: F401
22
23# isort: on
584af91e
PP
24
25
ee1171e5
SM
26# An entry within the index of an LTTng data stream.
27class _LttngDataStreamIndexEntry:
28 def __init__(
29 self,
aca7de76
SM
30 offset_bytes: int,
31 total_size_bits: int,
32 content_size_bits: int,
33 timestamp_begin: int,
34 timestamp_end: int,
35 events_discarded: int,
36 stream_class_id: int,
ee1171e5
SM
37 ):
38 self._offset_bytes = offset_bytes
39 self._total_size_bits = total_size_bits
40 self._content_size_bits = content_size_bits
41 self._timestamp_begin = timestamp_begin
42 self._timestamp_end = timestamp_end
43 self._events_discarded = events_discarded
44 self._stream_class_id = stream_class_id
45
46 @property
47 def offset_bytes(self):
48 return self._offset_bytes
49
50 @property
51 def total_size_bits(self):
52 return self._total_size_bits
53
54 @property
55 def total_size_bytes(self):
56 return self._total_size_bits // 8
57
58 @property
59 def content_size_bits(self):
60 return self._content_size_bits
61
62 @property
63 def content_size_bytes(self):
64 return self._content_size_bits // 8
65
66 @property
67 def timestamp_begin(self):
68 return self._timestamp_begin
69
70 @property
71 def timestamp_end(self):
72 return self._timestamp_end
73
74 @property
75 def events_discarded(self):
76 return self._events_discarded
77
78 @property
79 def stream_class_id(self):
80 return self._stream_class_id
81
82
83# An entry within the index of an LTTng data stream. While a stream beacon entry
84# is conceptually unrelated to an index, it is sent as a reply to a
85# LttngLiveViewerGetNextDataStreamIndexEntryCommand
aca7de76
SM
86class _LttngDataStreamBeaconIndexEntry:
87 def __init__(self, stream_class_id: int, timestamp: int):
ee1171e5
SM
88 self._stream_class_id = stream_class_id
89 self._timestamp = timestamp
90
91 @property
92 def timestamp(self):
93 return self._timestamp
94
95 @property
96 def stream_class_id(self):
97 return self._stream_class_id
98
99
aca7de76
SM
100_LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
101
102
584af91e 103class _LttngLiveViewerCommand:
aca7de76 104 def __init__(self, version: int):
584af91e
PP
105 self._version = version
106
107 @property
108 def version(self):
109 return self._version
110
111
112class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
aca7de76 113 def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
584af91e
PP
114 super().__init__(version)
115 self._viewer_session_id = viewer_session_id
116 self._major = major
117 self._minor = minor
118
119 @property
120 def viewer_session_id(self):
121 return self._viewer_session_id
122
123 @property
124 def major(self):
125 return self._major
126
127 @property
128 def minor(self):
129 return self._minor
130
131
aca7de76
SM
132class _LttngLiveViewerReply:
133 pass
134
135
136class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
137 def __init__(self, viewer_session_id: int, major: int, minor: int):
584af91e
PP
138 self._viewer_session_id = viewer_session_id
139 self._major = major
140 self._minor = minor
141
142 @property
143 def viewer_session_id(self):
144 return self._viewer_session_id
145
146 @property
147 def major(self):
148 return self._major
149
150 @property
151 def minor(self):
152 return self._minor
153
154
155class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
156 pass
157
158
159class _LttngLiveViewerTracingSessionInfo:
160 def __init__(
161 self,
aca7de76
SM
162 tracing_session_id: int,
163 live_timer_freq: int,
164 client_count: int,
165 stream_count: int,
166 hostname: str,
167 name: str,
584af91e
PP
168 ):
169 self._tracing_session_id = tracing_session_id
170 self._live_timer_freq = live_timer_freq
171 self._client_count = client_count
172 self._stream_count = stream_count
173 self._hostname = hostname
174 self._name = name
175
176 @property
177 def tracing_session_id(self):
178 return self._tracing_session_id
179
180 @property
181 def live_timer_freq(self):
182 return self._live_timer_freq
183
184 @property
185 def client_count(self):
186 return self._client_count
187
188 @property
189 def stream_count(self):
190 return self._stream_count
191
192 @property
193 def hostname(self):
194 return self._hostname
195
196 @property
197 def name(self):
198 return self._name
199
200
aca7de76
SM
201class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
202 def __init__(
203 self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
204 ):
584af91e
PP
205 self._tracing_session_infos = tracing_session_infos
206
207 @property
208 def tracing_session_infos(self):
209 return self._tracing_session_infos
210
211
212class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
213 class SeekType:
214 BEGINNING = 1
215 LAST = 2
216
aca7de76
SM
217 def __init__(
218 self, version: int, tracing_session_id: int, offset: int, seek_type: int
219 ):
584af91e
PP
220 super().__init__(version)
221 self._tracing_session_id = tracing_session_id
222 self._offset = offset
223 self._seek_type = seek_type
224
225 @property
226 def tracing_session_id(self):
227 return self._tracing_session_id
228
229 @property
230 def offset(self):
231 return self._offset
232
233 @property
234 def seek_type(self):
235 return self._seek_type
236
237
238class _LttngLiveViewerStreamInfo:
aca7de76
SM
239 def __init__(
240 self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
241 ):
584af91e
PP
242 self._id = id
243 self._trace_id = trace_id
244 self._is_metadata = is_metadata
245 self._path = path
246 self._channel_name = channel_name
247
248 @property
249 def id(self):
250 return self._id
251
252 @property
253 def trace_id(self):
254 return self._trace_id
255
256 @property
257 def is_metadata(self):
258 return self._is_metadata
259
260 @property
261 def path(self):
262 return self._path
263
264 @property
265 def channel_name(self):
266 return self._channel_name
267
268
aca7de76 269class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
584af91e
PP
270 class Status:
271 OK = 1
272 ALREADY = 2
273 UNKNOWN = 3
274 NOT_LIVE = 4
275 SEEK_ERROR = 5
276 NO_SESSION = 6
277
aca7de76 278 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
584af91e
PP
279 self._status = status
280 self._stream_infos = stream_infos
281
282 @property
283 def status(self):
284 return self._status
285
286 @property
287 def stream_infos(self):
288 return self._stream_infos
289
290
291class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
aca7de76 292 def __init__(self, version: int, stream_id: int):
584af91e
PP
293 super().__init__(version)
294 self._stream_id = stream_id
295
296 @property
297 def stream_id(self):
298 return self._stream_id
299
300
aca7de76 301class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
584af91e
PP
302 class Status:
303 OK = 1
304 RETRY = 2
305 HUP = 3
306 ERROR = 4
307 INACTIVE = 5
308 EOF = 6
309
aca7de76
SM
310 def __init__(
311 self,
312 status: int,
313 index_entry: _LttngIndexEntryT,
314 has_new_metadata: bool,
315 has_new_data_stream: bool,
316 ):
584af91e
PP
317 self._status = status
318 self._index_entry = index_entry
319 self._has_new_metadata = has_new_metadata
320 self._has_new_data_stream = has_new_data_stream
321
322 @property
323 def status(self):
324 return self._status
325
326 @property
327 def index_entry(self):
328 return self._index_entry
329
330 @property
331 def has_new_metadata(self):
332 return self._has_new_metadata
333
334 @property
335 def has_new_data_stream(self):
336 return self._has_new_data_stream
337
338
339class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
aca7de76 340 def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
584af91e
PP
341 super().__init__(version)
342 self._stream_id = stream_id
343 self._offset = offset
344 self._req_length = req_length
345
346 @property
347 def stream_id(self):
348 return self._stream_id
349
350 @property
351 def offset(self):
352 return self._offset
353
354 @property
355 def req_length(self):
356 return self._req_length
357
358
aca7de76 359class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
584af91e
PP
360 class Status:
361 OK = 1
362 RETRY = 2
363 ERROR = 3
364 EOF = 4
365
aca7de76
SM
366 def __init__(
367 self,
368 status: int,
369 data: bytes,
370 has_new_metadata: bool,
371 has_new_data_stream: bool,
372 ):
584af91e
PP
373 self._status = status
374 self._data = data
375 self._has_new_metadata = has_new_metadata
376 self._has_new_data_stream = has_new_data_stream
377
378 @property
379 def status(self):
380 return self._status
381
382 @property
383 def data(self):
384 return self._data
385
386 @property
387 def has_new_metadata(self):
388 return self._has_new_metadata
389
390 @property
391 def has_new_data_stream(self):
392 return self._has_new_data_stream
393
394
395class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
aca7de76 396 def __init__(self, version: int, stream_id: int):
584af91e
PP
397 super().__init__(version)
398 self._stream_id = stream_id
399
400 @property
401 def stream_id(self):
402 return self._stream_id
403
404
aca7de76 405class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
584af91e
PP
406 class Status:
407 OK = 1
408 NO_NEW = 2
409 ERROR = 3
410
aca7de76 411 def __init__(self, status: int, data: bytes):
584af91e
PP
412 self._status = status
413 self._data = data
414
415 @property
416 def status(self):
417 return self._status
418
419 @property
420 def data(self):
421 return self._data
422
423
424class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
aca7de76 425 def __init__(self, version: int, tracing_session_id: int):
584af91e
PP
426 super().__init__(version)
427 self._tracing_session_id = tracing_session_id
428
429 @property
430 def tracing_session_id(self):
431 return self._tracing_session_id
432
433
aca7de76 434class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
584af91e
PP
435 class Status:
436 OK = 1
437 NO_NEW = 2
438 ERROR = 3
439 HUP = 4
440
aca7de76 441 def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
584af91e
PP
442 self._status = status
443 self._stream_infos = stream_infos
444
445 @property
446 def status(self):
447 return self._status
448
449 @property
450 def stream_infos(self):
451 return self._stream_infos
452
453
454class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
455 pass
456
457
aca7de76 458class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
584af91e
PP
459 class Status:
460 OK = 1
461 ERROR = 2
462
aca7de76 463 def __init__(self, status: int):
584af91e
PP
464 self._status = status
465
466 @property
467 def status(self):
468 return self._status
469
470
471class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
aca7de76 472 def __init__(self, version: int, tracing_session_id: int):
584af91e
PP
473 super().__init__(version)
474 self._tracing_session_id = tracing_session_id
475
476 @property
477 def tracing_session_id(self):
478 return self._tracing_session_id
479
480
aca7de76 481class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
584af91e
PP
482 class Status:
483 OK = 1
484 UNKNOWN = 2
485 ERROR = 3
486
aca7de76 487 def __init__(self, status: int):
584af91e
PP
488 self._status = status
489
490 @property
491 def status(self):
492 return self._status
493
494
495# An LTTng live protocol codec can convert bytes to command objects and
496# reply objects to bytes.
497class _LttngLiveViewerProtocolCodec:
f5567ea8 498 _COMMAND_HEADER_STRUCT_FMT = "QII"
584af91e
PP
499 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
500
501 def __init__(self):
502 pass
503
aca7de76 504 def _unpack(self, fmt: str, data: bytes, offset: int = 0):
f5567ea8 505 fmt = "!" + fmt
584af91e
PP
506 return struct.unpack_from(fmt, data, offset)
507
aca7de76 508 def _unpack_payload(self, fmt: str, data: bytes):
584af91e
PP
509 return self._unpack(
510 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
511 )
512
aca7de76 513 def decode(self, data: bytes):
584af91e
PP
514 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
515 # Not enough data to read the command header
516 return
517
518 payload_size, cmd_type, version = self._unpack(
519 self._COMMAND_HEADER_STRUCT_FMT, data
520 )
521 logging.info(
f5567ea8 522 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
584af91e
PP
523 payload_size, cmd_type, version
524 )
525 )
526
527 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
528 # Not enough data to read the whole command
529 return
530
531 if cmd_type == 1:
aca7de76 532 viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
584af91e
PP
533 return _LttngLiveViewerConnectCommand(
534 version, viewer_session_id, major, minor
535 )
536 elif cmd_type == 2:
537 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
538 elif cmd_type == 3:
f5567ea8 539 tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
584af91e
PP
540 return _LttngLiveViewerAttachToTracingSessionCommand(
541 version, tracing_session_id, offset, seek_type
542 )
543 elif cmd_type == 4:
f5567ea8 544 (stream_id,) = self._unpack_payload("Q", data)
584af91e
PP
545 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
546 version, stream_id
547 )
548 elif cmd_type == 5:
f5567ea8 549 stream_id, offset, req_length = self._unpack_payload("QQI", data)
584af91e
PP
550 return _LttngLiveViewerGetDataStreamPacketDataCommand(
551 version, stream_id, offset, req_length
552 )
553 elif cmd_type == 6:
f5567ea8 554 (stream_id,) = self._unpack_payload("Q", data)
584af91e
PP
555 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
556 elif cmd_type == 7:
f5567ea8 557 (tracing_session_id,) = self._unpack_payload("Q", data)
584af91e
PP
558 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
559 elif cmd_type == 8:
560 return _LttngLiveViewerCreateViewerSessionCommand(version)
561 elif cmd_type == 9:
f5567ea8 562 (tracing_session_id,) = self._unpack_payload("Q", data)
584af91e
PP
563 return _LttngLiveViewerDetachFromTracingSessionCommand(
564 version, tracing_session_id
565 )
566 else:
f2958352 567 raise RuntimeError("Unknown command type {}".format(cmd_type))
584af91e 568
aca7de76 569 def _pack(self, fmt: str, *args: Any):
584af91e 570 # Force network byte order
f5567ea8 571 return struct.pack("!" + fmt, *args)
584af91e 572
aca7de76 573 def _encode_zero_padded_str(self, string: str, length: int):
584af91e 574 data = string.encode()
f5567ea8 575 return data.ljust(length, b"\x00")
584af91e 576
aca7de76 577 def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
f5567ea8 578 data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
584af91e
PP
579 data += self._encode_zero_padded_str(info.path, 4096)
580 data += self._encode_zero_padded_str(info.channel_name, 255)
581 return data
582
aca7de76
SM
583 def _get_has_new_stuff_flags(
584 self, has_new_metadata: bool, has_new_data_streams: bool
585 ):
584af91e
PP
586 flags = 0
587
588 if has_new_metadata:
589 flags |= 1
590
591 if has_new_data_streams:
592 flags |= 2
593
594 return flags
595
aca7de76
SM
596 def encode(
597 self,
598 reply: _LttngLiveViewerReply,
599 ) -> bytes:
584af91e
PP
600 if type(reply) is _LttngLiveViewerConnectReply:
601 data = self._pack(
f5567ea8 602 "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
584af91e
PP
603 )
604 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
f5567ea8 605 data = self._pack("I", len(reply.tracing_session_infos))
584af91e
PP
606
607 for info in reply.tracing_session_infos:
608 data += self._pack(
f5567ea8 609 "QIII",
584af91e
PP
610 info.tracing_session_id,
611 info.live_timer_freq,
612 info.client_count,
613 info.stream_count,
614 )
615 data += self._encode_zero_padded_str(info.hostname, 64)
616 data += self._encode_zero_padded_str(info.name, 255)
617 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
f5567ea8 618 data = self._pack("II", reply.status, len(reply.stream_infos))
584af91e
PP
619
620 for info in reply.stream_infos:
621 data += self._encode_stream_info(info)
622 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
f5567ea8 623 index_format = "QQQQQQQII"
584af91e
PP
624 entry = reply.index_entry
625 flags = self._get_has_new_stuff_flags(
626 reply.has_new_metadata, reply.has_new_data_stream
627 )
628
aca7de76 629 if isinstance(entry, _LttngDataStreamIndexEntry):
71f56e5f
JG
630 data = self._pack(
631 index_format,
632 entry.offset_bytes,
633 entry.total_size_bits,
634 entry.content_size_bits,
635 entry.timestamp_begin,
636 entry.timestamp_end,
637 entry.events_discarded,
638 entry.stream_class_id,
639 reply.status,
640 flags,
641 )
642 else:
71f56e5f
JG
643 data = self._pack(
644 index_format,
645 0,
646 0,
647 0,
648 0,
649 entry.timestamp,
650 0,
651 entry.stream_class_id,
652 reply.status,
653 flags,
654 )
584af91e
PP
655 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
656 flags = self._get_has_new_stuff_flags(
657 reply.has_new_metadata, reply.has_new_data_stream
658 )
f5567ea8 659 data = self._pack("III", reply.status, len(reply.data), flags)
584af91e
PP
660 data += reply.data
661 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
f5567ea8 662 data = self._pack("QI", len(reply.data), reply.status)
584af91e
PP
663 data += reply.data
664 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
f5567ea8 665 data = self._pack("II", reply.status, len(reply.stream_infos))
584af91e
PP
666
667 for info in reply.stream_infos:
668 data += self._encode_stream_info(info)
669 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
f5567ea8 670 data = self._pack("I", reply.status)
584af91e 671 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
f5567ea8 672 data = self._pack("I", reply.status)
584af91e
PP
673 else:
674 raise ValueError(
f5567ea8 675 "Unknown reply object with class `{}`".format(reply.__class__.__name__)
584af91e
PP
676 )
677
678 return data
679
680
aca7de76
SM
681def _get_entry_timestamp_begin(
682 entry: _LttngIndexEntryT,
683):
684 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
78169723
FD
685 return entry.timestamp
686 else:
78169723
FD
687 return entry.timestamp_begin
688
689
584af91e 690# The index of an LTTng data stream, a sequence of index entries.
aca7de76
SM
691class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
692 def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
584af91e
PP
693 self._path = path
694 self._build()
71f56e5f
JG
695
696 if beacons:
697 stream_class_id = self._entries[0].stream_class_id
aca7de76
SM
698
699 beacons_list = [] # type: list[_LttngDataStreamBeaconIndexEntry]
700 for ts in beacons.iter(tjson.IntVal):
701 beacons_list.append(
702 _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
703 )
704
705 self._add_beacons(beacons_list)
71f56e5f 706
584af91e
PP
707 logging.info(
708 'Built data stream index entries: path="{}", count={}'.format(
709 path, len(self._entries)
710 )
711 )
712
713 def _build(self):
aca7de76 714 self._entries = [] # type: list[_LttngIndexEntryT]
584af91e 715
f5567ea8 716 with open(self._path, "rb") as f:
584af91e 717 # Read header first
f5567ea8 718 fmt = ">IIII"
584af91e
PP
719 size = struct.calcsize(fmt)
720 data = f.read(size)
721 assert len(data) == size
aca7de76 722 magic, _, _, index_entry_length = struct.unpack(fmt, data)
584af91e
PP
723 assert magic == 0xC1F1DCC1
724
725 # Read index entries
f5567ea8 726 fmt = ">QQQQQQQ"
584af91e
PP
727 size = struct.calcsize(fmt)
728
729 while True:
730 logging.debug(
731 'Decoding data stream index entry: path="{}", offset={}'.format(
732 self._path, f.tell()
733 )
734 )
735 data = f.read(size)
736
737 if not data:
738 # Done
739 break
740
741 assert len(data) == size
75882e97
FD
742 (
743 offset_bytes,
744 total_size_bits,
745 content_size_bits,
746 timestamp_begin,
747 timestamp_end,
748 events_discarded,
749 stream_class_id,
750 ) = struct.unpack(fmt, data)
584af91e
PP
751
752 self._entries.append(
753 _LttngDataStreamIndexEntry(
754 offset_bytes,
755 total_size_bits,
756 content_size_bits,
757 timestamp_begin,
758 timestamp_end,
759 events_discarded,
760 stream_class_id,
761 )
762 )
763
764 # Skip anything else before the next entry
765 f.seek(index_entry_length - size, os.SEEK_CUR)
766
aca7de76 767 def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
71f56e5f 768 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
aca7de76
SM
769 def sort_key(
770 entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
771 ) -> int:
772 if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
71f56e5f
JG
773 return entry.timestamp
774 else:
775 return entry.timestamp_end
776
777 self._entries += beacons
778 self._entries.sort(key=sort_key)
779
aca7de76
SM
780 @overload
781 def __getitem__(self, index: int) -> _LttngIndexEntryT:
782 ...
783
784 @overload
785 def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]: # noqa: F811
786 ...
787
788 def __getitem__( # noqa: F811
789 self, index: Union[int, slice]
790 ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT],]:
584af91e
PP
791 return self._entries[index]
792
793 def __len__(self):
794 return len(self._entries)
795
796 @property
797 def path(self):
798 return self._path
799
800
801# An LTTng data stream.
802class _LttngDataStream:
aca7de76 803 def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
584af91e
PP
804 self._path = path
805 filename = os.path.basename(path)
f5567ea8 806 match = re.match(r"(.*)_\d+", filename)
b8b97725
SM
807 if not match:
808 raise RuntimeError(
809 "Unexpected data stream file name pattern: {}".format(filename)
810 )
811
584af91e
PP
812 self._channel_name = match.group(1)
813 trace_dir = os.path.dirname(path)
f5567ea8 814 index_path = os.path.join(trace_dir, "index", filename + ".idx")
aca7de76 815 self._index = _LttngDataStreamIndex(index_path, beacons_json)
584af91e 816 assert os.path.isfile(path)
f5567ea8 817 self._file = open(path, "rb")
584af91e
PP
818 logging.info(
819 'Built data stream: path="{}", channel-name="{}"'.format(
820 path, self._channel_name
821 )
822 )
823
824 @property
825 def path(self):
826 return self._path
827
828 @property
829 def channel_name(self):
830 return self._channel_name
831
832 @property
833 def index(self):
834 return self._index
835
aca7de76 836 def get_data(self, offset_bytes: int, len_bytes: int):
584af91e
PP
837 self._file.seek(offset_bytes)
838 return self._file.read(len_bytes)
839
840
78169723 841class _LttngMetadataStreamSection:
aca7de76 842 def __init__(self, timestamp: int, data: Optional[bytes]):
78169723
FD
843 self._timestamp = timestamp
844 if data is None:
845 self._data = bytes()
846 else:
847 self._data = data
848 logging.info(
f5567ea8 849 "Built metadata stream section: ts={}, data-len={}".format(
78169723
FD
850 self._timestamp, len(self._data)
851 )
852 )
853
854 @property
855 def timestamp(self):
856 return self._timestamp
857
858 @property
859 def data(self):
860 return self._data
861
862
584af91e
PP
863# An LTTng metadata stream.
864class _LttngMetadataStream:
aca7de76
SM
865 def __init__(
866 self,
867 metadata_file_path: str,
868 config_sections: Sequence[_LttngMetadataStreamSection],
869 ):
78169723
FD
870 self._path = metadata_file_path
871 self._sections = config_sections
872 logging.info(
f5567ea8 873 "Built metadata stream: path={}, section-len={}".format(
78169723
FD
874 self._path, len(self._sections)
875 )
876 )
584af91e
PP
877
878 @property
879 def path(self):
880 return self._path
881
882 @property
78169723
FD
883 def sections(self):
884 return self._sections
584af91e 885
78169723 886
aca7de76
SM
887class LttngMetadataConfigSection:
888 def __init__(self, line: int, timestamp: int, is_empty: bool):
889 self._line = line
890 self._timestamp = timestamp
891 self._is_empty = is_empty
892
893 @property
894 def line(self):
895 return self._line
78169723 896
aca7de76
SM
897 @property
898 def timestamp(self):
899 return self._timestamp
78169723 900
aca7de76
SM
901 @property
902 def is_empty(self):
903 return self._is_empty
904
905
906def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
907 metadata_sections = [] # type: list[LttngMetadataConfigSection]
78169723
FD
908 append_empty_section = False
909 last_timestamp = 0
910 last_line = 0
911
aca7de76
SM
912 for section in metadata_sections_json:
913 if isinstance(section, tjson.StrVal):
914 if section.val == "empty":
915 # Found a empty section marker. Actually append the section at the
916 # timestamp of the next concrete section.
917 append_empty_section = True
918 else:
919 raise ValueError("Invalid string value at {}.".format(section.path))
920 elif isinstance(section, tjson.ObjVal):
921 line = section.at("line", tjson.IntVal).val
922 ts = section.at("timestamp", tjson.IntVal).val
b8b97725 923
78169723
FD
924 # Sections' timestamps and lines must both be increasing.
925 assert ts > last_timestamp
926 last_timestamp = ts
aca7de76 927
78169723
FD
928 assert line > last_line
929 last_line = line
930
931 if append_empty_section:
aca7de76 932 metadata_sections.append(LttngMetadataConfigSection(line, ts, True))
78169723
FD
933 append_empty_section = False
934
aca7de76
SM
935 metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
936 else:
937 raise TypeError(
938 "`{}`: expecting a string or object value".format(section.path)
939 )
78169723 940
aca7de76 941 return metadata_sections
78169723 942
78169723 943
aca7de76
SM
944def _split_metadata_sections(
945 metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
946):
947 metadata_sections = _parse_metadata_sections_config(metadata_sections_json)
78169723 948
aca7de76 949 sections = [] # type: list[_LttngMetadataStreamSection]
f5567ea8 950 with open(metadata_file_path, "r") as metadata_file:
78169723
FD
951 metadata_lines = [line for line in metadata_file]
952
aca7de76 953 metadata_section_idx = 0
78169723
FD
954 curr_metadata_section = bytearray()
955
956 for idx, line_content in enumerate(metadata_lines):
957 # Add one to the index to convert from the zero-indexing of the
958 # enumerate() function to the one-indexing used by humans when
959 # viewing a text file.
960 curr_line_number = idx + 1
961
962 # If there are no more sections, simply append the line.
aca7de76 963 if metadata_section_idx + 1 >= len(metadata_sections):
f5567ea8 964 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
965 continue
966
aca7de76 967 next_section_line_number = metadata_sections[metadata_section_idx + 1].line
78169723
FD
968
969 # If the next section begins at the current line, create a
970 # section with the metadata we gathered so far.
971 if curr_line_number >= next_section_line_number:
78169723
FD
972 # Flushing the metadata of the current section.
973 sections.append(
974 _LttngMetadataStreamSection(
aca7de76 975 metadata_sections[metadata_section_idx].timestamp,
78169723
FD
976 bytes(curr_metadata_section),
977 )
978 )
979
980 # Move to the next section.
aca7de76 981 metadata_section_idx += 1
78169723
FD
982
983 # Clear old content and append current line for the next section.
984 curr_metadata_section.clear()
f5567ea8 985 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
986
987 # Append any empty sections.
aca7de76 988 while metadata_sections[metadata_section_idx].is_empty:
78169723
FD
989 sections.append(
990 _LttngMetadataStreamSection(
aca7de76 991 metadata_sections[metadata_section_idx].timestamp, None
78169723
FD
992 )
993 )
aca7de76 994 metadata_section_idx += 1
78169723
FD
995 else:
996 # Append line_content to the current metadata section.
f5567ea8 997 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
998
999 # We iterated over all the lines of the metadata file. Close the current section.
1000 sections.append(
1001 _LttngMetadataStreamSection(
aca7de76 1002 metadata_sections[metadata_section_idx].timestamp,
78169723
FD
1003 bytes(curr_metadata_section),
1004 )
1005 )
1006
1007 return sections
584af91e
PP
1008
1009
aca7de76
SM
1010_StreamBeaconsT = Dict[str, Iterable[int]]
1011
1012
584af91e 1013# An LTTng trace, a sequence of LTTng data streams.
aca7de76
SM
1014class LttngTrace(Sequence[_LttngDataStream]):
1015 def __init__(
1016 self,
1017 trace_dir: str,
1018 metadata_sections_json: Optional[tjson.ArrayVal],
1019 beacons_json: Optional[tjson.ObjVal],
1020 ):
584af91e 1021 self._path = trace_dir
aca7de76
SM
1022 self._create_metadata_stream(trace_dir, metadata_sections_json)
1023 self._create_data_streams(trace_dir, beacons_json)
584af91e
PP
1024 logging.info('Built trace: path="{}"'.format(trace_dir))
1025
aca7de76
SM
1026 def _create_data_streams(
1027 self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
1028 ):
1029 data_stream_paths = [] # type: list[str]
584af91e
PP
1030
1031 for filename in os.listdir(trace_dir):
1032 path = os.path.join(trace_dir, filename)
1033
1034 if not os.path.isfile(path):
1035 continue
1036
f5567ea8 1037 if filename.startswith("."):
584af91e
PP
1038 continue
1039
f5567ea8 1040 if filename == "metadata":
584af91e
PP
1041 continue
1042
1043 data_stream_paths.append(path)
1044
1045 data_stream_paths.sort()
aca7de76 1046 self._data_streams = [] # type: list[_LttngDataStream]
584af91e
PP
1047
1048 for data_stream_path in data_stream_paths:
71f56e5f 1049 stream_name = os.path.basename(data_stream_path)
aca7de76
SM
1050 this_beacons_json = None
1051 if beacons_json is not None and stream_name in beacons_json:
1052 this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
71f56e5f
JG
1053
1054 self._data_streams.append(
aca7de76 1055 _LttngDataStream(data_stream_path, this_beacons_json)
71f56e5f 1056 )
584af91e 1057
aca7de76
SM
1058 def _create_metadata_stream(
1059 self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
1060 ):
f5567ea8 1061 metadata_path = os.path.join(trace_dir, "metadata")
aca7de76 1062 metadata_sections = [] # type: list[_LttngMetadataStreamSection]
78169723 1063
aca7de76 1064 if metadata_sections_json is None:
f5567ea8 1065 with open(metadata_path, "rb") as metadata_file:
78169723
FD
1066 metadata_sections.append(
1067 _LttngMetadataStreamSection(0, metadata_file.read())
1068 )
1069 else:
1070 metadata_sections = _split_metadata_sections(
aca7de76 1071 metadata_path, metadata_sections_json
78169723
FD
1072 )
1073
1074 self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
1075
584af91e
PP
1076 @property
1077 def path(self):
1078 return self._path
1079
1080 @property
1081 def metadata_stream(self):
1082 return self._metadata_stream
1083
aca7de76
SM
1084 @overload
1085 def __getitem__(self, index: int) -> _LttngDataStream:
1086 ...
1087
1088 @overload
1089 def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]: # noqa: F811
1090 ...
1091
1092 def __getitem__( # noqa: F811
1093 self, index: Union[int, slice]
1094 ) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
584af91e
PP
1095 return self._data_streams[index]
1096
1097 def __len__(self):
1098 return len(self._data_streams)
1099
1100
1101# The state of a single data stream.
1102class _LttngLiveViewerSessionDataStreamState:
aca7de76
SM
1103 def __init__(
1104 self,
1105 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1106 info: _LttngLiveViewerStreamInfo,
1107 data_stream: _LttngDataStream,
1108 metadata_stream_id: int,
1109 ):
584af91e
PP
1110 self._ts_state = ts_state
1111 self._info = info
1112 self._data_stream = data_stream
78169723 1113 self._metadata_stream_id = metadata_stream_id
584af91e
PP
1114 self._cur_index_entry_index = 0
1115 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1116 logging.info(
1117 fmt.format(
1118 info.id,
1119 ts_state.tracing_session_descriptor.info.tracing_session_id,
1120 ts_state.tracing_session_descriptor.info.name,
1121 data_stream.path,
1122 )
1123 )
1124
1125 @property
1126 def tracing_session_state(self):
1127 return self._ts_state
1128
1129 @property
1130 def info(self):
1131 return self._info
1132
1133 @property
1134 def data_stream(self):
1135 return self._data_stream
1136
1137 @property
1138 def cur_index_entry(self):
1139 if self._cur_index_entry_index == len(self._data_stream.index):
1140 return
1141
1142 return self._data_stream.index[self._cur_index_entry_index]
1143
aca7de76
SM
1144 @property
1145 def metadata_stream_id(self):
1146 return self._metadata_stream_id
1147
584af91e
PP
1148 def goto_next_index_entry(self):
1149 self._cur_index_entry_index += 1
1150
1151
1152# The state of a single metadata stream.
1153class _LttngLiveViewerSessionMetadataStreamState:
aca7de76
SM
1154 def __init__(
1155 self,
1156 ts_state: "_LttngLiveViewerSessionTracingSessionState",
1157 info: _LttngLiveViewerStreamInfo,
1158 metadata_stream: _LttngMetadataStream,
1159 ):
584af91e
PP
1160 self._ts_state = ts_state
1161 self._info = info
1162 self._metadata_stream = metadata_stream
78169723
FD
1163 self._cur_metadata_stream_section_index = 0
1164 if len(metadata_stream.sections) > 1:
1165 self._next_metadata_stream_section_timestamp = metadata_stream.sections[
1166 1
1167 ].timestamp
1168 else:
1169 self._next_metadata_stream_section_timestamp = None
1170
584af91e
PP
1171 self._is_sent = False
1172 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1173 logging.info(
1174 fmt.format(
1175 info.id,
1176 ts_state.tracing_session_descriptor.info.tracing_session_id,
1177 ts_state.tracing_session_descriptor.info.name,
1178 metadata_stream.path,
1179 )
1180 )
1181
584af91e
PP
1182 @property
1183 def info(self):
1184 return self._info
1185
1186 @property
1187 def metadata_stream(self):
1188 return self._metadata_stream
1189
1190 @property
1191 def is_sent(self):
1192 return self._is_sent
1193
1194 @is_sent.setter
aca7de76 1195 def is_sent(self, value: bool):
584af91e
PP
1196 self._is_sent = value
1197
78169723
FD
1198 @property
1199 def cur_section(self):
1200 fmt = "Get current metadata section: section-idx={}"
1201 logging.info(fmt.format(self._cur_metadata_stream_section_index))
1202 if self._cur_metadata_stream_section_index == len(
1203 self._metadata_stream.sections
1204 ):
1205 return
1206
1207 return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
1208
1209 def goto_next_section(self):
1210 self._cur_metadata_stream_section_index += 1
1211 if self.cur_section:
1212 self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
1213 else:
1214 self._next_metadata_stream_section_timestamp = None
1215
1216 @property
1217 def next_section_timestamp(self):
1218 return self._next_metadata_stream_section_timestamp
1219
584af91e 1220
ee1171e5
SM
1221# A tracing session descriptor.
1222#
1223# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1224# objects).
1225class LttngTracingSessionDescriptor:
1226 def __init__(
aca7de76
SM
1227 self,
1228 name: str,
1229 tracing_session_id: int,
1230 hostname: str,
1231 live_timer_freq: int,
1232 client_count: int,
1233 traces: Iterable[LttngTrace],
ee1171e5
SM
1234 ):
1235 for trace in traces:
1236 if name not in trace.path:
1237 fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1238 raise ValueError(fmt.format(name, trace.path))
1239
1240 self._traces = traces
1241 stream_count = sum([len(t) + 1 for t in traces])
1242 self._info = _LttngLiveViewerTracingSessionInfo(
1243 tracing_session_id,
1244 live_timer_freq,
1245 client_count,
1246 stream_count,
1247 hostname,
1248 name,
1249 )
1250
1251 @property
1252 def traces(self):
1253 return self._traces
1254
1255 @property
1256 def info(self):
1257 return self._info
1258
1259
584af91e
PP
1260# The state of a tracing session.
1261class _LttngLiveViewerSessionTracingSessionState:
aca7de76 1262 def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
584af91e 1263 self._tc_descr = tc_descr
aca7de76
SM
1264 self._stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
1265 self._ds_states = {} # type: dict[int, _LttngLiveViewerSessionDataStreamState]
1266 self._ms_states = (
1267 {}
1268 ) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
584af91e
PP
1269 stream_id = base_stream_id
1270
1271 for trace in tc_descr.traces:
1272 trace_id = stream_id * 1000
1273
78169723
FD
1274 # Metadata stream -> stream info and metadata stream state
1275 info = _LttngLiveViewerStreamInfo(
f5567ea8 1276 stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
78169723
FD
1277 )
1278 self._stream_infos.append(info)
1279 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
1280 self, info, trace.metadata_stream
1281 )
1282 metadata_stream_id = stream_id
1283 stream_id += 1
1284
584af91e
PP
1285 # Data streams -> stream infos and data stream states
1286 for data_stream in trace:
1287 info = _LttngLiveViewerStreamInfo(
1288 stream_id,
1289 trace_id,
1290 False,
1291 data_stream.path,
1292 data_stream.channel_name,
1293 )
1294 self._stream_infos.append(info)
1295 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
78169723 1296 self, info, data_stream, metadata_stream_id
584af91e
PP
1297 )
1298 stream_id += 1
1299
584af91e
PP
1300 self._is_attached = False
1301 fmt = 'Built tracing session state: id={}, name="{}"'
1302 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
1303
1304 @property
1305 def tracing_session_descriptor(self):
1306 return self._tc_descr
1307
1308 @property
1309 def data_stream_states(self):
1310 return self._ds_states
1311
1312 @property
1313 def metadata_stream_states(self):
1314 return self._ms_states
1315
1316 @property
1317 def stream_infos(self):
1318 return self._stream_infos
1319
1320 @property
1321 def has_new_metadata(self):
1322 return any([not ms.is_sent for ms in self._ms_states.values()])
1323
1324 @property
1325 def is_attached(self):
1326 return self._is_attached
1327
1328 @is_attached.setter
aca7de76 1329 def is_attached(self, value: bool):
584af91e
PP
1330 self._is_attached = value
1331
1332
aca7de76
SM
1333def needs_new_metadata_section(
1334 metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
1335 latest_timestamp: int,
1336):
78169723
FD
1337 if metadata_stream_state.next_section_timestamp is None:
1338 return False
1339
1340 if latest_timestamp >= metadata_stream_state.next_section_timestamp:
1341 return True
1342 else:
1343 return False
1344
1345
584af91e
PP
1346# An LTTng live viewer session manages a view on tracing sessions
1347# and replies to commands accordingly.
1348class _LttngLiveViewerSession:
1349 def __init__(
1350 self,
aca7de76
SM
1351 viewer_session_id: int,
1352 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1353 max_query_data_response_size: Optional[int],
584af91e
PP
1354 ):
1355 self._viewer_session_id = viewer_session_id
aca7de76
SM
1356 self._ts_states = (
1357 {}
1358 ) # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
1359 self._stream_states = (
1360 {}
1361 ) # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
584af91e
PP
1362 self._max_query_data_response_size = max_query_data_response_size
1363 total_stream_infos = 0
1364
1365 for ts_descr in tracing_session_descriptors:
1366 ts_state = _LttngLiveViewerSessionTracingSessionState(
1367 ts_descr, total_stream_infos
1368 )
1369 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1370 self._ts_states[ts_id] = ts_state
1371 total_stream_infos += len(ts_state.stream_infos)
1372
1373 # Update session's stream states to have the new states
1374 self._stream_states.update(ts_state.data_stream_states)
1375 self._stream_states.update(ts_state.metadata_stream_states)
1376
1377 self._command_handlers = {
1378 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1379 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1380 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1381 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1382 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1383 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1384 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1385 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
aca7de76 1386 } # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]
584af91e
PP
1387
1388 @property
1389 def viewer_session_id(self):
1390 return self._viewer_session_id
1391
aca7de76 1392 def _get_tracing_session_state(self, tracing_session_id: int):
584af91e 1393 if tracing_session_id not in self._ts_states:
f2958352 1394 raise RuntimeError(
f5567ea8 1395 "Unknown tracing session ID {}".format(tracing_session_id)
584af91e
PP
1396 )
1397
1398 return self._ts_states[tracing_session_id]
1399
aca7de76 1400 def _get_data_stream_state(self, stream_id: int):
584af91e 1401 if stream_id not in self._stream_states:
aca7de76 1402 RuntimeError("Unknown stream ID {}".format(stream_id))
584af91e 1403
aca7de76
SM
1404 stream = self._stream_states[stream_id]
1405 if type(stream) is not _LttngLiveViewerSessionDataStreamState:
1406 raise RuntimeError("Stream is not a data stream")
584af91e 1407
aca7de76
SM
1408 return stream
1409
1410 def _get_metadata_stream_state(self, stream_id: int):
1411 if stream_id not in self._stream_states:
1412 RuntimeError("Unknown stream ID {}".format(stream_id))
1413
1414 stream = self._stream_states[stream_id]
1415 if type(stream) is not _LttngLiveViewerSessionMetadataStreamState:
1416 raise RuntimeError("Stream is not a metadata stream")
1417
1418 return stream
1419
1420 def handle_command(self, cmd: _LttngLiveViewerCommand):
584af91e 1421 logging.info(
f5567ea8 1422 "Handling command in viewer session: cmd-cls-name={}".format(
584af91e
PP
1423 cmd.__class__.__name__
1424 )
1425 )
1426 cmd_type = type(cmd)
1427
1428 if cmd_type not in self._command_handlers:
f2958352 1429 raise RuntimeError(
f5567ea8 1430 "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
584af91e
PP
1431 )
1432
1433 return self._command_handlers[cmd_type](cmd)
1434
aca7de76
SM
1435 def _handle_attach_to_tracing_session_command(
1436 self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
1437 ):
584af91e
PP
1438 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1439 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1440 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1441 info = ts_state.tracing_session_descriptor.info
1442
1443 if ts_state.is_attached:
f2958352 1444 raise RuntimeError(
f5567ea8 1445 "Cannot attach to tracing session `{}`: viewer is already attached".format(
584af91e
PP
1446 info.name
1447 )
1448 )
1449
1450 ts_state.is_attached = True
1451 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1452 return _LttngLiveViewerAttachToTracingSessionReply(
1453 status, ts_state.stream_infos
1454 )
1455
aca7de76
SM
1456 def _handle_detach_from_tracing_session_command(
1457 self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
1458 ):
584af91e
PP
1459 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1460 logging.info(fmt.format(cmd.tracing_session_id))
1461 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1462 info = ts_state.tracing_session_descriptor.info
1463
1464 if not ts_state.is_attached:
f2958352 1465 raise RuntimeError(
f5567ea8 1466 "Cannot detach to tracing session `{}`: viewer is not attached".format(
584af91e
PP
1467 info.name
1468 )
1469 )
1470
1471 ts_state.is_attached = False
1472 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1473 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1474
aca7de76
SM
1475 def _handle_get_next_data_stream_index_entry_command(
1476 self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
1477 ):
584af91e
PP
1478 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1479 logging.info(fmt.format(cmd.stream_id))
aca7de76
SM
1480 stream_state = self._get_data_stream_state(cmd.stream_id)
1481 metadata_stream_state = self._get_metadata_stream_state(
1482 stream_state.metadata_stream_id
1483 )
584af91e
PP
1484
1485 if stream_state.cur_index_entry is None:
1486 # The viewer is done reading this stream
1487 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1488
1489 # Dummy data stream index entry to use with the `HUP` status
1490 # (the reply needs one, but the viewer ignores it)
1491 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1492
1493 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1494 status, index_entry, False, False
1495 )
1496
78169723
FD
1497 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1498
1499 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
1500 metadata_stream_state.is_sent = False
1501 metadata_stream_state.goto_next_section()
1502
584af91e
PP
1503 # The viewer only checks the `has_new_metadata` flag if the
1504 # reply's status is `OK`, so we need to provide an index here
1505 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
aca7de76 1506 if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
71f56e5f
JG
1507 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1508 else:
71f56e5f
JG
1509 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1510
584af91e
PP
1511 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1512 status, stream_state.cur_index_entry, has_new_metadata, False
1513 )
1514 stream_state.goto_next_index_entry()
1515 return reply
1516
aca7de76
SM
1517 def _handle_get_data_stream_packet_data_command(
1518 self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
1519 ):
584af91e
PP
1520 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1521 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
aca7de76 1522 stream_state = self._get_data_stream_state(cmd.stream_id)
584af91e
PP
1523 data_response_length = cmd.req_length
1524
584af91e
PP
1525 if stream_state.tracing_session_state.has_new_metadata:
1526 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1527 return _LttngLiveViewerGetDataStreamPacketDataReply(
1528 status, bytes(), True, False
1529 )
1530
1531 if self._max_query_data_response_size:
1532 # Enforce a server side limit on the query requested length.
1533 # To ensure that the transaction terminate take the minimum of both
1534 # value.
1535 data_response_length = min(
1536 cmd.req_length, self._max_query_data_response_size
1537 )
1538 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1539 logging.info(fmt.format(cmd.req_length, data_response_length))
1540
1541 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1542 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1543 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1544
aca7de76
SM
1545 def _handle_get_metadata_stream_data_command(
1546 self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
1547 ):
584af91e
PP
1548 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1549 logging.info(fmt.format(cmd.stream_id))
aca7de76 1550 metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
584af91e 1551
78169723 1552 if metadata_stream_state.is_sent:
584af91e
PP
1553 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1554 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1555
78169723 1556 metadata_stream_state.is_sent = True
584af91e 1557 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
78169723 1558 metadata_section = metadata_stream_state.cur_section
b8b97725 1559 assert metadata_section is not None
78169723
FD
1560
1561 # If we are sending an empty section, ready the next one right away.
1562 if len(metadata_section.data) == 0:
1563 metadata_stream_state.is_sent = False
1564 metadata_stream_state.goto_next_section()
1565
1566 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1567 logging.info(fmt.format(len(metadata_section.data)))
584af91e 1568 return _LttngLiveViewerGetMetadataStreamDataContentReply(
78169723 1569 status, metadata_section.data
584af91e
PP
1570 )
1571
aca7de76
SM
1572 def _handle_get_new_stream_infos_command(
1573 self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
1574 ):
584af91e
PP
1575 fmt = 'Handling "get new stream infos" command: ts-id={}'
1576 logging.info(fmt.format(cmd.tracing_session_id))
1577
1578 # As of this version, all the tracing session's stream infos are
1579 # always given to the viewer when sending the "attach to tracing
1580 # session" reply, so there's nothing new here. Return the `HUP`
1581 # status as, if we're handling this command, the viewer consumed
1582 # all the existing data streams.
1583 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1584 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1585
aca7de76
SM
1586 def _handle_get_tracing_session_infos_command(
1587 self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
1588 ):
584af91e
PP
1589 logging.info('Handling "get tracing session infos" command.')
1590 infos = [
1591 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1592 ]
1593 infos.sort(key=lambda info: info.name)
1594 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1595
aca7de76
SM
1596 def _handle_create_viewer_session_command(
1597 self, cmd: _LttngLiveViewerCreateViewerSessionCommand
1598 ):
584af91e
PP
1599 logging.info('Handling "create viewer session" command.')
1600 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1601
1602 # This does nothing here. In the LTTng relay daemon, it
1603 # allocates the viewer session's state.
1604 return _LttngLiveViewerCreateViewerSessionReply(status)
1605
1606
1607# An LTTng live TCP server.
1608#
e51141d3
SM
1609# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
1610# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
1611# to a temporary port file. It renames the temporary port file to
1612# `port_filename`.
584af91e
PP
1613#
1614# `tracing_session_descriptors` is a list of tracing session descriptors
1615# (`LttngTracingSessionDescriptor`) to serve.
1616#
1617# This server accepts a single viewer (client).
1618#
1619# When the viewer closes the connection, the server's constructor
1620# returns.
1621class LttngLiveServer:
1622 def __init__(
e51141d3 1623 self,
aca7de76 1624 port: Optional[int],
d11f45ef 1625 port_filename: Optional[str],
aca7de76
SM
1626 tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
1627 max_query_data_response_size: Optional[int],
584af91e 1628 ):
f5567ea8 1629 logging.info("Server configuration:")
584af91e 1630
f5567ea8 1631 logging.info(" Port file name: `{}`".format(port_filename))
584af91e
PP
1632
1633 if max_query_data_response_size is not None:
1634 logging.info(
f5567ea8 1635 " Maximum response data query size: `{}`".format(
584af91e
PP
1636 max_query_data_response_size
1637 )
1638 )
1639
1640 for ts_descr in tracing_session_descriptors:
1641 info = ts_descr.info
1642 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1643 logging.info(
1644 fmt.format(
1645 info.name,
1646 info.tracing_session_id,
1647 info.hostname,
1648 info.live_timer_freq,
1649 info.client_count,
1650 info.stream_count,
1651 )
1652 )
1653
1654 for trace in ts_descr.traces:
1655 logging.info(' Trace: path="{}"'.format(trace.path))
1656
1657 self._ts_descriptors = tracing_session_descriptors
1658 self._max_query_data_response_size = max_query_data_response_size
1659 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1660 self._codec = _LttngLiveViewerProtocolCodec()
1661
1662 # Port 0: OS assigns an unused port
e51141d3 1663 serv_addr = ("localhost", port if port is not None else 0)
584af91e 1664 self._sock.bind(serv_addr)
d11f45ef
SM
1665
1666 if port_filename is not None:
1667 self._write_port_to_file(port_filename)
1668
1669 print("Listening on port {}".format(self._server_port))
584af91e
PP
1670
1671 try:
1672 self._listen()
1673 finally:
1674 self._sock.close()
f5567ea8 1675 logging.info("Closed connection and socket.")
584af91e
PP
1676
1677 @property
1678 def _server_port(self):
1679 return self._sock.getsockname()[1]
1680
1681 def _recv_command(self):
1682 data = bytes()
1683
1684 while True:
f5567ea8 1685 logging.info("Waiting for viewer command.")
584af91e
PP
1686 buf = self._conn.recv(128)
1687
1688 if not buf:
f5567ea8 1689 logging.info("Client closed connection.")
584af91e
PP
1690
1691 if data:
f2958352 1692 raise RuntimeError(
f5567ea8 1693 "Client closed connection after having sent {} command bytes.".format(
584af91e
PP
1694 len(data)
1695 )
1696 )
1697
1698 return
1699
f5567ea8 1700 logging.info("Received data from viewer: length={}".format(len(buf)))
584af91e
PP
1701
1702 data += buf
1703
1704 try:
1705 cmd = self._codec.decode(data)
1706 except struct.error as exc:
f2958352 1707 raise RuntimeError("Malformed command: {}".format(exc)) from exc
584af91e
PP
1708
1709 if cmd is not None:
1710 logging.info(
f5567ea8 1711 "Received command from viewer: cmd-cls-name={}".format(
584af91e
PP
1712 cmd.__class__.__name__
1713 )
1714 )
1715 return cmd
1716
aca7de76 1717 def _send_reply(self, reply: _LttngLiveViewerReply):
584af91e
PP
1718 data = self._codec.encode(reply)
1719 logging.info(
f5567ea8 1720 "Sending reply to viewer: reply-cls-name={}, length={}".format(
584af91e
PP
1721 reply.__class__.__name__, len(data)
1722 )
1723 )
1724 self._conn.sendall(data)
1725
1726 def _handle_connection(self):
1727 # First command must be "connect"
1728 cmd = self._recv_command()
1729
1730 if type(cmd) is not _LttngLiveViewerConnectCommand:
f2958352 1731 raise RuntimeError(
584af91e
PP
1732 'First command is not "connect": cmd-cls-name={}'.format(
1733 cmd.__class__.__name__
1734 )
1735 )
1736
1737 # Create viewer session (arbitrary ID 23)
1738 logging.info(
f5567ea8 1739 "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
584af91e
PP
1740 )
1741 viewer_session = _LttngLiveViewerSession(
1742 23, self._ts_descriptors, self._max_query_data_response_size
1743 )
1744
1745 # Send "connect" reply
1746 self._send_reply(
1747 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1748 )
1749
1750 # Make the viewer session handle the remaining commands
1751 while True:
1752 cmd = self._recv_command()
1753
1754 if cmd is None:
1755 # Connection closed (at an expected location within the
1756 # conversation)
1757 return
1758
1759 self._send_reply(viewer_session.handle_command(cmd))
1760
1761 def _listen(self):
f5567ea8 1762 logging.info("Listening: port={}".format(self._server_port))
1726ac08
JR
1763 # Backlog must be present for Python version < 3.5.
1764 # 128 is an arbitrary number since we expect only 1 connection anyway.
1765 self._sock.listen(128)
584af91e
PP
1766 self._conn, viewer_addr = self._sock.accept()
1767 logging.info(
f5567ea8 1768 "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
584af91e
PP
1769 )
1770
1771 try:
1772 self._handle_connection()
1773 finally:
1774 self._conn.close()
1775
aca7de76 1776 def _write_port_to_file(self, port_filename: str):
584af91e 1777 # Write the port number to a temporary file.
f5567ea8
FD
1778 with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
1779 print(self._server_port, end="", file=tmp_port_file)
584af91e
PP
1780
1781 # Rename temporary file to real file
9c878ece 1782 os.replace(tmp_port_file.name, port_filename)
584af91e
PP
1783 logging.info(
1784 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1785 tmp_port_file.name, port_filename
1786 )
1787 )
1788
1789
aca7de76
SM
1790def _session_descriptors_from_path(
1791 sessions_filename: str, trace_path_prefix: Optional[str]
1792):
2b763e29
JG
1793 # File format is:
1794 #
1795 # [
1796 # {
1797 # "name": "my-session",
1798 # "id": 17,
1799 # "hostname": "myhost",
1800 # "live-timer-freq": 1000000,
1801 # "client-count": 23,
1802 # "traces": [
1803 # {
1804 # "path": "lol"
1805 # },
1806 # {
71f56e5f
JG
1807 # "path": "meow/mix",
1808 # "beacons": {
1809 # "my_stream": [ 5235787, 728375283 ]
78169723
FD
1810 # },
1811 # "metadata-sections": [
1812 # {
1813 # "line": 1,
1814 # "timestamp": 0
1815 # }
1816 # ]
2b763e29
JG
1817 # }
1818 # ]
1819 # }
1820 # ]
f5567ea8 1821 with open(sessions_filename, "r") as sessions_file:
aca7de76
SM
1822 sessions_json = tjson.load(sessions_file, tjson.ArrayVal)
1823
1824 sessions = [] # type: list[LttngTracingSessionDescriptor]
2b763e29 1825
aca7de76
SM
1826 for session_json in sessions_json.iter(tjson.ObjVal):
1827 name = session_json.at("name", tjson.StrVal).val
1828 tracing_session_id = session_json.at("id", tjson.IntVal).val
1829 hostname = session_json.at("hostname", tjson.StrVal).val
1830 live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
1831 client_count = session_json.at("client-count", tjson.IntVal).val
1832 traces_json = session_json.at("traces", tjson.ArrayVal)
2b763e29 1833
aca7de76 1834 traces = [] # type: list[LttngTrace]
2b763e29 1835
aca7de76
SM
1836 for trace_json in traces_json.iter(tjson.ObjVal):
1837 metadata_sections = (
1838 trace_json.at("metadata-sections", tjson.ArrayVal)
1839 if "metadata-sections" in trace_json
1840 else None
1841 )
1842 beacons = (
1843 trace_json.at("beacons", tjson.ObjVal)
1844 if "beacons" in trace_json
1845 else None
1846 )
1847 path = trace_json.at("path", tjson.StrVal).val
2b763e29 1848
aca7de76 1849 if not os.path.isabs(path) and trace_path_prefix:
2b763e29
JG
1850 path = os.path.join(trace_path_prefix, path)
1851
aca7de76
SM
1852 traces.append(
1853 LttngTrace(
1854 path,
1855 metadata_sections,
1856 beacons,
1857 )
1858 )
2b763e29
JG
1859
1860 sessions.append(
1861 LttngTracingSessionDescriptor(
1862 name,
1863 tracing_session_id,
1864 hostname,
1865 live_timer_freq,
1866 client_count,
1867 traces,
1868 )
1869 )
1870
1871 return sessions
584af91e
PP
1872
1873
aca7de76 1874def _loglevel_parser(string: str):
f5567ea8 1875 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
584af91e
PP
1876 if string not in loglevels:
1877 msg = "{} is not a valid loglevel".format(string)
1878 raise argparse.ArgumentTypeError(msg)
1879 return loglevels[string]
1880
1881
f5567ea8
FD
1882if __name__ == "__main__":
1883 logging.basicConfig(format="# %(asctime)-25s%(message)s")
584af91e 1884 parser = argparse.ArgumentParser(
f5567ea8 1885 description="LTTng-live protocol mocker", add_help=False
584af91e
PP
1886 )
1887 parser.add_argument(
f5567ea8
FD
1888 "--log-level",
1889 default="warning",
1890 choices=["info", "warning"],
1891 help="The loglevel to be used.",
584af91e
PP
1892 )
1893
1894 loglevel_namespace, remaining_args = parser.parse_known_args()
1895 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1896
e51141d3
SM
1897 parser.add_argument(
1898 "--port",
1899 help="The port to bind to. If missing, use an OS-assigned port..",
1900 type=int,
1901 )
584af91e 1902 parser.add_argument(
f5567ea8
FD
1903 "--port-filename",
1904 help="The final port file. This file is present when the server is ready to receive connection.",
584af91e
PP
1905 )
1906 parser.add_argument(
f5567ea8 1907 "--max-query-data-response-size",
584af91e 1908 type=int,
f5567ea8 1909 help="The maximum size of control data response in bytes",
584af91e
PP
1910 )
1911 parser.add_argument(
f5567ea8 1912 "--trace-path-prefix",
2b763e29 1913 type=str,
f5567ea8 1914 help="Prefix to prepend to the trace paths of session configurations",
2b763e29
JG
1915 )
1916 parser.add_argument(
e46b8462 1917 "sessions_filename",
776a2a25 1918 type=str,
f5567ea8 1919 help="Path to a session configuration file",
e46b8462 1920 metavar="sessions-filename",
584af91e
PP
1921 )
1922 parser.add_argument(
f5567ea8
FD
1923 "-h",
1924 "--help",
1925 action="help",
584af91e 1926 default=argparse.SUPPRESS,
f5567ea8 1927 help="Show this help message and exit.",
584af91e
PP
1928 )
1929
1930 args = parser.parse_args(args=remaining_args)
f2958352
SM
1931 sessions_filename = args.sessions_filename # type: str
1932 trace_path_prefix = args.trace_path_prefix # type: str | None
1933 sessions = _session_descriptors_from_path(
1934 sessions_filename,
1935 trace_path_prefix,
1936 )
aca7de76 1937
f2958352 1938 port = args.port # type: int | None
d11f45ef 1939 port_filename = args.port_filename # type: str | None
f2958352
SM
1940 max_query_data_response_size = args.max_query_data_response_size # type: int | None
1941 LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
This page took 0.163918 seconds and 4 git commands to generate.