tests: make lttng_live_server.py write temp port filename in same directory
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
584af91e 2#
0235b0db 3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
584af91e 4#
584af91e 5
aca7de76
SM
6# pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false
7
584af91e 8import os
584af91e
PP
9import re
10import socket
11import struct
5995b304
SM
12import logging
13import os.path
14import argparse
584af91e 15import tempfile
aca7de76
SM
16from typing import Dict, Union, Iterable, Optional, Sequence, overload
17
18import tjson
19
20# isort: off
21from typing import Any, Callable # noqa: F401
22
23# isort: on
584af91e
PP
24
25
ee1171e5
SM
# An entry within the index of an LTTng data stream.
#
# Plain read-only record: every constructor argument is stored as is and
# exposed through a property of the same name. Sizes are kept in bits;
# the `*_size_bytes` properties are the same values floor-divided by 8.
class _LttngDataStreamIndexEntry:
    def __init__(
        self,
        offset_bytes: int,
        total_size_bits: int,
        content_size_bits: int,
        timestamp_begin: int,
        timestamp_end: int,
        events_discarded: int,
        stream_class_id: int,
    ):
        self._offset_bytes = offset_bytes
        self._total_size_bits = total_size_bits
        self._content_size_bits = content_size_bits
        self._timestamp_begin = timestamp_begin
        self._timestamp_end = timestamp_end
        self._events_discarded = events_discarded
        self._stream_class_id = stream_class_id

    @property
    def offset_bytes(self) -> int:
        return self._offset_bytes

    @property
    def total_size_bits(self) -> int:
        return self._total_size_bits

    @property
    def total_size_bytes(self) -> int:
        # Floor division: a size which is not a multiple of 8 bits is
        # rounded down.
        return self._total_size_bits // 8

    @property
    def content_size_bits(self) -> int:
        return self._content_size_bits

    @property
    def content_size_bytes(self) -> int:
        # Floor division, like `total_size_bytes`.
        return self._content_size_bits // 8

    @property
    def timestamp_begin(self) -> int:
        return self._timestamp_begin

    @property
    def timestamp_end(self) -> int:
        return self._timestamp_end

    @property
    def events_discarded(self) -> int:
        return self._events_discarded

    @property
    def stream_class_id(self) -> int:
        return self._stream_class_id
81
82
# An entry within the index of an LTTng data stream. While a stream beacon
# entry is conceptually unrelated to an index, it is sent as a reply to a
# LttngLiveViewerGetNextDataStreamIndexEntryCommand.
#
# Holds only a stream class ID and a single timestamp.
class _LttngDataStreamBeaconIndexEntry:
    def __init__(self, stream_class_id: int, timestamp: int):
        self._stream_class_id = stream_class_id
        self._timestamp = timestamp

    @property
    def timestamp(self) -> int:
        return self._timestamp

    @property
    def stream_class_id(self) -> int:
        return self._stream_class_id
98
99
aca7de76
SM
# Any kind of entry which a data stream index may contain.
_LttngIndexEntryT = Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry]
101
102
# Base class of all the viewer commands: only carries the version field
# decoded from the command header.
class _LttngLiveViewerCommand:
    def __init__(self, version: int):
        self._version = version

    @property
    def version(self) -> int:
        return self._version
110
111
# "Connect" command: carries the viewer session ID and the major/minor
# numbers decoded from the payload.
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, viewer_session_id: int, major: int, minor: int):
        super().__init__(version)
        self._viewer_session_id = viewer_session_id
        self._major = major
        self._minor = minor

    @property
    def viewer_session_id(self) -> int:
        return self._viewer_session_id

    @property
    def major(self) -> int:
        return self._major

    @property
    def minor(self) -> int:
        return self._minor
130
131
aca7de76
SM
# Base class of all the reply objects; has no state of its own.
class _LttngLiveViewerReply:
    pass
134
135
# Reply to a "connect" command: viewer session ID and major/minor numbers.
class _LttngLiveViewerConnectReply(_LttngLiveViewerReply):
    def __init__(self, viewer_session_id: int, major: int, minor: int):
        self._viewer_session_id = viewer_session_id
        self._major = major
        self._minor = minor

    @property
    def viewer_session_id(self) -> int:
        return self._viewer_session_id

    @property
    def major(self) -> int:
        return self._major

    @property
    def minor(self) -> int:
        return self._minor
153
154
# "Get tracing session infos" command: no payload beyond the base
# command's version.
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    pass
157
158
# Read-only description of one tracing session, as listed in a
# "get tracing session infos" reply.
class _LttngLiveViewerTracingSessionInfo:
    def __init__(
        self,
        tracing_session_id: int,
        live_timer_freq: int,
        client_count: int,
        stream_count: int,
        hostname: str,
        name: str,
    ):
        self._tracing_session_id = tracing_session_id
        self._live_timer_freq = live_timer_freq
        self._client_count = client_count
        self._stream_count = stream_count
        self._hostname = hostname
        self._name = name

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id

    @property
    def live_timer_freq(self) -> int:
        return self._live_timer_freq

    @property
    def client_count(self) -> int:
        return self._client_count

    @property
    def stream_count(self) -> int:
        return self._stream_count

    @property
    def hostname(self) -> str:
        return self._hostname

    @property
    def name(self) -> str:
        return self._name
199
200
aca7de76
SM
# Reply to a "get tracing session infos" command: a sequence of tracing
# session info objects.
class _LttngLiveViewerGetTracingSessionInfosReply(_LttngLiveViewerReply):
    def __init__(
        self, tracing_session_infos: Sequence[_LttngLiveViewerTracingSessionInfo]
    ):
        self._tracing_session_infos = tracing_session_infos

    @property
    def tracing_session_infos(self):
        return self._tracing_session_infos
210
211
# "Attach to tracing session" command: target tracing session ID, offset,
# and seek type decoded from the payload.
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    # Named values for the `seek_type` field.
    class SeekType:
        BEGINNING = 1
        LAST = 2

    def __init__(
        self, version: int, tracing_session_id: int, offset: int, seek_type: int
    ):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id
        self._offset = offset
        self._seek_type = seek_type

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id

    @property
    def offset(self) -> int:
        return self._offset

    @property
    def seek_type(self) -> int:
        return self._seek_type
236
237
# Read-only description of one stream (data or metadata), as sent within
# the replies which list streams.
class _LttngLiveViewerStreamInfo:
    def __init__(
        self, id: int, trace_id: int, is_metadata: bool, path: str, channel_name: str
    ):
        self._id = id
        self._trace_id = trace_id
        self._is_metadata = is_metadata
        self._path = path
        self._channel_name = channel_name

    @property
    def id(self) -> int:
        return self._id

    @property
    def trace_id(self) -> int:
        return self._trace_id

    @property
    def is_metadata(self) -> bool:
        return self._is_metadata

    @property
    def path(self) -> str:
        return self._path

    @property
    def channel_name(self) -> str:
        return self._channel_name
267
268
# Reply to an "attach to tracing session" command: a status and the infos
# of the streams.
class _LttngLiveViewerAttachToTracingSessionReply(_LttngLiveViewerReply):
    # Named values for the `status` field.
    class Status:
        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
        self._status = status
        self._stream_infos = stream_infos

    @property
    def status(self) -> int:
        return self._status

    @property
    def stream_infos(self):
        return self._stream_infos
289
290
# "Get next data stream index entry" command: carries the ID of the
# stream to query.
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self) -> int:
        return self._stream_id
