tests: fix all basic type check issues of lttng_live_server.py
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
584af91e 2#
0235b0db 3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
584af91e 4#
584af91e 5
584af91e 6import os
584af91e 7import re
5995b304
SM
8import sys
9import json
584af91e
PP
10import socket
11import struct
5995b304
SM
12import logging
13import os.path
14import argparse
584af91e 15import tempfile
5995b304 16import collections.abc
78169723 17from collections import namedtuple
584af91e
PP
18
19
20class UnexpectedInput(RuntimeError):
21 pass
22
23
ee1171e5
SM
24# An entry within the index of an LTTng data stream.
25class _LttngDataStreamIndexEntry:
26 def __init__(
27 self,
28 offset_bytes,
29 total_size_bits,
30 content_size_bits,
31 timestamp_begin,
32 timestamp_end,
33 events_discarded,
34 stream_class_id,
35 ):
36 self._offset_bytes = offset_bytes
37 self._total_size_bits = total_size_bits
38 self._content_size_bits = content_size_bits
39 self._timestamp_begin = timestamp_begin
40 self._timestamp_end = timestamp_end
41 self._events_discarded = events_discarded
42 self._stream_class_id = stream_class_id
43
44 @property
45 def offset_bytes(self):
46 return self._offset_bytes
47
48 @property
49 def total_size_bits(self):
50 return self._total_size_bits
51
52 @property
53 def total_size_bytes(self):
54 return self._total_size_bits // 8
55
56 @property
57 def content_size_bits(self):
58 return self._content_size_bits
59
60 @property
61 def content_size_bytes(self):
62 return self._content_size_bits // 8
63
64 @property
65 def timestamp_begin(self):
66 return self._timestamp_begin
67
68 @property
69 def timestamp_end(self):
70 return self._timestamp_end
71
72 @property
73 def events_discarded(self):
74 return self._events_discarded
75
76 @property
77 def stream_class_id(self):
78 return self._stream_class_id
79
80
81# An entry within the index of an LTTng data stream. While a stream beacon entry
82# is conceptually unrelated to an index, it is sent as a reply to a
83# LttngLiveViewerGetNextDataStreamIndexEntryCommand
84class _LttngDataStreamBeaconEntry:
85 def __init__(self, stream_class_id, timestamp):
86 self._stream_class_id = stream_class_id
87 self._timestamp = timestamp
88
89 @property
90 def timestamp(self):
91 return self._timestamp
92
93 @property
94 def stream_class_id(self):
95 return self._stream_class_id
96
97
584af91e
PP
98class _LttngLiveViewerCommand:
99 def __init__(self, version):
100 self._version = version
101
102 @property
103 def version(self):
104 return self._version
105
106
107class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
108 def __init__(self, version, viewer_session_id, major, minor):
109 super().__init__(version)
110 self._viewer_session_id = viewer_session_id
111 self._major = major
112 self._minor = minor
113
114 @property
115 def viewer_session_id(self):
116 return self._viewer_session_id
117
118 @property
119 def major(self):
120 return self._major
121
122 @property
123 def minor(self):
124 return self._minor
125
126
127class _LttngLiveViewerConnectReply:
128 def __init__(self, viewer_session_id, major, minor):
129 self._viewer_session_id = viewer_session_id
130 self._major = major
131 self._minor = minor
132
133 @property
134 def viewer_session_id(self):
135 return self._viewer_session_id
136
137 @property
138 def major(self):
139 return self._major
140
141 @property
142 def minor(self):
143 return self._minor
144
145
146class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
147 pass
148
149
150class _LttngLiveViewerTracingSessionInfo:
151 def __init__(
152 self,
153 tracing_session_id,
154 live_timer_freq,
155 client_count,
156 stream_count,
157 hostname,
158 name,
159 ):
160 self._tracing_session_id = tracing_session_id
161 self._live_timer_freq = live_timer_freq
162 self._client_count = client_count
163 self._stream_count = stream_count
164 self._hostname = hostname
165 self._name = name
166
167 @property
168 def tracing_session_id(self):
169 return self._tracing_session_id
170
171 @property
172 def live_timer_freq(self):
173 return self._live_timer_freq
174
175 @property
176 def client_count(self):
177 return self._client_count
178
179 @property
180 def stream_count(self):
181 return self._stream_count
182
183 @property
184 def hostname(self):
185 return self._hostname
186
187 @property
188 def name(self):
189 return self._name
190
191
192class _LttngLiveViewerGetTracingSessionInfosReply:
193 def __init__(self, tracing_session_infos):
194 self._tracing_session_infos = tracing_session_infos
195
196 @property
197 def tracing_session_infos(self):
198 return self._tracing_session_infos
199
200
201class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
202 class SeekType:
203 BEGINNING = 1
204 LAST = 2
205
206 def __init__(self, version, tracing_session_id, offset, seek_type):
207 super().__init__(version)
208 self._tracing_session_id = tracing_session_id
209 self._offset = offset
210 self._seek_type = seek_type
211
212 @property
213 def tracing_session_id(self):
214 return self._tracing_session_id
215
216 @property
217 def offset(self):
218 return self._offset
219
220 @property
221 def seek_type(self):
222 return self._seek_type
223
224
225class _LttngLiveViewerStreamInfo:
226 def __init__(self, id, trace_id, is_metadata, path, channel_name):
227 self._id = id
228 self._trace_id = trace_id
229 self._is_metadata = is_metadata
230 self._path = path
231 self._channel_name = channel_name
232
233 @property
234 def id(self):
235 return self._id
236
237 @property
238 def trace_id(self):
239 return self._trace_id
240
241 @property
242 def is_metadata(self):
243 return self._is_metadata
244
245 @property
246 def path(self):
247 return self._path
248
249 @property
250 def channel_name(self):
251 return self._channel_name
252
253
254class _LttngLiveViewerAttachToTracingSessionReply:
255 class Status:
256 OK = 1
257 ALREADY = 2
258 UNKNOWN = 3
259 NOT_LIVE = 4
260 SEEK_ERROR = 5
261 NO_SESSION = 6
262
263 def __init__(self, status, stream_infos):
264 self._status = status
265 self._stream_infos = stream_infos
266
267 @property
268 def status(self):
269 return self._status
270
271 @property
272 def stream_infos(self):
273 return self._stream_infos
274
275
276class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
277 def __init__(self, version, stream_id):
278 super().__init__(version)
279 self._stream_id = stream_id
280
281 @property
282 def stream_id(self):
283 return self._stream_id
284
285
286class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
287 class Status:
288 OK = 1
289 RETRY = 2
290 HUP = 3
291 ERROR = 4
292 INACTIVE = 5
293 EOF = 6
294
295 def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
296 self._status = status
297 self._index_entry = index_entry
298 self._has_new_metadata = has_new_metadata
299 self._has_new_data_stream = has_new_data_stream
300
301 @property
302 def status(self):
303 return self._status
304
305 @property
306 def index_entry(self):
307 return self._index_entry
308
309 @property
310 def has_new_metadata(self):
311 return self._has_new_metadata
312
313 @property
314 def has_new_data_stream(self):
315 return self._has_new_data_stream
316
317
318class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
319 def __init__(self, version, stream_id, offset, req_length):
320 super().__init__(version)
321 self._stream_id = stream_id
322 self._offset = offset
323 self._req_length = req_length
324
325 @property
326 def stream_id(self):
327 return self._stream_id
328
329 @property
330 def offset(self):
331 return self._offset
332
333 @property
334 def req_length(self):
335 return self._req_length
336
337
338class _LttngLiveViewerGetDataStreamPacketDataReply:
339 class Status:
340 OK = 1
341 RETRY = 2
342 ERROR = 3
343 EOF = 4
344
345 def __init__(self, status, data, has_new_metadata, has_new_data_stream):
346 self._status = status
347 self._data = data
348 self._has_new_metadata = has_new_metadata
349 self._has_new_data_stream = has_new_data_stream
350
351 @property
352 def status(self):
353 return self._status
354
355 @property
356 def data(self):
357 return self._data
358
359 @property
360 def has_new_metadata(self):
361 return self._has_new_metadata
362
363 @property
364 def has_new_data_stream(self):
365 return self._has_new_data_stream
366
367
368class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
369 def __init__(self, version, stream_id):
370 super().__init__(version)
371 self._stream_id = stream_id
372
373 @property
374 def stream_id(self):
375 return self._stream_id
376
377
378class _LttngLiveViewerGetMetadataStreamDataContentReply:
379 class Status:
380 OK = 1
381 NO_NEW = 2
382 ERROR = 3
383
384 def __init__(self, status, data):
385 self._status = status
386 self._data = data
387
388 @property
389 def status(self):
390 return self._status
391
392 @property
393 def data(self):
394 return self._data
395
396
397class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
398 def __init__(self, version, tracing_session_id):
399 super().__init__(version)
400 self._tracing_session_id = tracing_session_id
401
402 @property
403 def tracing_session_id(self):
404 return self._tracing_session_id
405
406
407class _LttngLiveViewerGetNewStreamInfosReply:
408 class Status:
409 OK = 1
410 NO_NEW = 2
411 ERROR = 3
412 HUP = 4
413
414 def __init__(self, status, stream_infos):
415 self._status = status
416 self._stream_infos = stream_infos
417
418 @property
419 def status(self):
420 return self._status
421
422 @property
423 def stream_infos(self):
424 return self._stream_infos
425
426
427class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
428 pass
429
430
431class _LttngLiveViewerCreateViewerSessionReply:
432 class Status:
433 OK = 1
434 ERROR = 2
435
436 def __init__(self, status):
437 self._status = status
438
439 @property
440 def status(self):
441 return self._status
442
443
444class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
445 def __init__(self, version, tracing_session_id):
446 super().__init__(version)
447 self._tracing_session_id = tracing_session_id
448
449 @property
450 def tracing_session_id(self):
451 return self._tracing_session_id
452
453
454class _LttngLiveViewerDetachFromTracingSessionReply:
455 class Status:
456 OK = 1
457 UNKNOWN = 2
458 ERROR = 3
459
460 def __init__(self, status):
461 self._status = status
462
463 @property
464 def status(self):
465 return self._status
466
467
468# An LTTng live protocol codec can convert bytes to command objects and
469# reply objects to bytes.
470class _LttngLiveViewerProtocolCodec:
f5567ea8 471 _COMMAND_HEADER_STRUCT_FMT = "QII"
584af91e
PP
472 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
473
474 def __init__(self):
475 pass
476
477 def _unpack(self, fmt, data, offset=0):
f5567ea8 478 fmt = "!" + fmt
584af91e
PP
479 return struct.unpack_from(fmt, data, offset)
480
481 def _unpack_payload(self, fmt, data):
482 return self._unpack(
483 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
484 )
485
486 def decode(self, data):
487 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
488 # Not enough data to read the command header
489 return
490
491 payload_size, cmd_type, version = self._unpack(
492 self._COMMAND_HEADER_STRUCT_FMT, data
493 )
494 logging.info(
f5567ea8 495 "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
584af91e
PP
496 payload_size, cmd_type, version
497 )
498 )
499
500 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
501 # Not enough data to read the whole command
502 return
503
504 if cmd_type == 1:
505 viewer_session_id, major, minor, conn_type = self._unpack_payload(
f5567ea8 506 "QIII", data
584af91e
PP
507 )
508 return _LttngLiveViewerConnectCommand(
509 version, viewer_session_id, major, minor
510 )
511 elif cmd_type == 2:
512 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
513 elif cmd_type == 3:
f5567ea8 514 tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
584af91e
PP
515 return _LttngLiveViewerAttachToTracingSessionCommand(
516 version, tracing_session_id, offset, seek_type
517 )
518 elif cmd_type == 4:
f5567ea8 519 (stream_id,) = self._unpack_payload("Q", data)
584af91e
PP
520 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
521 version, stream_id
522 )
523 elif cmd_type == 5:
f5567ea8 524 stream_id, offset, req_length = self._unpack_payload("QQI", data)
584af91e
PP
525 return _LttngLiveViewerGetDataStreamPacketDataCommand(
526 version, stream_id, offset, req_length
527 )
528 elif cmd_type == 6:
f5567ea8 529 (stream_id,) = self._unpack_payload("Q", data)
584af91e
PP
530 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
531 elif cmd_type == 7:
f5567ea8 532 (tracing_session_id,) = self._unpack_payload("Q", data)
584af91e
PP
533 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
534 elif cmd_type == 8:
535 return _LttngLiveViewerCreateViewerSessionCommand(version)
536 elif cmd_type == 9:
f5567ea8 537 (tracing_session_id,) = self._unpack_payload("Q", data)
584af91e
PP
538 return _LttngLiveViewerDetachFromTracingSessionCommand(
539 version, tracing_session_id
540 )
541 else:
f5567ea8 542 raise UnexpectedInput("Unknown command type {}".format(cmd_type))
584af91e
PP
543
544 def _pack(self, fmt, *args):
545 # Force network byte order
f5567ea8 546 return struct.pack("!" + fmt, *args)
584af91e
PP
547
548 def _encode_zero_padded_str(self, string, length):
549 data = string.encode()
f5567ea8 550 return data.ljust(length, b"\x00")
584af91e
PP
551
552 def _encode_stream_info(self, info):
f5567ea8 553 data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
584af91e
PP
554 data += self._encode_zero_padded_str(info.path, 4096)
555 data += self._encode_zero_padded_str(info.channel_name, 255)
556 return data
557
558 def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
559 flags = 0
560
561 if has_new_metadata:
562 flags |= 1
563
564 if has_new_data_streams:
565 flags |= 2
566
567 return flags
568
569 def encode(self, reply):
570 if type(reply) is _LttngLiveViewerConnectReply:
571 data = self._pack(
f5567ea8 572 "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
584af91e
PP
573 )
574 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
f5567ea8 575 data = self._pack("I", len(reply.tracing_session_infos))
584af91e
PP
576
577 for info in reply.tracing_session_infos:
578 data += self._pack(
f5567ea8 579 "QIII",
584af91e
PP
580 info.tracing_session_id,
581 info.live_timer_freq,
582 info.client_count,
583 info.stream_count,
584 )
585 data += self._encode_zero_padded_str(info.hostname, 64)
586 data += self._encode_zero_padded_str(info.name, 255)
587 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
f5567ea8 588 data = self._pack("II", reply.status, len(reply.stream_infos))
584af91e
PP
589
590 for info in reply.stream_infos:
591 data += self._encode_stream_info(info)
592 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
f5567ea8 593 index_format = "QQQQQQQII"
584af91e
PP
594 entry = reply.index_entry
595 flags = self._get_has_new_stuff_flags(
596 reply.has_new_metadata, reply.has_new_data_stream
597 )
598
71f56e5f
JG
599 if type(entry) is _LttngDataStreamIndexEntry:
600 data = self._pack(
601 index_format,
602 entry.offset_bytes,
603 entry.total_size_bits,
604 entry.content_size_bits,
605 entry.timestamp_begin,
606 entry.timestamp_end,
607 entry.events_discarded,
608 entry.stream_class_id,
609 reply.status,
610 flags,
611 )
612 else:
613 assert type(entry) is _LttngDataStreamBeaconEntry
614 data = self._pack(
615 index_format,
616 0,
617 0,
618 0,
619 0,
620 entry.timestamp,
621 0,
622 entry.stream_class_id,
623 reply.status,
624 flags,
625 )
584af91e
PP
626 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
627 flags = self._get_has_new_stuff_flags(
628 reply.has_new_metadata, reply.has_new_data_stream
629 )
f5567ea8 630 data = self._pack("III", reply.status, len(reply.data), flags)
584af91e
PP
631 data += reply.data
632 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
f5567ea8 633 data = self._pack("QI", len(reply.data), reply.status)
584af91e
PP
634 data += reply.data
635 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
f5567ea8 636 data = self._pack("II", reply.status, len(reply.stream_infos))
584af91e
PP
637
638 for info in reply.stream_infos:
639 data += self._encode_stream_info(info)
640 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
f5567ea8 641 data = self._pack("I", reply.status)
584af91e 642 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
f5567ea8 643 data = self._pack("I", reply.status)
584af91e
PP
644 else:
645 raise ValueError(
f5567ea8 646 "Unknown reply object with class `{}`".format(reply.__class__.__name__)
584af91e
PP
647 )
648
649 return data
650
651
78169723
FD
652def _get_entry_timestamp_begin(entry):
653 if type(entry) is _LttngDataStreamBeaconEntry:
654 return entry.timestamp
655 else:
656 assert type(entry) is _LttngDataStreamIndexEntry
657 return entry.timestamp_begin
658
659
584af91e
PP
660# The index of an LTTng data stream, a sequence of index entries.
661class _LttngDataStreamIndex(collections.abc.Sequence):
71f56e5f 662 def __init__(self, path, beacons):
584af91e
PP
663 self._path = path
664 self._build()
71f56e5f
JG
665
666 if beacons:
667 stream_class_id = self._entries[0].stream_class_id
668 beacons = [
669 _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
670 ]
671 self._add_beacons(beacons)
672
584af91e
PP
673 logging.info(
674 'Built data stream index entries: path="{}", count={}'.format(
675 path, len(self._entries)
676 )
677 )
678
679 def _build(self):
680 self._entries = []
681 assert os.path.isfile(self._path)
682
f5567ea8 683 with open(self._path, "rb") as f:
584af91e 684 # Read header first
f5567ea8 685 fmt = ">IIII"
584af91e
PP
686 size = struct.calcsize(fmt)
687 data = f.read(size)
688 assert len(data) == size
689 magic, index_major, index_minor, index_entry_length = struct.unpack(
690 fmt, data
691 )
692 assert magic == 0xC1F1DCC1
693
694 # Read index entries
f5567ea8 695 fmt = ">QQQQQQQ"
584af91e
PP
696 size = struct.calcsize(fmt)
697
698 while True:
699 logging.debug(
700 'Decoding data stream index entry: path="{}", offset={}'.format(
701 self._path, f.tell()
702 )
703 )
704 data = f.read(size)
705
706 if not data:
707 # Done
708 break
709
710 assert len(data) == size
75882e97
FD
711 (
712 offset_bytes,
713 total_size_bits,
714 content_size_bits,
715 timestamp_begin,
716 timestamp_end,
717 events_discarded,
718 stream_class_id,
719 ) = struct.unpack(fmt, data)
584af91e
PP
720
721 self._entries.append(
722 _LttngDataStreamIndexEntry(
723 offset_bytes,
724 total_size_bits,
725 content_size_bits,
726 timestamp_begin,
727 timestamp_end,
728 events_discarded,
729 stream_class_id,
730 )
731 )
732
733 # Skip anything else before the next entry
734 f.seek(index_entry_length - size, os.SEEK_CUR)
735
71f56e5f
JG
736 def _add_beacons(self, beacons):
737 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
738 def sort_key(entry):
739 if type(entry) is _LttngDataStreamBeaconEntry:
740 return entry.timestamp
741 else:
742 return entry.timestamp_end
743
744 self._entries += beacons
745 self._entries.sort(key=sort_key)
746
584af91e
PP
747 def __getitem__(self, index):
748 return self._entries[index]
749
750 def __len__(self):
751 return len(self._entries)
752
753 @property
754 def path(self):
755 return self._path
756
757
758# An LTTng data stream.
759class _LttngDataStream:
71f56e5f 760 def __init__(self, path, beacons):
584af91e
PP
761 self._path = path
762 filename = os.path.basename(path)
f5567ea8 763 match = re.match(r"(.*)_\d+", filename)
b8b97725
SM
764 if not match:
765 raise RuntimeError(
766 "Unexpected data stream file name pattern: {}".format(filename)
767 )
768
584af91e
PP
769 self._channel_name = match.group(1)
770 trace_dir = os.path.dirname(path)
f5567ea8 771 index_path = os.path.join(trace_dir, "index", filename + ".idx")
71f56e5f 772 self._index = _LttngDataStreamIndex(index_path, beacons)
584af91e 773 assert os.path.isfile(path)
f5567ea8 774 self._file = open(path, "rb")
584af91e
PP
775 logging.info(
776 'Built data stream: path="{}", channel-name="{}"'.format(
777 path, self._channel_name
778 )
779 )
780
781 @property
782 def path(self):
783 return self._path
784
785 @property
786 def channel_name(self):
787 return self._channel_name
788
789 @property
790 def index(self):
791 return self._index
792
793 def get_data(self, offset_bytes, len_bytes):
794 self._file.seek(offset_bytes)
795 return self._file.read(len_bytes)
796
797
78169723
FD
798class _LttngMetadataStreamSection:
799 def __init__(self, timestamp, data):
800 self._timestamp = timestamp
801 if data is None:
802 self._data = bytes()
803 else:
804 self._data = data
805 logging.info(
f5567ea8 806 "Built metadata stream section: ts={}, data-len={}".format(
78169723
FD
807 self._timestamp, len(self._data)
808 )
809 )
810
811 @property
812 def timestamp(self):
813 return self._timestamp
814
815 @property
816 def data(self):
817 return self._data
818
819
584af91e
PP
820# An LTTng metadata stream.
821class _LttngMetadataStream:
78169723
FD
822 def __init__(self, metadata_file_path, config_sections):
823 self._path = metadata_file_path
824 self._sections = config_sections
825 logging.info(
f5567ea8 826 "Built metadata stream: path={}, section-len={}".format(
78169723
FD
827 self._path, len(self._sections)
828 )
829 )
584af91e
PP
830
831 @property
832 def path(self):
833 return self._path
834
835 @property
78169723
FD
836 def sections(self):
837 return self._sections
584af91e 838
78169723
FD
839
840LttngMetadataConfigSection = namedtuple(
f5567ea8 841 "LttngMetadataConfigSection", ["line", "timestamp", "is_empty"]
78169723
FD
842)
843
844
845def _parse_metadata_sections_config(config_sections):
846 assert config_sections is not None
847 config_metadata_sections = []
848 append_empty_section = False
849 last_timestamp = 0
850 last_line = 0
851
852 for config_section in config_sections:
f5567ea8 853 if config_section == "empty":
78169723
FD
854 # Found a empty section marker. Actually append the section at the
855 # timestamp of the next concrete section.
856 append_empty_section = True
857 else:
858 assert type(config_section) is dict
f5567ea8
FD
859 line = config_section.get("line")
860 ts = config_section.get("timestamp")
78169723 861
b8b97725
SM
862 if type(line) is not int:
863 raise RuntimeError("`line` is not an integer")
864
865 if type(ts) is not int:
866 raise RuntimeError("`timestamp` is not an integer")
867
78169723
FD
868 # Sections' timestamps and lines must both be increasing.
869 assert ts > last_timestamp
870 last_timestamp = ts
871 assert line > last_line
872 last_line = line
873
874 if append_empty_section:
875 config_metadata_sections.append(
876 LttngMetadataConfigSection(line, ts, True)
877 )
878 append_empty_section = False
879
880 config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
881
882 return config_metadata_sections
883
884
885def _split_metadata_sections(metadata_file_path, raw_config_sections):
886 assert isinstance(raw_config_sections, collections.abc.Sequence)
887
888 parsed_sections = _parse_metadata_sections_config(raw_config_sections)
889
890 sections = []
f5567ea8 891 with open(metadata_file_path, "r") as metadata_file:
78169723
FD
892 metadata_lines = [line for line in metadata_file]
893
894 config_metadata_sections_idx = 0
895 curr_metadata_section = bytearray()
896
897 for idx, line_content in enumerate(metadata_lines):
898 # Add one to the index to convert from the zero-indexing of the
899 # enumerate() function to the one-indexing used by humans when
900 # viewing a text file.
901 curr_line_number = idx + 1
902
903 # If there are no more sections, simply append the line.
904 if config_metadata_sections_idx + 1 >= len(parsed_sections):
f5567ea8 905 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
906 continue
907
908 next_section_line_number = parsed_sections[
909 config_metadata_sections_idx + 1
910 ].line
911
912 # If the next section begins at the current line, create a
913 # section with the metadata we gathered so far.
914 if curr_line_number >= next_section_line_number:
78169723
FD
915 # Flushing the metadata of the current section.
916 sections.append(
917 _LttngMetadataStreamSection(
918 parsed_sections[config_metadata_sections_idx].timestamp,
919 bytes(curr_metadata_section),
920 )
921 )
922
923 # Move to the next section.
924 config_metadata_sections_idx += 1
925
926 # Clear old content and append current line for the next section.
927 curr_metadata_section.clear()
f5567ea8 928 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
929
930 # Append any empty sections.
931 while parsed_sections[config_metadata_sections_idx].is_empty:
932 sections.append(
933 _LttngMetadataStreamSection(
934 parsed_sections[config_metadata_sections_idx].timestamp, None
935 )
936 )
937 config_metadata_sections_idx += 1
938 else:
939 # Append line_content to the current metadata section.
f5567ea8 940 curr_metadata_section += bytearray(line_content, "utf8")
78169723
FD
941
942 # We iterated over all the lines of the metadata file. Close the current section.
943 sections.append(
944 _LttngMetadataStreamSection(
945 parsed_sections[config_metadata_sections_idx].timestamp,
946 bytes(curr_metadata_section),
947 )
948 )
949
950 return sections
584af91e
PP
951
952
953# An LTTng trace, a sequence of LTTng data streams.
954class LttngTrace(collections.abc.Sequence):
78169723 955 def __init__(self, trace_dir, metadata_sections, beacons):
584af91e
PP
956 assert os.path.isdir(trace_dir)
957 self._path = trace_dir
78169723 958 self._create_metadata_stream(trace_dir, metadata_sections)
71f56e5f 959 self._create_data_streams(trace_dir, beacons)
584af91e
PP
960 logging.info('Built trace: path="{}"'.format(trace_dir))
961
71f56e5f 962 def _create_data_streams(self, trace_dir, beacons):
584af91e
PP
963 data_stream_paths = []
964
965 for filename in os.listdir(trace_dir):
966 path = os.path.join(trace_dir, filename)
967
968 if not os.path.isfile(path):
969 continue
970
f5567ea8 971 if filename.startswith("."):
584af91e
PP
972 continue
973
f5567ea8 974 if filename == "metadata":
584af91e
PP
975 continue
976
977 data_stream_paths.append(path)
978
979 data_stream_paths.sort()
980 self._data_streams = []
981
982 for data_stream_path in data_stream_paths:
71f56e5f
JG
983 stream_name = os.path.basename(data_stream_path)
984 this_stream_beacons = None
985
986 if beacons is not None and stream_name in beacons:
987 this_stream_beacons = beacons[stream_name]
988
989 self._data_streams.append(
990 _LttngDataStream(data_stream_path, this_stream_beacons)
991 )
584af91e 992
78169723 993 def _create_metadata_stream(self, trace_dir, config_metadata_sections):
f5567ea8 994 metadata_path = os.path.join(trace_dir, "metadata")
78169723
FD
995 metadata_sections = []
996
997 if config_metadata_sections is None:
f5567ea8 998 with open(metadata_path, "rb") as metadata_file:
78169723
FD
999 metadata_sections.append(
1000 _LttngMetadataStreamSection(0, metadata_file.read())
1001 )
1002 else:
1003 metadata_sections = _split_metadata_sections(
1004 metadata_path, config_metadata_sections
1005 )
1006
1007 self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
1008
584af91e
PP
1009 @property
1010 def path(self):
1011 return self._path
1012
1013 @property
1014 def metadata_stream(self):
1015 return self._metadata_stream
1016
1017 def __getitem__(self, index):
1018 return self._data_streams[index]
1019
1020 def __len__(self):
1021 return len(self._data_streams)
1022
1023
1024# The state of a single data stream.
1025class _LttngLiveViewerSessionDataStreamState:
78169723 1026 def __init__(self, ts_state, info, data_stream, metadata_stream_id):
584af91e
PP
1027 self._ts_state = ts_state
1028 self._info = info
1029 self._data_stream = data_stream
78169723 1030 self._metadata_stream_id = metadata_stream_id
584af91e
PP
1031 self._cur_index_entry_index = 0
1032 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1033 logging.info(
1034 fmt.format(
1035 info.id,
1036 ts_state.tracing_session_descriptor.info.tracing_session_id,
1037 ts_state.tracing_session_descriptor.info.name,
1038 data_stream.path,
1039 )
1040 )
1041
1042 @property
1043 def tracing_session_state(self):
1044 return self._ts_state
1045
1046 @property
1047 def info(self):
1048 return self._info
1049
1050 @property
1051 def data_stream(self):
1052 return self._data_stream
1053
1054 @property
1055 def cur_index_entry(self):
1056 if self._cur_index_entry_index == len(self._data_stream.index):
1057 return
1058
1059 return self._data_stream.index[self._cur_index_entry_index]
1060
1061 def goto_next_index_entry(self):
1062 self._cur_index_entry_index += 1
1063
1064
1065# The state of a single metadata stream.
1066class _LttngLiveViewerSessionMetadataStreamState:
1067 def __init__(self, ts_state, info, metadata_stream):
1068 self._ts_state = ts_state
1069 self._info = info
1070 self._metadata_stream = metadata_stream
78169723
FD
1071 self._cur_metadata_stream_section_index = 0
1072 if len(metadata_stream.sections) > 1:
1073 self._next_metadata_stream_section_timestamp = metadata_stream.sections[
1074 1
1075 ].timestamp
1076 else:
1077 self._next_metadata_stream_section_timestamp = None
1078
584af91e
PP
1079 self._is_sent = False
1080 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1081 logging.info(
1082 fmt.format(
1083 info.id,
1084 ts_state.tracing_session_descriptor.info.tracing_session_id,
1085 ts_state.tracing_session_descriptor.info.name,
1086 metadata_stream.path,
1087 )
1088 )
1089
584af91e
PP
1090 @property
1091 def info(self):
1092 return self._info
1093
1094 @property
1095 def metadata_stream(self):
1096 return self._metadata_stream
1097
1098 @property
1099 def is_sent(self):
1100 return self._is_sent
1101
1102 @is_sent.setter
1103 def is_sent(self, value):
1104 self._is_sent = value
1105
78169723
FD
1106 @property
1107 def cur_section(self):
1108 fmt = "Get current metadata section: section-idx={}"
1109 logging.info(fmt.format(self._cur_metadata_stream_section_index))
1110 if self._cur_metadata_stream_section_index == len(
1111 self._metadata_stream.sections
1112 ):
1113 return
1114
1115 return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
1116
1117 def goto_next_section(self):
1118 self._cur_metadata_stream_section_index += 1
1119 if self.cur_section:
1120 self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
1121 else:
1122 self._next_metadata_stream_section_timestamp = None
1123
1124 @property
1125 def next_section_timestamp(self):
1126 return self._next_metadata_stream_section_timestamp
1127
584af91e 1128
ee1171e5
SM
1129# A tracing session descriptor.
1130#
1131# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1132# objects).
1133class LttngTracingSessionDescriptor:
1134 def __init__(
1135 self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1136 ):
1137 for trace in traces:
1138 if name not in trace.path:
1139 fmt = "Tracing session name must be part of every trace path (`{}` not found in `{}`)"
1140 raise ValueError(fmt.format(name, trace.path))
1141
1142 self._traces = traces
1143 stream_count = sum([len(t) + 1 for t in traces])
1144 self._info = _LttngLiveViewerTracingSessionInfo(
1145 tracing_session_id,
1146 live_timer_freq,
1147 client_count,
1148 stream_count,
1149 hostname,
1150 name,
1151 )
1152
1153 @property
1154 def traces(self):
1155 return self._traces
1156
1157 @property
1158 def info(self):
1159 return self._info
1160
1161
584af91e
PP
1162# The state of a tracing session.
1163class _LttngLiveViewerSessionTracingSessionState:
1164 def __init__(self, tc_descr, base_stream_id):
1165 self._tc_descr = tc_descr
1166 self._stream_infos = []
1167 self._ds_states = {}
1168 self._ms_states = {}
1169 stream_id = base_stream_id
1170
1171 for trace in tc_descr.traces:
1172 trace_id = stream_id * 1000
1173
78169723
FD
1174 # Metadata stream -> stream info and metadata stream state
1175 info = _LttngLiveViewerStreamInfo(
f5567ea8 1176 stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
78169723
FD
1177 )
1178 self._stream_infos.append(info)
1179 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
1180 self, info, trace.metadata_stream
1181 )
1182 metadata_stream_id = stream_id
1183 stream_id += 1
1184
584af91e
PP
1185 # Data streams -> stream infos and data stream states
1186 for data_stream in trace:
1187 info = _LttngLiveViewerStreamInfo(
1188 stream_id,
1189 trace_id,
1190 False,
1191 data_stream.path,
1192 data_stream.channel_name,
1193 )
1194 self._stream_infos.append(info)
1195 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
78169723 1196 self, info, data_stream, metadata_stream_id
584af91e
PP
1197 )
1198 stream_id += 1
1199
584af91e
PP
1200 self._is_attached = False
1201 fmt = 'Built tracing session state: id={}, name="{}"'
1202 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
1203
1204 @property
1205 def tracing_session_descriptor(self):
1206 return self._tc_descr
1207
1208 @property
1209 def data_stream_states(self):
1210 return self._ds_states
1211
1212 @property
1213 def metadata_stream_states(self):
1214 return self._ms_states
1215
1216 @property
1217 def stream_infos(self):
1218 return self._stream_infos
1219
1220 @property
1221 def has_new_metadata(self):
1222 return any([not ms.is_sent for ms in self._ms_states.values()])
1223
1224 @property
1225 def is_attached(self):
1226 return self._is_attached
1227
1228 @is_attached.setter
1229 def is_attached(self, value):
1230 self._is_attached = value
1231
1232
78169723
FD
1233def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
1234 if metadata_stream_state.next_section_timestamp is None:
1235 return False
1236
1237 if latest_timestamp >= metadata_stream_state.next_section_timestamp:
1238 return True
1239 else:
1240 return False
1241
1242
584af91e
PP
1243# An LTTng live viewer session manages a view on tracing sessions
1244# and replies to commands accordingly.
1245class _LttngLiveViewerSession:
1246 def __init__(
1247 self,
1248 viewer_session_id,
1249 tracing_session_descriptors,
1250 max_query_data_response_size,
1251 ):
1252 self._viewer_session_id = viewer_session_id
1253 self._ts_states = {}
1254 self._stream_states = {}
1255 self._max_query_data_response_size = max_query_data_response_size
1256 total_stream_infos = 0
1257
1258 for ts_descr in tracing_session_descriptors:
1259 ts_state = _LttngLiveViewerSessionTracingSessionState(
1260 ts_descr, total_stream_infos
1261 )
1262 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1263 self._ts_states[ts_id] = ts_state
1264 total_stream_infos += len(ts_state.stream_infos)
1265
1266 # Update session's stream states to have the new states
1267 self._stream_states.update(ts_state.data_stream_states)
1268 self._stream_states.update(ts_state.metadata_stream_states)
1269
1270 self._command_handlers = {
1271 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1272 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1273 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1274 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1275 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1276 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1277 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1278 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
1279 }
1280
1281 @property
1282 def viewer_session_id(self):
1283 return self._viewer_session_id
1284
1285 def _get_tracing_session_state(self, tracing_session_id):
1286 if tracing_session_id not in self._ts_states:
1287 raise UnexpectedInput(
f5567ea8 1288 "Unknown tracing session ID {}".format(tracing_session_id)
584af91e
PP
1289 )
1290
1291 return self._ts_states[tracing_session_id]
1292
1293 def _get_stream_state(self, stream_id):
1294 if stream_id not in self._stream_states:
f5567ea8 1295 UnexpectedInput("Unknown stream ID {}".format(stream_id))
584af91e
PP
1296
1297 return self._stream_states[stream_id]
1298
1299 def handle_command(self, cmd):
1300 logging.info(
f5567ea8 1301 "Handling command in viewer session: cmd-cls-name={}".format(
584af91e
PP
1302 cmd.__class__.__name__
1303 )
1304 )
1305 cmd_type = type(cmd)
1306
1307 if cmd_type not in self._command_handlers:
1308 raise UnexpectedInput(
f5567ea8 1309 "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
584af91e
PP
1310 )
1311
1312 return self._command_handlers[cmd_type](cmd)
1313
1314 def _handle_attach_to_tracing_session_command(self, cmd):
1315 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1316 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1317 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1318 info = ts_state.tracing_session_descriptor.info
1319
1320 if ts_state.is_attached:
1321 raise UnexpectedInput(
f5567ea8 1322 "Cannot attach to tracing session `{}`: viewer is already attached".format(
584af91e
PP
1323 info.name
1324 )
1325 )
1326
1327 ts_state.is_attached = True
1328 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1329 return _LttngLiveViewerAttachToTracingSessionReply(
1330 status, ts_state.stream_infos
1331 )
1332
1333 def _handle_detach_from_tracing_session_command(self, cmd):
1334 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1335 logging.info(fmt.format(cmd.tracing_session_id))
1336 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1337 info = ts_state.tracing_session_descriptor.info
1338
1339 if not ts_state.is_attached:
1340 raise UnexpectedInput(
f5567ea8 1341 "Cannot detach to tracing session `{}`: viewer is not attached".format(
584af91e
PP
1342 info.name
1343 )
1344 )
1345
1346 ts_state.is_attached = False
1347 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1348 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1349
1350 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1351 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1352 logging.info(fmt.format(cmd.stream_id))
1353 stream_state = self._get_stream_state(cmd.stream_id)
78169723 1354 metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
584af91e
PP
1355
1356 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1357 raise UnexpectedInput(
f5567ea8 1358 "Stream with ID {} is not a data stream".format(cmd.stream_id)
584af91e
PP
1359 )
1360
1361 if stream_state.cur_index_entry is None:
1362 # The viewer is done reading this stream
1363 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1364
1365 # Dummy data stream index entry to use with the `HUP` status
1366 # (the reply needs one, but the viewer ignores it)
1367 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1368
1369 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1370 status, index_entry, False, False
1371 )
1372
78169723
FD
1373 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1374
1375 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
1376 metadata_stream_state.is_sent = False
1377 metadata_stream_state.goto_next_section()
1378
584af91e
PP
1379 # The viewer only checks the `has_new_metadata` flag if the
1380 # reply's status is `OK`, so we need to provide an index here
1381 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
71f56e5f
JG
1382 if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
1383 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1384 else:
1385 assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
1386 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1387
584af91e
PP
1388 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1389 status, stream_state.cur_index_entry, has_new_metadata, False
1390 )
1391 stream_state.goto_next_index_entry()
1392 return reply
1393
1394 def _handle_get_data_stream_packet_data_command(self, cmd):
1395 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1396 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1397 stream_state = self._get_stream_state(cmd.stream_id)
1398 data_response_length = cmd.req_length
1399
1400 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1401 raise UnexpectedInput(
f5567ea8 1402 "Stream with ID {} is not a data stream".format(cmd.stream_id)
584af91e
PP
1403 )
1404
1405 if stream_state.tracing_session_state.has_new_metadata:
1406 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1407 return _LttngLiveViewerGetDataStreamPacketDataReply(
1408 status, bytes(), True, False
1409 )
1410
1411 if self._max_query_data_response_size:
1412 # Enforce a server side limit on the query requested length.
1413 # To ensure that the transaction terminate take the minimum of both
1414 # value.
1415 data_response_length = min(
1416 cmd.req_length, self._max_query_data_response_size
1417 )
1418 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1419 logging.info(fmt.format(cmd.req_length, data_response_length))
1420
1421 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1422 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1423 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1424
1425 def _handle_get_metadata_stream_data_command(self, cmd):
1426 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1427 logging.info(fmt.format(cmd.stream_id))
78169723 1428 metadata_stream_state = self._get_stream_state(cmd.stream_id)
584af91e 1429
78169723
FD
1430 if (
1431 type(metadata_stream_state)
1432 is not _LttngLiveViewerSessionMetadataStreamState
1433 ):
584af91e 1434 raise UnexpectedInput(
f5567ea8 1435 "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
584af91e
PP
1436 )
1437
78169723 1438 if metadata_stream_state.is_sent:
584af91e
PP
1439 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1440 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1441
78169723 1442 metadata_stream_state.is_sent = True
584af91e 1443 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
78169723 1444 metadata_section = metadata_stream_state.cur_section
b8b97725 1445 assert metadata_section is not None
78169723
FD
1446
1447 # If we are sending an empty section, ready the next one right away.
1448 if len(metadata_section.data) == 0:
1449 metadata_stream_state.is_sent = False
1450 metadata_stream_state.goto_next_section()
1451
1452 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1453 logging.info(fmt.format(len(metadata_section.data)))
584af91e 1454 return _LttngLiveViewerGetMetadataStreamDataContentReply(
78169723 1455 status, metadata_section.data
584af91e
PP
1456 )
1457
1458 def _handle_get_new_stream_infos_command(self, cmd):
1459 fmt = 'Handling "get new stream infos" command: ts-id={}'
1460 logging.info(fmt.format(cmd.tracing_session_id))
1461
1462 # As of this version, all the tracing session's stream infos are
1463 # always given to the viewer when sending the "attach to tracing
1464 # session" reply, so there's nothing new here. Return the `HUP`
1465 # status as, if we're handling this command, the viewer consumed
1466 # all the existing data streams.
1467 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1468 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1469
1470 def _handle_get_tracing_session_infos_command(self, cmd):
1471 logging.info('Handling "get tracing session infos" command.')
1472 infos = [
1473 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1474 ]
1475 infos.sort(key=lambda info: info.name)
1476 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1477
1478 def _handle_create_viewer_session_command(self, cmd):
1479 logging.info('Handling "create viewer session" command.')
1480 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1481
1482 # This does nothing here. In the LTTng relay daemon, it
1483 # allocates the viewer session's state.
1484 return _LttngLiveViewerCreateViewerSessionReply(status)
1485
1486
1487# An LTTng live TCP server.
1488#
e51141d3
SM
1489# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
1490# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
1491# to a temporary port file. It renames the temporary port file to
1492# `port_filename`.
584af91e
PP
1493#
1494# `tracing_session_descriptors` is a list of tracing session descriptors
1495# (`LttngTracingSessionDescriptor`) to serve.
1496#
1497# This server accepts a single viewer (client).
1498#
1499# When the viewer closes the connection, the server's constructor
1500# returns.
1501class LttngLiveServer:
1502 def __init__(
e51141d3
SM
1503 self,
1504 port,
1505 port_filename,
1506 tracing_session_descriptors,
1507 max_query_data_response_size,
584af91e 1508 ):
f5567ea8 1509 logging.info("Server configuration:")
584af91e 1510
f5567ea8 1511 logging.info(" Port file name: `{}`".format(port_filename))
584af91e
PP
1512
1513 if max_query_data_response_size is not None:
1514 logging.info(
f5567ea8 1515 " Maximum response data query size: `{}`".format(
584af91e
PP
1516 max_query_data_response_size
1517 )
1518 )
1519
1520 for ts_descr in tracing_session_descriptors:
1521 info = ts_descr.info
1522 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1523 logging.info(
1524 fmt.format(
1525 info.name,
1526 info.tracing_session_id,
1527 info.hostname,
1528 info.live_timer_freq,
1529 info.client_count,
1530 info.stream_count,
1531 )
1532 )
1533
1534 for trace in ts_descr.traces:
1535 logging.info(' Trace: path="{}"'.format(trace.path))
1536
1537 self._ts_descriptors = tracing_session_descriptors
1538 self._max_query_data_response_size = max_query_data_response_size
1539 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1540 self._codec = _LttngLiveViewerProtocolCodec()
1541
1542 # Port 0: OS assigns an unused port
e51141d3 1543 serv_addr = ("localhost", port if port is not None else 0)
584af91e
PP
1544 self._sock.bind(serv_addr)
1545 self._write_port_to_file(port_filename)
1546
1547 try:
1548 self._listen()
1549 finally:
1550 self._sock.close()
f5567ea8 1551 logging.info("Closed connection and socket.")
584af91e
PP
1552
1553 @property
1554 def _server_port(self):
1555 return self._sock.getsockname()[1]
1556
1557 def _recv_command(self):
1558 data = bytes()
1559
1560 while True:
f5567ea8 1561 logging.info("Waiting for viewer command.")
584af91e
PP
1562 buf = self._conn.recv(128)
1563
1564 if not buf:
f5567ea8 1565 logging.info("Client closed connection.")
584af91e
PP
1566
1567 if data:
1568 raise UnexpectedInput(
f5567ea8 1569 "Client closed connection after having sent {} command bytes.".format(
584af91e
PP
1570 len(data)
1571 )
1572 )
1573
1574 return
1575
f5567ea8 1576 logging.info("Received data from viewer: length={}".format(len(buf)))
584af91e
PP
1577
1578 data += buf
1579
1580 try:
1581 cmd = self._codec.decode(data)
1582 except struct.error as exc:
f5567ea8 1583 raise UnexpectedInput("Malformed command: {}".format(exc)) from exc
584af91e
PP
1584
1585 if cmd is not None:
1586 logging.info(
f5567ea8 1587 "Received command from viewer: cmd-cls-name={}".format(
584af91e
PP
1588 cmd.__class__.__name__
1589 )
1590 )
1591 return cmd
1592
1593 def _send_reply(self, reply):
1594 data = self._codec.encode(reply)
1595 logging.info(
f5567ea8 1596 "Sending reply to viewer: reply-cls-name={}, length={}".format(
584af91e
PP
1597 reply.__class__.__name__, len(data)
1598 )
1599 )
1600 self._conn.sendall(data)
1601
1602 def _handle_connection(self):
1603 # First command must be "connect"
1604 cmd = self._recv_command()
1605
1606 if type(cmd) is not _LttngLiveViewerConnectCommand:
1607 raise UnexpectedInput(
1608 'First command is not "connect": cmd-cls-name={}'.format(
1609 cmd.__class__.__name__
1610 )
1611 )
1612
1613 # Create viewer session (arbitrary ID 23)
1614 logging.info(
f5567ea8 1615 "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
584af91e
PP
1616 )
1617 viewer_session = _LttngLiveViewerSession(
1618 23, self._ts_descriptors, self._max_query_data_response_size
1619 )
1620
1621 # Send "connect" reply
1622 self._send_reply(
1623 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1624 )
1625
1626 # Make the viewer session handle the remaining commands
1627 while True:
1628 cmd = self._recv_command()
1629
1630 if cmd is None:
1631 # Connection closed (at an expected location within the
1632 # conversation)
1633 return
1634
1635 self._send_reply(viewer_session.handle_command(cmd))
1636
1637 def _listen(self):
f5567ea8 1638 logging.info("Listening: port={}".format(self._server_port))
1726ac08
JR
1639 # Backlog must be present for Python version < 3.5.
1640 # 128 is an arbitrary number since we expect only 1 connection anyway.
1641 self._sock.listen(128)
584af91e
PP
1642 self._conn, viewer_addr = self._sock.accept()
1643 logging.info(
f5567ea8 1644 "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
584af91e
PP
1645 )
1646
1647 try:
1648 self._handle_connection()
1649 finally:
1650 self._conn.close()
1651
1652 def _write_port_to_file(self, port_filename):
1653 # Write the port number to a temporary file.
f5567ea8
FD
1654 with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_port_file:
1655 print(self._server_port, end="", file=tmp_port_file)
584af91e
PP
1656
1657 # Rename temporary file to real file
9c878ece 1658 os.replace(tmp_port_file.name, port_filename)
584af91e
PP
1659 logging.info(
1660 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1661 tmp_port_file.name, port_filename
1662 )
1663 )
1664
1665
2b763e29
JG
1666def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
1667 # File format is:
1668 #
1669 # [
1670 # {
1671 # "name": "my-session",
1672 # "id": 17,
1673 # "hostname": "myhost",
1674 # "live-timer-freq": 1000000,
1675 # "client-count": 23,
1676 # "traces": [
1677 # {
1678 # "path": "lol"
1679 # },
1680 # {
71f56e5f
JG
1681 # "path": "meow/mix",
1682 # "beacons": {
1683 # "my_stream": [ 5235787, 728375283 ]
78169723
FD
1684 # },
1685 # "metadata-sections": [
1686 # {
1687 # "line": 1,
1688 # "timestamp": 0
1689 # }
1690 # ]
2b763e29
JG
1691 # }
1692 # ]
1693 # }
1694 # ]
f5567ea8 1695 with open(sessions_filename, "r") as sessions_file:
2b763e29
JG
1696 params = json.load(sessions_file)
1697
1698 sessions = []
1699
1700 for session in params:
f5567ea8
FD
1701 name = session["name"]
1702 tracing_session_id = session["id"]
1703 hostname = session["hostname"]
1704 live_timer_freq = session["live-timer-freq"]
1705 client_count = session["client-count"]
2b763e29
JG
1706 traces = []
1707
f5567ea8
FD
1708 for trace in session["traces"]:
1709 metadata_sections = trace.get("metadata-sections")
1710 beacons = trace.get("beacons")
1711 path = trace["path"]
2b763e29
JG
1712
1713 if not os.path.isabs(path):
1714 path = os.path.join(trace_path_prefix, path)
1715
78169723 1716 traces.append(LttngTrace(path, metadata_sections, beacons))
2b763e29
JG
1717
1718 sessions.append(
1719 LttngTracingSessionDescriptor(
1720 name,
1721 tracing_session_id,
1722 hostname,
1723 live_timer_freq,
1724 client_count,
1725 traces,
1726 )
1727 )
1728
1729 return sessions
584af91e
PP
1730
1731
1732def _loglevel_parser(string):
f5567ea8 1733 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
584af91e
PP
1734 if string not in loglevels:
1735 msg = "{} is not a valid loglevel".format(string)
1736 raise argparse.ArgumentTypeError(msg)
1737 return loglevels[string]
1738
1739
f5567ea8
FD
1740if __name__ == "__main__":
1741 logging.basicConfig(format="# %(asctime)-25s%(message)s")
584af91e 1742 parser = argparse.ArgumentParser(
f5567ea8 1743 description="LTTng-live protocol mocker", add_help=False
584af91e
PP
1744 )
1745 parser.add_argument(
f5567ea8
FD
1746 "--log-level",
1747 default="warning",
1748 choices=["info", "warning"],
1749 help="The loglevel to be used.",
584af91e
PP
1750 )
1751
1752 loglevel_namespace, remaining_args = parser.parse_known_args()
1753 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1754
e51141d3
SM
1755 parser.add_argument(
1756 "--port",
1757 help="The port to bind to. If missing, use an OS-assigned port..",
1758 type=int,
1759 )
584af91e 1760 parser.add_argument(
f5567ea8
FD
1761 "--port-filename",
1762 help="The final port file. This file is present when the server is ready to receive connection.",
584af91e
PP
1763 required=True,
1764 )
1765 parser.add_argument(
f5567ea8 1766 "--max-query-data-response-size",
584af91e 1767 type=int,
f5567ea8 1768 help="The maximum size of control data response in bytes",
584af91e
PP
1769 )
1770 parser.add_argument(
f5567ea8 1771 "--trace-path-prefix",
2b763e29 1772 type=str,
f5567ea8 1773 help="Prefix to prepend to the trace paths of session configurations",
2b763e29
JG
1774 )
1775 parser.add_argument(
f5567ea8 1776 "--sessions-filename",
776a2a25 1777 type=str,
f5567ea8 1778 help="Path to a session configuration file",
584af91e
PP
1779 )
1780 parser.add_argument(
f5567ea8
FD
1781 "-h",
1782 "--help",
1783 action="help",
584af91e 1784 default=argparse.SUPPRESS,
f5567ea8 1785 help="Show this help message and exit.",
584af91e
PP
1786 )
1787
1788 args = parser.parse_args(args=remaining_args)
1789 try:
2b763e29 1790 sessions = _session_descriptors_from_path(
78169723
FD
1791 args.sessions_filename,
1792 args.trace_path_prefix,
584af91e 1793 )
e51141d3
SM
1794 LttngLiveServer(
1795 args.port, args.port_filename, sessions, args.max_query_data_response_size
1796 )
584af91e
PP
1797 except UnexpectedInput as exc:
1798 logging.error(str(exc))
1799 print(exc, file=sys.stderr)
1800 sys.exit(1)
This page took 0.122027 seconds and 4 git commands to generate.