299
300
# Reply to a "get next data stream index entry" command: a status, the
# index entry (regular or beacon), and the two "has new stuff" flags.
class _LttngLiveViewerGetNextDataStreamIndexEntryReply(_LttngLiveViewerReply):
    # Named values for the `status` field.
    class Status:
        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(
        self,
        status: int,
        index_entry: _LttngIndexEntryT,
        has_new_metadata: bool,
        has_new_data_stream: bool,
    ):
        self._status = status
        self._index_entry = index_entry
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self) -> int:
        return self._status

    @property
    def index_entry(self):
        return self._index_entry

    @property
    def has_new_metadata(self) -> bool:
        return self._has_new_metadata

    @property
    def has_new_data_stream(self) -> bool:
        return self._has_new_data_stream
337
338
# "Get data stream packet data" command: stream ID, offset, and requested
# length decoded from the payload.
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int, offset: int, req_length: int):
        super().__init__(version)
        self._stream_id = stream_id
        self._offset = offset
        self._req_length = req_length

    @property
    def stream_id(self) -> int:
        return self._stream_id

    @property
    def offset(self) -> int:
        return self._offset

    @property
    def req_length(self) -> int:
        return self._req_length
357
358
# Reply to a "get data stream packet data" command: a status, the raw
# data bytes, and the two "has new stuff" flags.
class _LttngLiveViewerGetDataStreamPacketDataReply(_LttngLiveViewerReply):
    # Named values for the `status` field.
    class Status:
        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(
        self,
        status: int,
        data: bytes,
        has_new_metadata: bool,
        has_new_data_stream: bool,
    ):
        self._status = status
        self._data = data
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self) -> int:
        return self._status

    @property
    def data(self) -> bytes:
        return self._data

    @property
    def has_new_metadata(self) -> bool:
        return self._has_new_metadata

    @property
    def has_new_data_stream(self) -> bool:
        return self._has_new_data_stream
393
394
# "Get metadata stream data" command: carries the ID of the stream to
# query.
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, stream_id: int):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self) -> int:
        return self._stream_id
403
404
# Reply to a "get metadata stream data" command: a status and the raw
# data bytes.
class _LttngLiveViewerGetMetadataStreamDataContentReply(_LttngLiveViewerReply):
    # Named values for the `status` field.
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status: int, data: bytes):
        self._status = status
        self._data = data

    @property
    def status(self) -> int:
        return self._status

    @property
    def data(self) -> bytes:
        return self._data
422
423
# "Get new stream infos" command: carries the ID of the tracing session
# to query.
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, tracing_session_id: int):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id
432
433
# Reply to a "get new stream infos" command: a status and the infos of
# the streams.
class _LttngLiveViewerGetNewStreamInfosReply(_LttngLiveViewerReply):
    # Named values for the `status` field.
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(self, status: int, stream_infos: Sequence[_LttngLiveViewerStreamInfo]):
        self._status = status
        self._stream_infos = stream_infos

    @property
    def status(self) -> int:
        return self._status

    @property
    def stream_infos(self):
        return self._stream_infos
452
453
# "Create viewer session" command: no payload beyond the base command's
# version.
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    pass
456
457
# Reply to a "create viewer session" command: only a status.
class _LttngLiveViewerCreateViewerSessionReply(_LttngLiveViewerReply):
    # Named values for the `status` field.
    class Status:
        OK = 1
        ERROR = 2

    def __init__(self, status: int):
        self._status = status

    @property
    def status(self) -> int:
        return self._status
469
470
# "Detach from tracing session" command: carries the ID of the tracing
# session to detach from.
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    def __init__(self, version: int, tracing_session_id: int):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self) -> int:
        return self._tracing_session_id
479
480
# Reply to a "detach from tracing session" command: only a status.
class _LttngLiveViewerDetachFromTracingSessionReply(_LttngLiveViewerReply):
    # Named values for the `status` field.
    class Status:
        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status: int):
        self._status = status

    @property
    def status(self) -> int:
        return self._status
493
494
# An LTTng live protocol codec can convert bytes to command objects and
# reply objects to bytes.
#
# All multi-byte fields are packed/unpacked in network byte order (the
# `!` prefix is added by _pack() and _unpack()).
class _LttngLiveViewerProtocolCodec:
    # Command header: payload size (Q), command type (I), version (I).
    _COMMAND_HEADER_STRUCT_FMT = "QII"
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)

    def __init__(self):
        pass

    # Unpacks `fmt` (without byte order prefix) from `data`, starting at
    # `offset`, and returns the resulting tuple.
    def _unpack(self, fmt: str, data: bytes, offset: int = 0):
        fmt = "!" + fmt
        return struct.unpack_from(fmt, data, offset)

    # Like _unpack(), but starts right after the command header.
    def _unpack_payload(self, fmt: str, data: bytes):
        return self._unpack(
            fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
        )

    # Decodes one command from the beginning of `data`.
    #
    # Returns the command object, or `None` if `data` doesn't contain a
    # complete command yet (incomplete header or incomplete payload).
    #
    # Raises `RuntimeError` if the command type is unknown.
    def decode(self, data: bytes):
        if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
            # Not enough data to read the command header
            return

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
                payload_size, cmd_type, version
            )
        )

        if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
            # Not enough data to read the whole command
            return

        if cmd_type == 1:
            # The fourth payload field is decoded but not used.
            viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
            return _LttngLiveViewerConnectCommand(
                version, viewer_session_id, major, minor
            )
        elif cmd_type == 2:
            return _LttngLiveViewerGetTracingSessionInfosCommand(version)
        elif cmd_type == 3:
            tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
            return _LttngLiveViewerAttachToTracingSessionCommand(
                version, tracing_session_id, offset, seek_type
            )
        elif cmd_type == 4:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                version, stream_id
            )
        elif cmd_type == 5:
            stream_id, offset, req_length = self._unpack_payload("QQI", data)
            return _LttngLiveViewerGetDataStreamPacketDataCommand(
                version, stream_id, offset, req_length
            )
        elif cmd_type == 6:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
        elif cmd_type == 7:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
        elif cmd_type == 8:
            return _LttngLiveViewerCreateViewerSessionCommand(version)
        elif cmd_type == 9:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerDetachFromTracingSessionCommand(
                version, tracing_session_id
            )
        else:
            raise RuntimeError("Unknown command type {}".format(cmd_type))

    # Packs `args` according to `fmt` (without byte order prefix).
    def _pack(self, fmt: str, *args: Any):
        # Force network byte order
        return struct.pack("!" + fmt, *args)

    # Encodes `string` and right-pads the result with null bytes up to
    # `length` bytes.
    #
    # NOTE: an encoded string longer than `length` is returned as is
    # (no truncation).
    def _encode_zero_padded_str(self, string: str, length: int):
        data = string.encode()
        return data.ljust(length, b"\x00")

    # Encodes a single stream info: ID, trace ID, "is metadata" flag as
    # an integer, then the path (4096 bytes) and the channel name
    # (255 bytes), both zero-padded.
    def _encode_stream_info(self, info: _LttngLiveViewerStreamInfo):
        data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
        data += self._encode_zero_padded_str(info.path, 4096)
        data += self._encode_zero_padded_str(info.channel_name, 255)
        return data

    # Returns the flags integer of a reply: bit 0 is set when there's new
    # metadata, bit 1 when there are new data streams.
    def _get_has_new_stuff_flags(
        self, has_new_metadata: bool, has_new_data_streams: bool
    ):
        flags = 0

        if has_new_metadata:
            flags |= 1

        if has_new_data_streams:
            flags |= 2

        return flags

    # Encodes `reply` to bytes.
    #
    # The exact class of `reply` selects the layout (subclasses of the
    # known reply classes are not recognized).
    #
    # Raises `ValueError` if the class of `reply` is unknown.
    def encode(
        self,
        reply: _LttngLiveViewerReply,
    ) -> bytes:
        if type(reply) is _LttngLiveViewerConnectReply:
            # The last field is hard-coded to 2.
            # NOTE(review): presumably the connection type; confirm
            # against the protocol definition.
            data = self._pack(
                "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
            )
        elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
            data = self._pack("I", len(reply.tracing_session_infos))

            for info in reply.tracing_session_infos:
                data += self._pack(
                    "QIII",
                    info.tracing_session_id,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
                data += self._encode_zero_padded_str(info.hostname, 64)
                data += self._encode_zero_padded_str(info.name, 255)
        elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
            data = self._pack("II", reply.status, len(reply.stream_infos))

            for info in reply.stream_infos:
                data += self._encode_stream_info(info)
        elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            index_format = "QQQQQQQII"
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )

            if isinstance(entry, _LttngDataStreamIndexEntry):
                data = self._pack(
                    index_format,
                    entry.offset_bytes,
                    entry.total_size_bits,
                    entry.content_size_bits,
                    entry.timestamp_begin,
                    entry.timestamp_end,
                    entry.events_discarded,
                    entry.stream_class_id,
                    reply.status,
                    flags,
                )
            else:
                # Beacon: same layout, with the beacon's timestamp in
                # the "timestamp end" position and all the other index
                # fields set to zero.
                data = self._pack(
                    index_format,
                    0,
                    0,
                    0,
                    0,
                    entry.timestamp,
                    0,
                    entry.stream_class_id,
                    reply.status,
                    flags,
                )
        elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            data = self._pack("III", reply.status, len(reply.data), flags)
            data += reply.data
        elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
            data = self._pack("QI", len(reply.data), reply.status)
            data += reply.data
        elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
            data = self._pack("II", reply.status, len(reply.stream_infos))

            for info in reply.stream_infos:
                data += self._encode_stream_info(info)
        elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
            data = self._pack("I", reply.status)
        elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
            data = self._pack("I", reply.status)
        else:
            raise ValueError(
                "Unknown reply object with class `{}`".format(reply.__class__.__name__)
            )

        return data
679
680
aca7de76
SM
# Returns the beginning timestamp of `entry`: the single timestamp of a
# beacon entry, or `timestamp_begin` for a regular index entry.
def _get_entry_timestamp_begin(
    entry: _LttngIndexEntryT,
):
    if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
        return entry.timestamp

    return entry.timestamp_begin
688
689
# The index of an LTTng data stream, a sequence of index entries.
#
# The entries are read from the index file at `path`; optional beacon
# entries (built from the timestamps in `beacons`) are merged in.
class _LttngDataStreamIndex(Sequence[_LttngIndexEntryT]):
    def __init__(self, path: str, beacons: Optional[tjson.ArrayVal]):
        self._path = path
        self._build()

        if beacons:
            # All the beacons get the stream class ID of the first index
            # entry read from the file.
            stream_class_id = self._entries[0].stream_class_id
            self._add_beacons(
                [
                    _LttngDataStreamBeaconIndexEntry(stream_class_id, ts.val)
                    for ts in beacons.iter(tjson.IntVal)
                ]
            )

        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    # Reads the index file and fills `self._entries` with one
    # `_LttngDataStreamIndexEntry` per entry found.
    def _build(self):
        self._entries = []  # type: list[_LttngIndexEntryT]

        with open(self._path, "rb") as f:
            # Read header first
            fmt = ">IIII"
            size = struct.calcsize(fmt)
            data = f.read(size)
            assert len(data) == size
            magic, _, _, index_entry_length = struct.unpack(fmt, data)
            assert magic == 0xC1F1DCC1

            # Read index entries
            fmt = ">QQQQQQQ"
            size = struct.calcsize(fmt)

            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, f.tell()
                    )
                )
                data = f.read(size)

                if not data:
                    # Done
                    break

                assert len(data) == size
                (
                    offset_bytes,
                    total_size_bits,
                    content_size_bits,
                    timestamp_begin,
                    timestamp_end,
                    events_discarded,
                    stream_class_id,
                ) = struct.unpack(fmt, data)

                self._entries.append(
                    _LttngDataStreamIndexEntry(
                        offset_bytes,
                        total_size_bits,
                        content_size_bits,
                        timestamp_begin,
                        timestamp_end,
                        events_discarded,
                        stream_class_id,
                    )
                )

                # Skip anything else before the next entry
                f.seek(index_entry_length - size, os.SEEK_CUR)

    # Appends `beacons` to the entries, then sorts all the entries by
    # timestamp (beacon timestamp or index entry end timestamp).
    def _add_beacons(self, beacons: Iterable[_LttngDataStreamBeaconIndexEntry]):
        # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
        def sort_key(
            entry: Union[_LttngDataStreamIndexEntry, _LttngDataStreamBeaconIndexEntry],
        ) -> int:
            if isinstance(entry, _LttngDataStreamBeaconIndexEntry):
                return entry.timestamp
            else:
                return entry.timestamp_end

        self._entries += beacons
        self._entries.sort(key=sort_key)

    @overload
    def __getitem__(self, index: int) -> _LttngIndexEntryT:
        ...

    @overload
    def __getitem__(self, index: slice) -> Sequence[_LttngIndexEntryT]:  # noqa: F811
        ...

    def __getitem__(  # noqa: F811
        self, index: Union[int, slice]
    ) -> Union[_LttngIndexEntryT, Sequence[_LttngIndexEntryT]]:
        return self._entries[index]

    def __len__(self):
        return len(self._entries)

    @property
    def path(self):
        return self._path
799
800
# An LTTng data stream.
#
# Wraps the data stream file at `path` (kept open for reading for the
# lifetime of the object) together with its index, which is loaded from
# `index/<file name>.idx` within the same directory.
class _LttngDataStream:
    def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
        self._path = path
        filename = os.path.basename(path)

        # The channel name is whatever precedes the last `_<digits>`
        # group matched in the file name.
        match = re.match(r"(.*)_\d+", filename)
        if not match:
            raise RuntimeError(
                "Unexpected data stream file name pattern: {}".format(filename)
            )

        self._channel_name = match.group(1)
        trace_dir = os.path.dirname(path)
        index_path = os.path.join(trace_dir, "index", filename + ".idx")
        self._index = _LttngDataStreamIndex(index_path, beacons_json)
        assert os.path.isfile(path)
        self._file = open(path, "rb")
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    @property
    def path(self):
        return self._path

    @property
    def channel_name(self):
        return self._channel_name

    @property
    def index(self):
        return self._index

    # Returns up to `len_bytes` bytes of the data stream file, read from
    # the absolute offset `offset_bytes`.
    def get_data(self, offset_bytes: int, len_bytes: int):
        self._file.seek(offset_bytes)
        return self._file.read(len_bytes)
839
840
# A section of a metadata stream: a timestamp and the section's data.
#
# `None` data is stored as empty bytes, so that `data` is always a
# `bytes` object.
class _LttngMetadataStreamSection:
    def __init__(self, timestamp: int, data: Optional[bytes]):
        self._timestamp = timestamp
        self._data = bytes() if data is None else data
        logging.info(
            "Built metadata stream section: ts={}, data-len={}".format(
                self._timestamp, len(self._data)
            )
        )

    @property
    def timestamp(self) -> int:
        return self._timestamp

    @property
    def data(self) -> bytes:
        return self._data
861
862
584af91e
PP
# An LTTng metadata stream.
#
# Holds the path of the metadata file and the sections which make up
# its contents.
class _LttngMetadataStream:
    def __init__(
        self,
        metadata_file_path: str,
        config_sections: Sequence[_LttngMetadataStreamSection],
    ):
        self._path = metadata_file_path
        self._sections = config_sections
        logging.info(
            "Built metadata stream: path={}, section-len={}".format(
                self._path, len(self._sections)
            )
        )

    @property
    def path(self):
        return self._path

    @property
    def sections(self):
        return self._sections
584af91e 885
78169723 886
aca7de76
SM
# A metadata section as described by the configuration: the line number
# at which it begins, its timestamp, and whether or not it's an empty
# section.
class LttngMetadataConfigSection:
    def __init__(self, line: int, timestamp: int, is_empty: bool):
        self._line = line
        self._timestamp = timestamp
        self._is_empty = is_empty

    @property
    def line(self) -> int:
        return self._line

    @property
    def timestamp(self) -> int:
        return self._timestamp

    @property
    def is_empty(self) -> bool:
        return self._is_empty
904
905
# Converts the JSON array `metadata_sections_json` to a list of
# `LttngMetadataConfigSection` objects.
#
# Each element is either the string "empty" or an object having the
# integer properties `line` and `timestamp`. An "empty" marker yields an
# additional empty section having the line and timestamp of the next
# object element.
#
# Raises `ValueError` for any other string and `TypeError` for any other
# kind of value. Lines and timestamps must be strictly increasing (and
# greater than zero); this is checked with assertions.
def _parse_metadata_sections_config(metadata_sections_json: tjson.ArrayVal):
    metadata_sections = []  # type: list[LttngMetadataConfigSection]
    append_empty_section = False
    last_timestamp = 0
    last_line = 0

    for section in metadata_sections_json:
        if isinstance(section, tjson.StrVal):
            if section.val == "empty":
                # Found an empty section marker. Actually append the
                # section at the timestamp of the next concrete section.
                append_empty_section = True
            else:
                raise ValueError("Invalid string value at {}.".format(section.path))
        elif isinstance(section, tjson.ObjVal):
            line = section.at("line", tjson.IntVal).val
            ts = section.at("timestamp", tjson.IntVal).val

            # Sections' timestamps and lines must both be increasing.
            assert ts > last_timestamp
            last_timestamp = ts

            assert line > last_line
            last_line = line

            if append_empty_section:
                metadata_sections.append(LttngMetadataConfigSection(line, ts, True))
                append_empty_section = False

            metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
        else:
            raise TypeError(
                "`{}`: expecting a string or object value".format(section.path)
            )

    return metadata_sections
78169723 942
78169723 943
aca7de76
SM
# Splits the contents of the metadata file `metadata_file_path` into
# `_LttngMetadataStreamSection` objects, following the sections
# described by `metadata_sections_json` (see
# _parse_metadata_sections_config()).
#
# A section's data is made of the lines from its own line number up to
# (excluding) the line number of the next section; empty sections get no
# data. Returns the list of sections.
def _split_metadata_sections(
    metadata_file_path: str, metadata_sections_json: tjson.ArrayVal
):
    metadata_sections = _parse_metadata_sections_config(metadata_sections_json)

    sections = []  # type: list[_LttngMetadataStreamSection]

    # Decode explicitly as UTF-8: the lines are encoded back to UTF-8
    # below, so relying on the platform's default encoding could alter
    # the bytes which are served.
    with open(metadata_file_path, "r", encoding="utf-8") as metadata_file:
        metadata_lines = list(metadata_file)

    metadata_section_idx = 0
    curr_metadata_section = bytearray()

    # Start at one to use the one-indexing used by humans when viewing a
    # text file.
    for curr_line_number, line_content in enumerate(metadata_lines, start=1):
        # If there are no more sections, simply append the line.
        if metadata_section_idx + 1 >= len(metadata_sections):
            curr_metadata_section += bytearray(line_content, "utf8")
            continue

        next_section_line_number = metadata_sections[metadata_section_idx + 1].line

        # If the next section begins at the current line, create a
        # section with the metadata we gathered so far.
        if curr_line_number >= next_section_line_number:
            # Flushing the metadata of the current section.
            sections.append(
                _LttngMetadataStreamSection(
                    metadata_sections[metadata_section_idx].timestamp,
                    bytes(curr_metadata_section),
                )
            )

            # Move to the next section.
            metadata_section_idx += 1

            # Clear old content and append current line for the next section.
            curr_metadata_section.clear()
            curr_metadata_section += bytearray(line_content, "utf8")

            # Append any empty sections.
            while metadata_sections[metadata_section_idx].is_empty:
                sections.append(
                    _LttngMetadataStreamSection(
                        metadata_sections[metadata_section_idx].timestamp, None
                    )
                )
                metadata_section_idx += 1
        else:
            # Append line_content to the current metadata section.
            curr_metadata_section += bytearray(line_content, "utf8")

    # We iterated over all the lines of the metadata file. Close the current section.
    sections.append(
        _LttngMetadataStreamSection(
            metadata_sections[metadata_section_idx].timestamp,
            bytes(curr_metadata_section),
        )
    )

    return sections
584af91e
PP
1008
1009
aca7de76
SM
# Maps a string key to an iterable of integers.
# NOTE(review): presumably data stream name to beacon timestamps; confirm
# against the users of this alias.
_StreamBeaconsT = Dict[str, Iterable[int]]
1011
1012
584af91e 1013# An LTTng trace, a sequence of LTTng data streams.
aca7de76
SM
1014class LttngTrace(Sequence[_LttngDataStream]):
1015 def __init__(
1016 self,
1017 trace_dir: str,
1018 metadata_sections_json: Optional[tjson.ArrayVal],
1019 beacons_json: Optional[tjson.ObjVal],
1020 ):
584af91e 1021 self._path = trace_dir
aca7de76
SM
1022 self._create_metadata_stream(trace_dir, metadata_sections_json)
1023 self._create_data_streams(trace_dir, beacons_json)
584af91e
PP
1024 logging.info('Built trace: path="{}"'.format(trace_dir))
1025
aca7de76
SM
1026 def _create_data_streams(
1027 self, trace_dir: str, beacons_json: Optional[tjson.ObjVal]
1028 ):
1029 data_stream_paths = [] # type: list[str]
584af91e
PP
1030
1031 for filename in os.listdir(trace_dir):
1032 path = os.path.join(trace_dir, filename)
1033
1034 if not os.path.isfile(path):
1035 continue
1036
f5567ea8 1037 if filename.startswith("."):
584af91e
PP
1038 continue
1039
f5567ea8 1040 if filename == "metadata":
584af91e
PP
1041 continue
1042
1043 data_stream_paths.append(path)
1044
1045 data_stream_paths.sort()
aca7de76 1046 self._data_streams = [] # type: list[_LttngDataStream]
584af91e
PP
1047
1048 for data_stream_path in data_stream_paths:
71f56e5f 1049 stream_name = os.path.basename(data_stream_path)
aca7de76
SM
1050 this_beacons_json = None
1051 if beacons_json is not None and stream_name in beacons_json:
1052 this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
71f56e5f
JG
1053
1054 self._data_streams.append(
aca7de76 1055 _LttngDataStream(data_stream_path, this_beacons_json)
71f56e5f 1056 )
584af91e 1057
aca7de76
SM
1058 def _create_metadata_stream(
1059 self, trace_dir: str, metadata_sections_json: Optional[tjson.ArrayVal]
1060 ):
f5567ea8 1061 metadata_path = os.path.join(trace_dir, "metadata")
aca7de76 1062 metadata_sections = [] # type: list[_LttngMetadataStreamSection]
78169723 1063
aca7de76 1064 if metadata_sections_json is None:
f5567ea8 1065 with open(metadata_path, "rb") as metadata_file:
78169723
FD
1066 metadata_sections.append(
1067 _LttngMetadataStreamSection(0, metadata_file.read())
1068 )
1069 else:
1070 metadata_sections = _split_metadata_sections(
aca7de76 1071 metadata_path, metadata_sections_json
78169723
FD
1072 )
1073
1074 self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
1075
584af91e
PP
1076 @property
1077 def path(self):
1078 return self._path
1079
1080 @property
1081 def metadata_stream(self):
1082 return self._metadata_stream
1083
aca7de76
SM
# Typing-only overloads: an integer index yields a single data stream,
# whereas a slice yields a sequence of data streams.
@overload
def __getitem__(self, index: int) -> _LttngDataStream:
    ...

@overload
def __getitem__(self, index: slice) -> Sequence[_LttngDataStream]:  # noqa: F811
    ...

# Actual implementation: delegates to the `self._data_streams` list, so
# indexing and slicing follow the usual `list` rules.
def __getitem__(  # noqa: F811
    self, index: Union[int, slice]
) -> Union[_LttngDataStream, Sequence[_LttngDataStream]]:
    return self._data_streams[index]
1096
def __len__(self):
    # Number of data streams of this trace (the metadata stream is not
    # part of `self._data_streams`).
    return len(self._data_streams)
1099
1100
# The state of a single data stream within a viewer session: the data
# stream itself, its stream info, the ID of the metadata stream of the
# same trace, and the position of the next index entry to serve.
class _LttngLiveViewerSessionDataStreamState:
    def __init__(
        self,
        ts_state: "_LttngLiveViewerSessionTracingSessionState",
        info: _LttngLiveViewerStreamInfo,
        data_stream: _LttngDataStream,
        metadata_stream_id: int,
    ):
        self._ts_state = ts_state
        self._info = info
        self._data_stream = data_stream
        self._metadata_stream_id = metadata_stream_id

        # Position, within `data_stream.index`, of the current entry
        self._cur_index_entry_index = 0

        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id, ts_info.tracing_session_id, ts_info.name, data_stream.path
            )
        )

    @property
    def tracing_session_state(self):
        return self._ts_state

    @property
    def info(self):
        return self._info

    @property
    def data_stream(self):
        return self._data_stream

    # Current index entry, or `None` once the position is exactly one
    # past the last entry of the data stream's index.
    @property
    def cur_index_entry(self):
        index = self._data_stream.index
        position = self._cur_index_entry_index

        if position == len(index):
            return None

        return index[position]

    @property
    def metadata_stream_id(self):
        return self._metadata_stream_id

    # Moves to the next index entry (no bound check is performed here).
    def goto_next_index_entry(self):
        self._cur_index_entry_index += 1
1150
1151
# The state of a single metadata stream within a viewer session: the
# metadata stream itself, its stream info, the current section, the
# timestamp at which the next section becomes needed (if any), and
# whether or not the current section was sent to the viewer.
class _LttngLiveViewerSessionMetadataStreamState:
    def __init__(
        self,
        ts_state: "_LttngLiveViewerSessionTracingSessionState",
        info: _LttngLiveViewerStreamInfo,
        metadata_stream: _LttngMetadataStream,
    ):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream
        self._cur_metadata_stream_section_index = 0

        # The second section, when there's one, provides the first
        # "next section" timestamp.
        sections = metadata_stream.sections
        self._next_metadata_stream_section_timestamp = (
            sections[1].timestamp if len(sections) > 1 else None
        )

        self._is_sent = False

        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id, ts_info.tracing_session_id, ts_info.name, metadata_stream.path
            )
        )

    @property
    def info(self):
        return self._info

    @property
    def metadata_stream(self):
        return self._metadata_stream

    @property
    def is_sent(self):
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value: bool):
        self._is_sent = value

    # Current section, or `None` once the position is exactly one past
    # the last section.
    #
    # Each access writes one log record.
    @property
    def cur_section(self):
        position = self._cur_metadata_stream_section_index
        logging.info("Get current metadata section: section-idx={}".format(position))
        sections = self._metadata_stream.sections

        if position == len(sections):
            return None

        return sections[position]

    # Moves to the next section and sets the "next section" timestamp
    # to the timestamp of the section which is now current, or to
    # `None` if there's no such section.
    def goto_next_section(self):
        self._cur_metadata_stream_section_index += 1
        self._next_metadata_stream_section_timestamp = (
            self.cur_section.timestamp if self.cur_section else None
        )

    @property
    def next_section_timestamp(self):
        return self._next_metadata_stream_section_timestamp
1219
584af91e 1220
ee1171e5
SM
# A tracing session descriptor.
#
# In the constructor, `traces` is an iterable of LTTng traces
# (`LttngTrace` objects). It's copied into a list so that a one-shot
# iterable (a generator, for example) is supported too: the traces are
# walked more than once here and again by the users of the `traces`
# property.
#
# The constructor raises `ValueError` if `name` isn't found within the
# path of one of the traces.
class LttngTracingSessionDescriptor:
    def __init__(
        self,
        name: str,
        tracing_session_id: int,
        hostname: str,
        live_timer_freq: int,
        client_count: int,
        traces: Iterable[LttngTrace],
    ):
        # Materialize once: iterating a one-shot iterable a second time
        # would silently yield nothing (stream count of 0, no traces).
        traces = list(traces)

        for trace in traces:
            if name not in trace.path:
                fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
                raise ValueError(fmt.format(name, trace.path))

        self._traces = traces

        # For each trace: its data streams plus one metadata stream
        stream_count = sum(len(t) + 1 for t in traces)
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            stream_count,
            hostname,
            name,
        )

    # The traces of this tracing session (a list).
    @property
    def traces(self):
        return self._traces

    # The tracing session info built by the constructor.
    @property
    def info(self):
        return self._info
1258
1259
584af91e
PP
# The state of a tracing session within a viewer session.
#
# The constructor assigns consecutive stream IDs, starting at
# `base_stream_id`: for each trace, first its metadata stream, then
# each of its data streams. It builds one stream info and one stream
# state per stream.
class _LttngLiveViewerSessionTracingSessionState:
    def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
        self._tc_descr = tc_descr
        self._is_attached = False
        self._stream_infos = []  # type: list[_LttngLiveViewerStreamInfo]
        self._ds_states = {}  # type: dict[int, _LttngLiveViewerSessionDataStreamState]
        self._ms_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # The metadata stream gets the first stream ID of the trace;
            # the trace ID derives from that same ID.
            metadata_stream_id = next_stream_id
            trace_id = metadata_stream_id * 1000
            next_stream_id += 1

            # Metadata stream -> stream info and metadata stream state
            ms_info = _LttngLiveViewerStreamInfo(
                metadata_stream_id,
                trace_id,
                True,
                trace.metadata_stream.path,
                "metadata",
            )
            self._stream_infos.append(ms_info)
            self._ms_states[
                metadata_stream_id
            ] = _LttngLiveViewerSessionMetadataStreamState(
                self, ms_info, trace.metadata_stream
            )

            # Data streams -> stream infos and data stream states
            for data_stream in trace:
                ds_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(ds_info)
                self._ds_states[
                    next_stream_id
                ] = _LttngLiveViewerSessionDataStreamState(
                    self, ds_info, data_stream, metadata_stream_id
                )
                next_stream_id += 1

        descr_info = tc_descr.info
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                descr_info.tracing_session_id, descr_info.name
            )
        )

    @property
    def tracing_session_descriptor(self):
        return self._tc_descr

    # Data stream states, by stream ID.
    @property
    def data_stream_states(self):
        return self._ds_states

    # Metadata stream states, by stream ID.
    @property
    def metadata_stream_states(self):
        return self._ms_states

    # All the stream infos, in stream ID order.
    @property
    def stream_infos(self):
        return self._stream_infos

    # Whether or not at least one metadata stream has a section which
    # is not sent yet.
    @property
    def has_new_metadata(self):
        for ms_state in self._ms_states.values():
            if not ms_state.is_sent:
                return True

        return False

    @property
    def is_attached(self):
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value: bool):
        self._is_attached = value
1331
1332
aca7de76
SM
# Returns whether or not `latest_timestamp` reached the timestamp at
# which the next metadata section of `metadata_stream_state` is needed.
#
# Always returns `False` when there's no such timestamp (`None`).
def needs_new_metadata_section(
    metadata_stream_state: _LttngLiveViewerSessionMetadataStreamState,
    latest_timestamp: int,
):
    threshold = metadata_stream_state.next_section_timestamp
    return threshold is not None and latest_timestamp >= threshold
1344
1345
584af91e
PP
# An LTTng live viewer session manages a view on tracing sessions
# and replies to commands accordingly.
#
# The constructor builds one tracing session state per descriptor of
# `tracing_session_descriptors` and gathers all their stream states in
# a single stream ID -> stream state mapping.
#
# `max_query_data_response_size`, if set (and not zero), caps the
# length of the data returned by a single "get data stream packet data"
# command.
#
# Lookups of unknown tracing session or stream IDs, and any protocol
# misuse detected by the handlers, raise `RuntimeError`.
class _LttngLiveViewerSession:
    def __init__(
        self,
        viewer_session_id: int,
        tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
        max_query_data_response_size: Optional[int],
    ):
        self._viewer_session_id = viewer_session_id
        self._ts_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionTracingSessionState]
        self._stream_states = (
            {}
        )  # type: dict[int, _LttngLiveViewerSessionDataStreamState | _LttngLiveViewerSessionMetadataStreamState]
        self._max_query_data_response_size = max_query_data_response_size

        # Also the base stream ID of the next tracing session state, so
        # that stream IDs are unique within the whole viewer session.
        total_stream_infos = 0

        for ts_descr in tracing_session_descriptors:
            ts_state = _LttngLiveViewerSessionTracingSessionState(
                ts_descr, total_stream_infos
            )
            ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
            self._ts_states[ts_id] = ts_state
            total_stream_infos += len(ts_state.stream_infos)

            # Update session's stream states to have the new states
            self._stream_states.update(ts_state.data_stream_states)
            self._stream_states.update(ts_state.metadata_stream_states)

        # Command class -> handler method
        self._command_handlers = {
            _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
            _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
            _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
            _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
            _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
            _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
            _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
            _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
        }  # type: dict[type[_LttngLiveViewerCommand], Callable[[Any], _LttngLiveViewerReply]]

    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    # Returns the state of the tracing session having the ID
    # `tracing_session_id`, raising `RuntimeError` if there's none.
    def _get_tracing_session_state(self, tracing_session_id: int):
        if tracing_session_id not in self._ts_states:
            raise RuntimeError(
                "Unknown tracing session ID {}".format(tracing_session_id)
            )

        return self._ts_states[tracing_session_id]

    # Returns the data stream state having the ID `stream_id`, raising
    # `RuntimeError` if there's no such stream or if it's not a data
    # stream.
    def _get_data_stream_state(self, stream_id: int):
        if stream_id not in self._stream_states:
            # Raise explicitly: without this, the lookup below would
            # fail with a bare `KeyError`.
            raise RuntimeError("Unknown stream ID {}".format(stream_id))

        stream = self._stream_states[stream_id]

        if not isinstance(stream, _LttngLiveViewerSessionDataStreamState):
            raise RuntimeError("Stream is not a data stream")

        return stream

    # Returns the metadata stream state having the ID `stream_id`,
    # raising `RuntimeError` if there's no such stream or if it's not a
    # metadata stream.
    def _get_metadata_stream_state(self, stream_id: int):
        if stream_id not in self._stream_states:
            # Raise explicitly: without this, the lookup below would
            # fail with a bare `KeyError`.
            raise RuntimeError("Unknown stream ID {}".format(stream_id))

        stream = self._stream_states[stream_id]

        if not isinstance(stream, _LttngLiveViewerSessionMetadataStreamState):
            raise RuntimeError("Stream is not a metadata stream")

        return stream

    # Dispatches `cmd` to the handler registered for its exact class
    # and returns the reply of this handler.
    #
    # Raises `RuntimeError` if no handler exists for this class.
    def handle_command(self, cmd: _LttngLiveViewerCommand):
        logging.info(
            "Handling command in viewer session: cmd-cls-name={}".format(
                cmd.__class__.__name__
            )
        )
        cmd_type = type(cmd)

        if cmd_type not in self._command_handlers:
            raise RuntimeError(
                "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
            )

        return self._command_handlers[cmd_type](cmd)

    # Marks the targeted tracing session as attached and replies with
    # all its stream infos; attaching twice is an error.
    def _handle_attach_to_tracing_session_command(
        self, cmd: _LttngLiveViewerAttachToTracingSessionCommand
    ):
        fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
        logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if ts_state.is_attached:
            raise RuntimeError(
                "Cannot attach to tracing session `{}`: viewer is already attached".format(
                    info.name
                )
            )

        ts_state.is_attached = True
        status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
        return _LttngLiveViewerAttachToTracingSessionReply(
            status, ts_state.stream_infos
        )

    # Marks the targeted tracing session as detached; detaching a
    # tracing session which is not attached is an error.
    def _handle_detach_from_tracing_session_command(
        self, cmd: _LttngLiveViewerDetachFromTracingSessionCommand
    ):
        fmt = 'Handling "detach from tracing session" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if not ts_state.is_attached:
            raise RuntimeError(
                "Cannot detach to tracing session `{}`: viewer is not attached".format(
                    info.name
                )
            )

        ts_state.is_attached = False
        status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
        return _LttngLiveViewerDetachFromTracingSessionReply(status)

    # Replies with the current index entry of the targeted data stream
    # and then moves this stream to its next index entry.
    #
    # The status is `HUP` when the stream has no more entries, `OK` for
    # a data stream index entry, and `INACTIVE` for any other kind of
    # entry.
    def _handle_get_next_data_stream_index_entry_command(
        self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
    ):
        fmt = 'Handling "get next data stream index entry" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        stream_state = self._get_data_stream_state(cmd.stream_id)
        metadata_stream_state = self._get_metadata_stream_state(
            stream_state.metadata_stream_id
        )

        if stream_state.cur_index_entry is None:
            # The viewer is done reading this stream
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP

            # Dummy data stream index entry to use with the `HUP` status
            # (the reply needs one, but the viewer ignores it)
            index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)

            return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
                status, index_entry, False, False
            )

        timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)

        # Once this entry reaches the timestamp of the next metadata
        # section, make this section current and pending again.
        if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        # The viewer only checks the `has_new_metadata` flag if the
        # reply's status is `OK`, so we need to provide an index here
        has_new_metadata = stream_state.tracing_session_state.has_new_metadata

        if isinstance(stream_state.cur_index_entry, _LttngDataStreamIndexEntry):
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
        else:
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE

        reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
            status, stream_state.cur_index_entry, has_new_metadata, False
        )
        stream_state.goto_next_index_entry()
        return reply

    # Replies with data of the targeted data stream, starting at the
    # requested offset.
    #
    # The reply has the `ERROR` status, no data, and its third argument
    # set to `True` while the tracing session has metadata which is not
    # sent yet.
    def _handle_get_data_stream_packet_data_command(
        self, cmd: _LttngLiveViewerGetDataStreamPacketDataCommand
    ):
        fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
        logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
        stream_state = self._get_data_stream_state(cmd.stream_id)
        data_response_length = cmd.req_length

        if stream_state.tracing_session_state.has_new_metadata:
            status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
            return _LttngLiveViewerGetDataStreamPacketDataReply(
                status, bytes(), True, False
            )

        if self._max_query_data_response_size:
            # Enforce a server side limit on the query requested length.
            # To ensure that the transaction terminates, take the
            # minimum of both values.
            data_response_length = min(
                cmd.req_length, self._max_query_data_response_size
            )
            fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
            logging.info(fmt.format(cmd.req_length, data_response_length))

        data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
        status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
        return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)

    # Replies with the data of the current section of the targeted
    # metadata stream, or with the `NO_NEW` status and no data if this
    # section is already sent.
    def _handle_get_metadata_stream_data_command(
        self, cmd: _LttngLiveViewerGetMetadataStreamDataCommand
    ):
        fmt = 'Handling "get metadata stream data" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)

        if metadata_stream_state.is_sent:
            status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
            return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())

        metadata_stream_state.is_sent = True
        status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
        metadata_section = metadata_stream_state.cur_section
        assert metadata_section is not None

        # If we are sending an empty section, ready the next one right away.
        if len(metadata_section.data) == 0:
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
        logging.info(fmt.format(len(metadata_section.data)))
        return _LttngLiveViewerGetMetadataStreamDataContentReply(
            status, metadata_section.data
        )

    # Always replies with the `HUP` status and no stream infos.
    def _handle_get_new_stream_infos_command(
        self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
    ):
        fmt = 'Handling "get new stream infos" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))

        # As of this version, all the tracing session's stream infos are
        # always given to the viewer when sending the "attach to tracing
        # session" reply, so there's nothing new here. Return the `HUP`
        # status as, if we're handling this command, the viewer consumed
        # all the existing data streams.
        status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
        return _LttngLiveViewerGetNewStreamInfosReply(status, [])

    # Replies with the infos of all the tracing sessions, sorted by
    # name.
    def _handle_get_tracing_session_infos_command(
        self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
    ):
        logging.info('Handling "get tracing session infos" command.')
        infos = [
            tss.tracing_session_descriptor.info for tss in self._ts_states.values()
        ]
        infos.sort(key=lambda info: info.name)
        return _LttngLiveViewerGetTracingSessionInfosReply(infos)

    # Always replies with the `OK` status.
    def _handle_create_viewer_session_command(
        self, cmd: _LttngLiveViewerCreateViewerSessionCommand
    ):
        logging.info('Handling "create viewer session" command.')
        status = _LttngLiveViewerCreateViewerSessionReply.Status.OK

        # This does nothing here. In the LTTng relay daemon, it
        # allocates the viewer session's state.
        return _LttngLiveViewerCreateViewerSessionReply(status)
1605
1606
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` on the TCP port `port` if it's
# not `None`, or on an OS-assigned TCP port otherwise. If
# `port_filename` is not `None`, it writes the decimal TCP port number
# to a temporary file located in the directory of `port_filename`, and
# then renames this temporary file to `port_filename`.
#
# `tracing_session_descriptors` contains the tracing session
# descriptors (`LttngTracingSessionDescriptor`) to serve.
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns.
class LttngLiveServer:
    def __init__(
        self,
        port: Optional[int],
        port_filename: Optional[str],
        tracing_session_descriptors: Iterable[LttngTracingSessionDescriptor],
        max_query_data_response_size: Optional[int],
    ):
        # Log the whole configuration first
        logging.info("Server configuration:")
        logging.info(" Port file name: `{}`".format(port_filename))

        if max_query_data_response_size is not None:
            size_fmt = " Maximum response data query size: `{}`"
            logging.info(size_fmt.format(max_query_data_response_size))

        ts_descr_fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'

        for descriptor in tracing_session_descriptors:
            ts_info = descriptor.info
            logging.info(
                ts_descr_fmt.format(
                    ts_info.name,
                    ts_info.tracing_session_id,
                    ts_info.hostname,
                    ts_info.live_timer_freq,
                    ts_info.client_count,
                    ts_info.stream_count,
                )
            )

            for trace in descriptor.traces:
                logging.info(' Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = tracing_session_descriptors
        self._max_query_data_response_size = max_query_data_response_size
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._codec = _LttngLiveViewerProtocolCodec()

        # Port 0: OS assigns an unused port
        bind_port = 0 if port is None else port
        self._sock.bind(("localhost", bind_port))

        if port_filename is not None:
            self._write_port_to_file(port_filename)

        # Announce the port and one URL per tracing session on the
        # standard output.
        print("Listening on port {}".format(self._server_port))

        for descriptor in tracing_session_descriptors:
            ts_info = descriptor.info
            url = "net://localhost:{}/host/{}/{}".format(
                self._server_port, ts_info.hostname, ts_info.name
            )
            print(url)

        try:
            self._listen()
        finally:
            self._sock.close()
            logging.info("Closed connection and socket.")

    # The TCP port to which the server socket is bound.
    @property
    def _server_port(self):
        sock_name = self._sock.getsockname()
        return sock_name[1]

    # Receives data from the viewer until the codec manages to decode
    # a complete command, and returns this command.
    #
    # Returns `None` if the viewer closes the connection before sending
    # any byte of a new command.
    #
    # Raises `RuntimeError` if the viewer closes the connection in the
    # middle of a command or if the command is malformed.
    def _recv_command(self):
        pending = bytes()

        while True:
            logging.info("Waiting for viewer command.")
            chunk = self._conn.recv(128)

            if len(chunk) == 0:
                logging.info("Client closed connection.")

                if len(pending) > 0:
                    closed_fmt = (
                        "Client closed connection after having sent {} command bytes."
                    )
                    raise RuntimeError(closed_fmt.format(len(pending)))

                return None

            logging.info("Received data from viewer: length={}".format(len(chunk)))
            pending += chunk

            try:
                cmd = self._codec.decode(pending)
            except struct.error as exc:
                raise RuntimeError("Malformed command: {}".format(exc)) from exc

            if cmd is None:
                # Not enough data yet to make a command
                continue

            logging.info(
                "Received command from viewer: cmd-cls-name={}".format(
                    cmd.__class__.__name__
                )
            )
            return cmd

    # Encodes `reply` and sends the whole result to the viewer.
    def _send_reply(self, reply: _LttngLiveViewerReply):
        encoded = self._codec.encode(reply)
        log_fmt = "Sending reply to viewer: reply-cls-name={}, length={}"
        logging.info(log_fmt.format(reply.__class__.__name__, len(encoded)))
        self._conn.sendall(encoded)

    # Handles the whole conversation with the connected viewer: the
    # initial "connect" command, and then all the commands of the
    # viewer session until the viewer closes the connection.
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            not_connect_fmt = 'First command is not "connect": cmd-cls-name={}'
            raise RuntimeError(not_connect_fmt.format(cmd.__class__.__name__))

        # Create viewer session (arbitrary ID 23)
        logging.info(
            "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        connect_reply = _LttngLiveViewerConnectReply(
            viewer_session.viewer_session_id, 2, 10
        )
        self._send_reply(connect_reply)

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                break

            reply = viewer_session.handle_command(cmd)
            self._send_reply(reply)

    # Waits for a single viewer to connect, handles its connection, and
    # always closes this connection afterwards.
    def _listen(self):
        logging.info("Listening: port={}".format(self._server_port))

        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)
        self._conn, viewer_addr = self._sock.accept()
        viewer_host = viewer_addr[0]
        viewer_port = viewer_addr[1]
        logging.info("Accepted viewer: addr={}:{}".format(viewer_host, viewer_port))

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Writes the server's TCP port to the file `port_filename`.
    #
    # The port goes to a temporary file of the same directory first;
    # renaming this file afterwards makes the final file appear with
    # its complete content.
    def _write_port_to_file(self, port_filename: str):
        port_dir = os.path.dirname(port_filename)

        # Write the port number to a temporary file.
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, dir=port_dir
        ) as tmp_port_file:
            print(self._server_port, end="", file=tmp_port_file)

        tmp_path = tmp_port_file.name

        # Rename temporary file to real file
        os.replace(tmp_path, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_path, port_filename
            )
        )
1798
1799
aca7de76
SM
# Loads the JSON file `sessions_filename` and returns the list of
# tracing session descriptors (`LttngTracingSessionDescriptor`) which
# it describes.
#
# A relative trace path is joined to `trace_path_prefix` when the
# latter is set (and not empty).
def _session_descriptors_from_path(
    sessions_filename: str, trace_path_prefix: Optional[str]
):
    # File format is:
    #
    #     [
    #         {
    #             "name": "my-session",
    #             "id": 17,
    #             "hostname": "myhost",
    #             "live-timer-freq": 1000000,
    #             "client-count": 23,
    #             "traces": [
    #                 {
    #                     "path": "lol"
    #                 },
    #                 {
    #                     "path": "meow/mix",
    #                     "beacons": {
    #                         "my_stream": [ 5235787, 728375283 ]
    #                     },
    #                     "metadata-sections": [
    #                         {
    #                             "line": 1,
    #                             "timestamp": 0
    #                         }
    #                     ]
    #                 }
    #             ]
    #         }
    #     ]
    #
    # The `beacons` and `metadata-sections` properties are optional.

    # Creates an LTTng trace from a single item of a `traces` array.
    def trace_from_json(trace_json: tjson.ObjVal):
        metadata_sections = None
        beacons = None

        if "metadata-sections" in trace_json:
            metadata_sections = trace_json.at("metadata-sections", tjson.ArrayVal)

        if "beacons" in trace_json:
            beacons = trace_json.at("beacons", tjson.ObjVal)

        trace_path = trace_json.at("path", tjson.StrVal).val

        if trace_path_prefix and not os.path.isabs(trace_path):
            trace_path = os.path.join(trace_path_prefix, trace_path)

        return LttngTrace(trace_path, metadata_sections, beacons)

    with open(sessions_filename, "r") as sessions_file:
        sessions_json = tjson.load(sessions_file, tjson.ArrayVal)

    descriptors = []  # type: list[LttngTracingSessionDescriptor]

    for session_json in sessions_json.iter(tjson.ObjVal):
        # Read all the scalar properties before dealing with the traces
        name = session_json.at("name", tjson.StrVal).val
        tracing_session_id = session_json.at("id", tjson.IntVal).val
        hostname = session_json.at("hostname", tjson.StrVal).val
        live_timer_freq = session_json.at("live-timer-freq", tjson.IntVal).val
        client_count = session_json.at("client-count", tjson.IntVal).val
        traces_json = session_json.at("traces", tjson.ArrayVal)

        traces = [
            trace_from_json(trace_json)
            for trace_json in traces_json.iter(tjson.ObjVal)
        ]

        descriptor = LttngTracingSessionDescriptor(
            name,
            tracing_session_id,
            hostname,
            live_timer_freq,
            client_count,
            traces,
        )
        descriptors.append(descriptor)

    return descriptors
584af91e
PP
1882
1883
# Converts the log level name `string` (`info` or `warning`) to the
# corresponding level of the `logging` module.
#
# Raises `argparse.ArgumentTypeError` for any other name.
def _loglevel_parser(string: str):
    level = {"info": logging.INFO, "warning": logging.WARNING}.get(string)

    if level is None:
        raise argparse.ArgumentTypeError(
            "{} is not a valid loglevel".format(string)
        )

    return level
1890
1891
f5567ea8
FD
# Script entry point: parses the command line, loads the tracing
# session descriptors, and runs the server until the viewer disconnects.
if __name__ == "__main__":
    logging.basicConfig(format="# %(asctime)-25s%(message)s")

    # The automatic `--help` option is disabled here and added manually
    # below, after the first parsing pass.
    parser = argparse.ArgumentParser(
        description="LTTng-live protocol mocker", add_help=False
    )
    parser.add_argument(
        "--log-level",
        default="warning",
        choices=["info", "warning"],
        help="The loglevel to be used.",
    )

    # First pass: only `--log-level` is known at this point, so that
    # the log level is set before anything else; the other arguments
    # are kept for the second pass.
    loglevel_namespace, remaining_args = parser.parse_known_args()
    logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))

    parser.add_argument(
        "--port",
        help="The port to bind to. If missing, use an OS-assigned port..",
        type=int,
    )
    parser.add_argument(
        "--port-filename",
        help="The final port file. This file is present when the server is ready to receive connection.",
    )
    parser.add_argument(
        "--max-query-data-response-size",
        type=int,
        help="The maximum size of control data response in bytes",
    )
    parser.add_argument(
        "--trace-path-prefix",
        type=str,
        help="Prefix to prepend to the trace paths of session configurations",
    )
    parser.add_argument(
        "sessions_filename",
        type=str,
        help="Path to a session configuration file",
        metavar="sessions-filename",
    )
    parser.add_argument(
        "-h",
        "--help",
        action="help",
        default=argparse.SUPPRESS,
        help="Show this help message and exit.",
    )

    # Second pass: parse what the first pass did not consume.
    args = parser.parse_args(args=remaining_args)
    sessions_filename = args.sessions_filename  # type: str
    trace_path_prefix = args.trace_path_prefix  # type: str | None
    sessions = _session_descriptors_from_path(
        sessions_filename,
        trace_path_prefix,
    )

    port = args.port  # type: int | None
    port_filename = args.port_filename  # type: str | None
    max_query_data_response_size = args.max_query_data_response_size  # type: int | None

    # The constructor only returns once the viewer closed the
    # connection (see the comment above `LttngLiveServer`).
    LttngLiveServer(port, port_filename, sessions, max_query_data_response_size)
This page took 0.148032 seconds and 4 git commands to generate.