tests: move classes around in lttng_live_server.py
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
584af91e 2#
0235b0db 3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
584af91e 4#
584af91e 5
584af91e 6import os
584af91e 7import re
5995b304
SM
8import sys
9import json
584af91e
PP
10import socket
11import struct
5995b304
SM
12import logging
13import os.path
14import argparse
584af91e 15import tempfile
5995b304 16import collections.abc
78169723 17from collections import namedtuple
584af91e
PP
18
19
# Raised when the server receives input which it cannot interpret, for
# example a viewer command having an unknown type.
class UnexpectedInput(RuntimeError):
    pass
22
23
ee1171e5
SM
# An entry within the index of an LTTng data stream.
#
# Each constructor argument is stored as is and exposed through a
# read-only property having the same name. The `*_size_bytes`
# properties are derived from the corresponding sizes in bits.
class _LttngDataStreamIndexEntry:
    def __init__(
        self,
        offset_bytes,
        total_size_bits,
        content_size_bits,
        timestamp_begin,
        timestamp_end,
        events_discarded,
        stream_class_id,
    ):
        # Location and sizes
        self._offset_bytes = offset_bytes
        self._total_size_bits, self._content_size_bits = (
            total_size_bits,
            content_size_bits,
        )

        # Time range
        self._timestamp_begin, self._timestamp_end = timestamp_begin, timestamp_end

        # Remaining fields
        self._events_discarded = events_discarded
        self._stream_class_id = stream_class_id

    @property
    def offset_bytes(self):
        return self._offset_bytes

    @property
    def total_size_bits(self):
        return self._total_size_bits

    # Total size converted to bytes (floor division by 8).
    @property
    def total_size_bytes(self):
        return self.total_size_bits // 8

    @property
    def content_size_bits(self):
        return self._content_size_bits

    # Content size converted to bytes (floor division by 8).
    @property
    def content_size_bytes(self):
        return self.content_size_bits // 8

    @property
    def timestamp_begin(self):
        return self._timestamp_begin

    @property
    def timestamp_end(self):
        return self._timestamp_end

    @property
    def events_discarded(self):
        return self._events_discarded

    @property
    def stream_class_id(self):
        return self._stream_class_id
79
80
# A beacon entry of an LTTng data stream.
#
# While a stream beacon entry is conceptually unrelated to an index, it
# is sent as a reply to a
# `_LttngLiveViewerGetNextDataStreamIndexEntryCommand`, which is why it
# lives next to the index entries.
class _LttngDataStreamBeaconEntry:
    def __init__(self, stream_class_id, timestamp):
        self._stream_class_id, self._timestamp = stream_class_id, timestamp

    @property
    def timestamp(self):
        return self._timestamp

    @property
    def stream_class_id(self):
        return self._stream_class_id
96
97
584af91e
PP
# Base class of all the decoded viewer commands: only holds the version
# found in the command header.
class _LttngLiveViewerCommand:
    def __init__(self, version):
        self._version = version

    # Version decoded from the command header.
    @property
    def version(self):
        return self._version
105
106
# "Connect" viewer command: carries the viewer session ID and the
# major/minor numbers sent by the client.
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    def __init__(self, version, viewer_session_id, major, minor):
        super().__init__(version)
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    @property
    def major(self):
        return self._major

    @property
    def minor(self):
        return self._minor
125
126
# Reply to a "connect" viewer command.
class _LttngLiveViewerConnectReply:
    def __init__(self, viewer_session_id, major, minor):
        self._viewer_session_id = viewer_session_id
        self._major, self._minor = major, minor

    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    @property
    def major(self):
        return self._major

    @property
    def minor(self):
        return self._minor
144
145
# "Get tracing session infos" viewer command: has no payload, therefore
# no state other than the version of the base class.
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    pass
148
149
# Information about a single tracing session, as sent within a "get
# tracing session infos" reply.
#
# Each constructor argument is exposed as is through a read-only
# property having the same name.
class _LttngLiveViewerTracingSessionInfo:
    def __init__(
        self,
        tracing_session_id,
        live_timer_freq,
        client_count,
        stream_count,
        hostname,
        name,
    ):
        # Numeric fields
        self._tracing_session_id = tracing_session_id
        self._live_timer_freq = live_timer_freq
        self._client_count, self._stream_count = client_count, stream_count

        # String fields
        self._hostname, self._name = hostname, name

    @property
    def tracing_session_id(self):
        return self._tracing_session_id

    @property
    def live_timer_freq(self):
        return self._live_timer_freq

    @property
    def client_count(self):
        return self._client_count

    @property
    def stream_count(self):
        return self._stream_count

    @property
    def hostname(self):
        return self._hostname

    @property
    def name(self):
        return self._name
190
191
# Reply to a "get tracing session infos" viewer command: wraps the
# tracing session info objects to send.
class _LttngLiveViewerGetTracingSessionInfosReply:
    def __init__(self, tracing_session_infos):
        self._tracing_session_infos = tracing_session_infos

    # The object given at construction time (same object, not a copy).
    @property
    def tracing_session_infos(self):
        return self._tracing_session_infos
199
200
# "Attach to tracing session" viewer command.
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    # Known values of the `seek_type` field
    class SeekType:
        BEGINNING = 1
        LAST = 2

    def __init__(self, version, tracing_session_id, offset, seek_type):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id
        self._offset, self._seek_type = offset, seek_type

    @property
    def tracing_session_id(self):
        return self._tracing_session_id

    @property
    def offset(self):
        return self._offset

    @property
    def seek_type(self):
        return self._seek_type
223
224
# Information about a single stream (metadata or data), as sent within
# the replies which carry stream infos.
#
# Each constructor argument is exposed as is through a read-only
# property having the same name.
class _LttngLiveViewerStreamInfo:
    def __init__(self, id, trace_id, is_metadata, path, channel_name):
        # Identifiers
        self._id, self._trace_id = id, trace_id

        # Stream kind
        self._is_metadata = is_metadata

        # Names
        self._path, self._channel_name = path, channel_name

    @property
    def id(self):
        return self._id

    @property
    def trace_id(self):
        return self._trace_id

    @property
    def is_metadata(self):
        return self._is_metadata

    @property
    def path(self):
        return self._path

    @property
    def channel_name(self):
        return self._channel_name
252
253
# Reply to an "attach to tracing session" viewer command: a status and
# the stream infos to send.
class _LttngLiveViewerAttachToTracingSessionReply:
    # Status values of this reply
    class Status:
        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(self, status, stream_infos):
        self._status, self._stream_infos = status, stream_infos

    @property
    def status(self):
        return self._status

    @property
    def stream_infos(self):
        return self._stream_infos
274
275
# "Get next data stream index entry" viewer command: identifies the
# targeted stream by its ID.
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self):
        return self._stream_id
284
285
# Reply to a "get next data stream index entry" viewer command.
#
# `index_entry` is the entry to send, whereas `has_new_metadata` and
# `has_new_data_stream` are the two "new stuff" indications which the
# codec turns into flags.
class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
    # Status values of this reply
    class Status:
        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
        self._status, self._index_entry = status, index_entry
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self):
        return self._status

    @property
    def index_entry(self):
        return self._index_entry

    @property
    def has_new_metadata(self):
        return self._has_new_metadata

    @property
    def has_new_data_stream(self):
        return self._has_new_data_stream
316
317
# "Get data stream packet data" viewer command: targets `req_length`
# bytes at `offset` within the stream having the ID `stream_id`.
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version, stream_id, offset, req_length):
        super().__init__(version)
        self._stream_id = stream_id
        self._offset, self._req_length = offset, req_length

    @property
    def stream_id(self):
        return self._stream_id

    @property
    def offset(self):
        return self._offset

    @property
    def req_length(self):
        return self._req_length
336
337
# Reply to a "get data stream packet data" viewer command.
#
# `data` is the raw packet data to send, whereas `has_new_metadata` and
# `has_new_data_stream` are the two "new stuff" indications which the
# codec turns into flags.
class _LttngLiveViewerGetDataStreamPacketDataReply:
    # Status values of this reply
    class Status:
        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(self, status, data, has_new_metadata, has_new_data_stream):
        self._status, self._data = status, data
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self):
        return self._status

    @property
    def data(self):
        return self._data

    @property
    def has_new_metadata(self):
        return self._has_new_metadata

    @property
    def has_new_data_stream(self):
        return self._has_new_data_stream
366
367
# "Get metadata stream data" viewer command: identifies the targeted
# stream by its ID.
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self):
        return self._stream_id
376
377
# Reply to a "get metadata stream data" viewer command: a status and
# the metadata bytes to send.
class _LttngLiveViewerGetMetadataStreamDataContentReply:
    # Status values of this reply
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status, data):
        self._status, self._data = status, data

    @property
    def status(self):
        return self._status

    @property
    def data(self):
        return self._data
395
396
# "Get new stream infos" viewer command: identifies the targeted
# tracing session by its ID.
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        return self._tracing_session_id
405
406
# Reply to a "get new stream infos" viewer command: a status and the
# stream infos to send.
class _LttngLiveViewerGetNewStreamInfosReply:
    # Status values of this reply
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(self, status, stream_infos):
        self._status, self._stream_infos = status, stream_infos

    @property
    def status(self):
        return self._status

    @property
    def stream_infos(self):
        return self._stream_infos
425
426
# "Create viewer session" viewer command: has no payload, therefore no
# state other than the version of the base class.
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    pass
429
430
# Reply to a "create viewer session" viewer command: only a status.
class _LttngLiveViewerCreateViewerSessionReply:
    # Status values of this reply
    class Status:
        OK = 1
        ERROR = 2

    def __init__(self, status):
        self._status = status

    @property
    def status(self):
        return self._status
442
443
# "Detach from tracing session" viewer command: identifies the targeted
# tracing session by its ID.
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        return self._tracing_session_id
452
453
# Reply to a "detach from tracing session" viewer command: only a
# status.
class _LttngLiveViewerDetachFromTracingSessionReply:
    # Status values of this reply
    class Status:
        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status):
        self._status = status

    @property
    def status(self):
        return self._status
466
467
# An LTTng live protocol codec can convert bytes to command objects and
# reply objects to bytes.
#
# Every integer is read and written in network byte order (the `!`
# prefix of the `struct` module), without any padding.
class _LttngLiveViewerProtocolCodec:
    # Command header: payload size, command type, version
    _COMMAND_HEADER_STRUCT_FMT = "QII"

    # Computed with the same `!` prefix which _unpack() uses so that the
    # size always matches what is effectively decoded.
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize("!" + _COMMAND_HEADER_STRUCT_FMT)

    def __init__(self):
        pass

    # Unpacks the fields described by `fmt` (without byte order prefix)
    # from `data`, starting at `offset` bytes, and returns them as a
    # tuple.
    def _unpack(self, fmt, data, offset=0):
        fmt = "!" + fmt
        return struct.unpack_from(fmt, data, offset)

    # Unpacks the fields described by `fmt` from the payload of the
    # command `data`, that is, from what immediately follows the
    # command header.
    def _unpack_payload(self, fmt, data):
        return self._unpack(
            fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
        )

    # Decodes the bytes `data` and returns the corresponding command
    # object.
    #
    # Returns `None` when `data` doesn't contain a whole command yet
    # (header or payload not completely available).
    #
    # Raises `UnexpectedInput` when the command type is unknown.
    def decode(self, data):
        if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
            # Not enough data to read the command header
            return

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
                payload_size, cmd_type, version
            )
        )

        if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
            # Not enough data to read the whole command
            return

        if cmd_type == 1:
            # The fourth field of the payload is decoded, but isn't
            # part of the command object.
            viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
            return _LttngLiveViewerConnectCommand(
                version, viewer_session_id, major, minor
            )
        elif cmd_type == 2:
            return _LttngLiveViewerGetTracingSessionInfosCommand(version)
        elif cmd_type == 3:
            tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
            return _LttngLiveViewerAttachToTracingSessionCommand(
                version, tracing_session_id, offset, seek_type
            )
        elif cmd_type == 4:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                version, stream_id
            )
        elif cmd_type == 5:
            stream_id, offset, req_length = self._unpack_payload("QQI", data)
            return _LttngLiveViewerGetDataStreamPacketDataCommand(
                version, stream_id, offset, req_length
            )
        elif cmd_type == 6:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
        elif cmd_type == 7:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
        elif cmd_type == 8:
            return _LttngLiveViewerCreateViewerSessionCommand(version)
        elif cmd_type == 9:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerDetachFromTracingSessionCommand(
                version, tracing_session_id
            )
        else:
            raise UnexpectedInput("Unknown command type {}".format(cmd_type))

    # Packs `args` according to `fmt` (without byte order prefix) and
    # returns the resulting bytes.
    def _pack(self, fmt, *args):
        # Force network byte order
        return struct.pack("!" + fmt, *args)

    # Encodes `string` and pads it with null bytes so that the result
    # is exactly `length` bytes long.
    #
    # Raises `ValueError` when the encoded string doesn't fit: a longer
    # field would shift everything which follows it within the reply.
    def _encode_zero_padded_str(self, string, length):
        data = string.encode()

        if len(data) > length:
            raise ValueError(
                "Encoded string `{}` is {} bytes long, but the field is only {} bytes long".format(
                    string, len(data), length
                )
            )

        return data.ljust(length, b"\x00")

    # Encodes the stream info object `info`: its IDs and metadata
    # indication, followed by its path (4096 bytes) and channel name
    # (255 bytes) as zero-padded strings.
    def _encode_stream_info(self, info):
        data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
        data += self._encode_zero_padded_str(info.path, 4096)
        data += self._encode_zero_padded_str(info.channel_name, 255)
        return data

    # Returns the "new stuff" flags: bit 0 is set when there's new
    # metadata and bit 1 is set when there are new data streams.
    def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
        flags = 0

        if has_new_metadata:
            flags |= 1

        if has_new_data_streams:
            flags |= 2

        return flags

    # Encodes the reply object `reply` and returns the resulting bytes.
    #
    # The exact class of `reply` selects the layout; raises
    # `ValueError` when this class is unknown.
    def encode(self, reply):
        if type(reply) is _LttngLiveViewerConnectReply:
            # NOTE(review): the last field is always 2; presumably the
            # connection type of the protocol -- confirm.
            data = self._pack(
                "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
            )
        elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
            # Count first, then one fixed-size record per session
            data = self._pack("I", len(reply.tracing_session_infos))

            for info in reply.tracing_session_infos:
                data += self._pack(
                    "QIII",
                    info.tracing_session_id,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
                data += self._encode_zero_padded_str(info.hostname, 64)
                data += self._encode_zero_padded_str(info.name, 255)
        elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
            data = self._pack("II", reply.status, len(reply.stream_infos))

            for info in reply.stream_infos:
                data += self._encode_stream_info(info)
        elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            index_format = "QQQQQQQII"
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )

            if type(entry) is _LttngDataStreamIndexEntry:
                data = self._pack(
                    index_format,
                    entry.offset_bytes,
                    entry.total_size_bits,
                    entry.content_size_bits,
                    entry.timestamp_begin,
                    entry.timestamp_end,
                    entry.events_discarded,
                    entry.stream_class_id,
                    reply.status,
                    flags,
                )
            else:
                # A beacon uses the same layout as a real index entry:
                # only its timestamp (in the "end timestamp" field) and
                # its stream class ID are set.
                assert type(entry) is _LttngDataStreamBeaconEntry
                data = self._pack(
                    index_format,
                    0,
                    0,
                    0,
                    0,
                    entry.timestamp,
                    0,
                    entry.stream_class_id,
                    reply.status,
                    flags,
                )
        elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            data = self._pack("III", reply.status, len(reply.data), flags)
            data += reply.data
        elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
            data = self._pack("QI", len(reply.data), reply.status)
            data += reply.data
        elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
            data = self._pack("II", reply.status, len(reply.stream_infos))

            for info in reply.stream_infos:
                data += self._encode_stream_info(info)
        elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
            data = self._pack("I", reply.status)
        elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
            data = self._pack("I", reply.status)
        else:
            raise ValueError(
                "Unknown reply object with class `{}`".format(reply.__class__.__name__)
            )

        return data
650
651
78169723
FD
# Returns the beginning timestamp of `entry`: the single timestamp of a
# beacon entry, or the `timestamp_begin` property of an index entry.
def _get_entry_timestamp_begin(entry):
    if type(entry) is not _LttngDataStreamBeaconEntry:
        # Not a beacon: must be a real index entry
        assert type(entry) is _LttngDataStreamIndexEntry
        return entry.timestamp_begin

    return entry.timestamp
658
659
584af91e
PP
# The index of an LTTng data stream, a sequence of index entries.
#
# The entries are read from the index file `path`. When `beacons` is
# not empty, it's an iterable of timestamps: one beacon entry is
# created for each of them and merged with the index entries.
class _LttngDataStreamIndex(collections.abc.Sequence):
    def __init__(self, path, beacons):
        self._path = path
        self._build()

        if beacons:
            # All the beacons borrow the stream class ID of the first
            # index entry.
            first_stream_class_id = self._entries[0].stream_class_id
            self._add_beacons(
                [
                    _LttngDataStreamBeaconEntry(first_stream_class_id, ts)
                    for ts in beacons
                ]
            )

        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    # Reads the whole index file and fills `self._entries` with one
    # `_LttngDataStreamIndexEntry` object per entry found.
    def _build(self):
        self._entries = []
        assert os.path.isfile(self._path)

        # Big-endian layouts: file header (magic, major, minor, length
        # of a single entry), then the decoded part of each entry
        header_struct = struct.Struct(">IIII")
        entry_struct = struct.Struct(">QQQQQQQ")

        with open(self._path, "rb") as index_file:
            # Read header first
            header_data = index_file.read(header_struct.size)
            assert len(header_data) == header_struct.size
            magic, _, _, index_entry_length = header_struct.unpack(header_data)
            assert magic == 0xC1F1DCC1

            # An entry may be longer than what is decoded here
            skip_len = index_entry_length - entry_struct.size

            # Read index entries
            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, index_file.tell()
                    )
                )
                entry_data = index_file.read(entry_struct.size)

                if not entry_data:
                    # Done
                    break

                assert len(entry_data) == entry_struct.size

                # Fields are in the order of the constructor parameters
                self._entries.append(
                    _LttngDataStreamIndexEntry(*entry_struct.unpack(entry_data))
                )

                # Skip anything else before the next entry
                index_file.seek(skip_len, os.SEEK_CUR)

    # Adds the beacon entries `beacons` to the index entries and sorts
    # the whole again.
    def _add_beacons(self, beacons):
        # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
        def sort_key(entry):
            is_beacon = type(entry) is _LttngDataStreamBeaconEntry
            return entry.timestamp if is_beacon else entry.timestamp_end

        self._entries.extend(beacons)
        self._entries.sort(key=sort_key)

    def __getitem__(self, index):
        return self._entries[index]

    def __len__(self):
        return len(self._entries)

    @property
    def path(self):
        return self._path
756
757
# An LTTng data stream.
#
# The channel name is what precedes the last `_<digits>` part matched
# in the file name of `path`, and the index is loaded from
# `index/<file name>.idx` within the directory containing `path`.
# `beacons` is passed as is to the index.
class _LttngDataStream:
    def __init__(self, path, beacons):
        self._path = path
        trace_dir, filename = os.path.split(path)
        self._channel_name = re.match(r"(.*)_\d+", filename).group(1)
        self._index = _LttngDataStreamIndex(
            os.path.join(trace_dir, "index", filename + ".idx"), beacons
        )
        assert os.path.isfile(path)

        # Kept open: get_data() reads from it
        self._file = open(path, "rb")
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    @property
    def path(self):
        return self._path

    @property
    def channel_name(self):
        return self._channel_name

    @property
    def index(self):
        return self._index

    # Returns what can be read of the `len_bytes` bytes located at
    # `offset_bytes` within the data stream file.
    def get_data(self, offset_bytes, len_bytes):
        stream_file = self._file
        stream_file.seek(offset_bytes)
        return stream_file.read(len_bytes)
791
792
78169723
FD
# A section of an LTTng metadata stream: some data bytes along with a
# timestamp.
#
# A section built with `None` as its data holds empty bytes.
class _LttngMetadataStreamSection:
    def __init__(self, timestamp, data):
        self._timestamp = timestamp
        self._data = bytes() if data is None else data
        logging.info(
            "Built metadata stream section: ts={}, data-len={}".format(
                self._timestamp, len(self._data)
            )
        )

    @property
    def timestamp(self):
        return self._timestamp

    @property
    def data(self):
        return self._data
813
814
584af91e
PP
# An LTTng metadata stream: the path of the metadata file along with
# the sections (`config_sections`) which make its content.
class _LttngMetadataStream:
    def __init__(self, metadata_file_path, config_sections):
        self._path, self._sections = metadata_file_path, config_sections
        logging.info(
            "Built metadata stream: path={}, section-len={}".format(
                self._path, len(self._sections)
            )
        )

    @property
    def path(self):
        return self._path

    @property
    def sections(self):
        return self._sections
584af91e 833
78169723
FD
834
# A parsed metadata section configuration: the line at which the
# section starts, its timestamp, and whether or not it's an empty
# section.
LttngMetadataConfigSection = namedtuple(
    "LttngMetadataConfigSection",
    ("line", "timestamp", "is_empty"),
)
838
839
# Converts the raw metadata section configurations `config_sections`
# to a list of `LttngMetadataConfigSection` objects.
#
# Each item of `config_sections` is either the string `empty` or a
# dictionary having the `line` and `timestamp` keys. An `empty` marker
# becomes an empty section which borrows the line and timestamp of the
# next concrete section, and is placed right before it.
def _parse_metadata_sections_config(config_sections):
    assert config_sections is not None
    parsed = []
    empty_section_pending = False
    prev_timestamp = 0
    prev_line = 0

    for raw_section in config_sections:
        if raw_section == "empty":
            # Found an empty section marker. Actually append the
            # section at the timestamp of the next concrete section.
            empty_section_pending = True
            continue

        assert type(raw_section) is dict
        line = raw_section.get("line")
        ts = raw_section.get("timestamp")

        # Sections' timestamps and lines must both be increasing.
        assert ts > prev_timestamp
        prev_timestamp = ts
        assert line > prev_line
        prev_line = line

        if empty_section_pending:
            parsed.append(LttngMetadataConfigSection(line, ts, True))
            empty_section_pending = False

        parsed.append(LttngMetadataConfigSection(line, ts, False))

    return parsed
872
873
# Splits the content of the metadata file `metadata_file_path` into
# `_LttngMetadataStreamSection` objects according to the raw section
# configurations `raw_config_sections`, and returns them as a list.
#
# The lines preceding the line of the second parsed section belong to
# the first one, and so on: a section is flushed as soon as the line at
# which the next one begins is reached.
def _split_metadata_sections(metadata_file_path, raw_config_sections):
    assert isinstance(raw_config_sections, collections.abc.Sequence)

    parsed_sections = _parse_metadata_sections_config(raw_config_sections)
    sections = []

    with open(metadata_file_path, "r") as metadata_file:
        metadata_lines = list(metadata_file)

    # Index, within `parsed_sections`, of the section being gathered
    section_idx = 0
    pending = bytearray()

    # Line numbers start at one, like when viewing a text file.
    for line_number, line_content in enumerate(metadata_lines, start=1):
        has_next_section = section_idx + 1 < len(parsed_sections)

        # If the next section begins at the current line, create a
        # section with the metadata we gathered so far. When there are
        # no more sections, the line simply joins the current one.
        if has_next_section and line_number >= parsed_sections[section_idx + 1].line:
            # Flushing the metadata of the current section.
            sections.append(
                _LttngMetadataStreamSection(
                    parsed_sections[section_idx].timestamp, bytes(pending)
                )
            )

            # Move to the next section, starting from a clean buffer.
            section_idx += 1
            pending = bytearray()

            # Append any empty sections.
            while parsed_sections[section_idx].is_empty:
                sections.append(
                    _LttngMetadataStreamSection(
                        parsed_sections[section_idx].timestamp, None
                    )
                )
                section_idx += 1

        # In all cases, the current line is part of the section which
        # is current at this point.
        pending += line_content.encode("utf8")

    # We iterated over all the lines of the metadata file. Close the
    # current section.
    sections.append(
        _LttngMetadataStreamSection(
            parsed_sections[section_idx].timestamp, bytes(pending)
        )
    )

    return sections
584af91e
PP
940
941
# An LTTng trace, a sequence of LTTng data streams.
#
# `trace_dir` is the directory of the trace. `metadata_sections` is
# either `None` (the whole metadata file is a single section) or the
# raw metadata section configurations. `beacons` is either `None` or
# maps data stream file names to what `_LttngDataStream` accepts as its
# `beacons` parameter.
class LttngTrace(collections.abc.Sequence):
    def __init__(self, trace_dir, metadata_sections, beacons):
        assert os.path.isdir(trace_dir)
        self._path = trace_dir
        self._create_metadata_stream(trace_dir, metadata_sections)
        self._create_data_streams(trace_dir, beacons)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    # Creates one data stream per regular file found directly within
    # `trace_dir`, except the hidden files and the `metadata` file, in
    # sorted path order.
    def _create_data_streams(self, trace_dir, beacons):
        candidates = (
            (name, os.path.join(trace_dir, name)) for name in os.listdir(trace_dir)
        )
        data_stream_paths = sorted(
            path
            for name, path in candidates
            if os.path.isfile(path)
            and not name.startswith(".")
            and name != "metadata"
        )
        self._data_streams = []

        for data_stream_path in data_stream_paths:
            stream_name = os.path.basename(data_stream_path)

            # Only the beacons of this specific stream, if any
            if beacons is not None and stream_name in beacons:
                this_stream_beacons = beacons[stream_name]
            else:
                this_stream_beacons = None

            self._data_streams.append(
                _LttngDataStream(data_stream_path, this_stream_beacons)
            )

    # Creates the metadata stream from the `metadata` file of
    # `trace_dir`: a single section (timestamp 0) holding the whole
    # file when `config_metadata_sections` is `None`, or the sections
    # resulting from the split otherwise.
    def _create_metadata_stream(self, trace_dir, config_metadata_sections):
        metadata_path = os.path.join(trace_dir, "metadata")

        if config_metadata_sections is not None:
            metadata_sections = _split_metadata_sections(
                metadata_path, config_metadata_sections
            )
        else:
            with open(metadata_path, "rb") as metadata_file:
                metadata_sections = [
                    _LttngMetadataStreamSection(0, metadata_file.read())
                ]

        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)

    @property
    def path(self):
        return self._path

    @property
    def metadata_stream(self):
        return self._metadata_stream

    def __getitem__(self, index):
        return self._data_streams[index]

    def __len__(self):
        return len(self._data_streams)
1011
1012
# The state of a single data stream.
#
# Keeps track of the current index entry of `data_stream`, starting
# with the first one.
class _LttngLiveViewerSessionDataStreamState:
    def __init__(self, ts_state, info, data_stream, metadata_stream_id):
        self._ts_state, self._info = ts_state, info
        self._data_stream = data_stream
        self._metadata_stream_id = metadata_stream_id
        self._cur_index_entry_index = 0
        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id,
                ts_info.tracing_session_id,
                ts_info.name,
                data_stream.path,
            )
        )

    @property
    def tracing_session_state(self):
        return self._ts_state

    @property
    def info(self):
        return self._info

    @property
    def data_stream(self):
        return self._data_stream

    # Current index entry, or `None` when the position is exactly past
    # the last entry of the index.
    @property
    def cur_index_entry(self):
        index = self._data_stream.index

        if self._cur_index_entry_index != len(index):
            return index[self._cur_index_entry_index]

    # Makes the next index entry the current one.
    def goto_next_index_entry(self):
        self._cur_index_entry_index += 1
1052
1053
# The state of a single metadata stream.
#
# Keeps track of the current section of `metadata_stream` (starting
# with the first one), of the "next section" timestamp, and of whether
# or not the metadata was sent.
class _LttngLiveViewerSessionMetadataStreamState:
    def __init__(self, ts_state, info, metadata_stream):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream
        self._cur_metadata_stream_section_index = 0

        # Initially, the "next section" timestamp is the one of the
        # second section, if any.
        if len(metadata_stream.sections) > 1:
            self._next_metadata_stream_section_timestamp = metadata_stream.sections[
                1
            ].timestamp
        else:
            self._next_metadata_stream_section_timestamp = None

        self._is_sent = False
        fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
        logging.info(
            fmt.format(
                info.id,
                ts_state.tracing_session_descriptor.info.tracing_session_id,
                ts_state.tracing_session_descriptor.info.name,
                metadata_stream.path,
            )
        )

    # Tracing session state given at construction time.
    #
    # This used to read an attribute which was never set, therefore
    # raising `AttributeError` on any access.
    @property
    def trace_session_state(self):
        return self._ts_state

    # Same as `trace_session_state`, under the name which the data
    # stream state class uses.
    @property
    def tracing_session_state(self):
        return self._ts_state

    @property
    def info(self):
        return self._info

    @property
    def metadata_stream(self):
        return self._metadata_stream

    @property
    def is_sent(self):
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value):
        self._is_sent = value

    # Current section, or `None` when the position is exactly past the
    # last section of the metadata stream.
    @property
    def cur_section(self):
        fmt = "Get current metadata section: section-idx={}"
        logging.info(fmt.format(self._cur_metadata_stream_section_index))
        if self._cur_metadata_stream_section_index == len(
            self._metadata_stream.sections
        ):
            return

        return self._metadata_stream.sections[self._cur_metadata_stream_section_index]

    # Makes the next section the current one and updates the "next
    # section" timestamp: the timestamp of the new current section, or
    # `None` if there's no such section.
    def goto_next_section(self):
        self._cur_metadata_stream_section_index += 1
        if self.cur_section:
            self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
        else:
            self._next_metadata_stream_section_timestamp = None

    @property
    def next_section_timestamp(self):
        return self._next_metadata_stream_section_timestamp
1120
584af91e 1121
ee1171e5
SM
# A tracing session descriptor.
#
# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
# objects). The path of each trace must contain `name`, otherwise the
# constructor raises `ValueError`.
class LttngTracingSessionDescriptor:
    def __init__(
        self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
    ):
        for trace in traces:
            if name in trace.path:
                continue

            raise ValueError(
                "Tracing session name must be part of every trace path (`{}` not found in `{}`)".format(
                    name, trace.path
                )
            )

        self._traces = traces

        # Each trace contributes its data streams plus one more stream.
        stream_count = sum(len(trace) + 1 for trace in traces)
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            stream_count,
            hostname,
            name,
        )

    @property
    def traces(self):
        return self._traces

    @property
    def info(self):
        return self._info
1153
1154
584af91e
PP
# The state of a tracing session.
#
# For each trace of the descriptor `tc_descr`, creates the stream info
# and the state of its metadata stream, then of each of its data
# streams. Stream IDs are consecutive and start at `base_stream_id`.
class _LttngLiveViewerSessionTracingSessionState:
    def __init__(self, tc_descr, base_stream_id):
        self._tc_descr = tc_descr
        self._stream_infos = []
        self._ds_states = {}
        self._ms_states = {}
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # The metadata stream gets the first stream ID of the
            # trace, from which the trace ID is derived.
            metadata_stream_id = next_stream_id
            next_stream_id += 1
            trace_id = metadata_stream_id * 1000

            # Metadata stream -> stream info and metadata stream state
            metadata_info = _LttngLiveViewerStreamInfo(
                metadata_stream_id,
                trace_id,
                True,
                trace.metadata_stream.path,
                "metadata",
            )
            self._stream_infos.append(metadata_info)
            self._ms_states[
                metadata_stream_id
            ] = _LttngLiveViewerSessionMetadataStreamState(
                self, metadata_info, trace.metadata_stream
            )

            # Data streams -> stream infos and data stream states
            for data_stream in trace:
                data_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(data_info)
                self._ds_states[
                    next_stream_id
                ] = _LttngLiveViewerSessionDataStreamState(
                    self, data_info, data_stream, metadata_stream_id
                )
                next_stream_id += 1

        self._is_attached = False
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                tc_descr.info.tracing_session_id, tc_descr.info.name
            )
        )

    @property
    def tracing_session_descriptor(self):
        return self._tc_descr

    # Data stream states, by stream ID.
    @property
    def data_stream_states(self):
        return self._ds_states

    # Metadata stream states, by stream ID.
    @property
    def metadata_stream_states(self):
        return self._ms_states

    @property
    def stream_infos(self):
        return self._stream_infos

    # True if at least one metadata stream state isn't marked as sent.
    @property
    def has_new_metadata(self):
        return any(not ms_state.is_sent for ms_state in self._ms_states.values())

    @property
    def is_attached(self):
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value):
        self._is_attached = value
1224
1225
78169723
FD
def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
    # Tell whether the next metadata section is due: there must be a
    # pending section, and `latest_timestamp` must have reached its
    # timestamp.
    next_timestamp = metadata_stream_state.next_section_timestamp

    if next_timestamp is None:
        # No pending section at all.
        return False

    return True if latest_timestamp >= next_timestamp else False
1234
1235
584af91e
PP
# An LTTng live viewer session manages a view on tracing sessions
# and replies to commands accordingly.
#
# `tracing_session_descriptors` is an iterable of tracing session
# descriptors; one tracing session state is built for each.
#
# `max_query_data_response_size`, when truthy, caps the number of bytes
# returned by a single "get data stream packet data" reply.
#
# Protocol violations are reported by raising `UnexpectedInput`.
class _LttngLiveViewerSession:
    def __init__(
        self,
        viewer_session_id,
        tracing_session_descriptors,
        max_query_data_response_size,
    ):
        self._viewer_session_id = viewer_session_id
        self._ts_states = {}
        self._stream_states = {}
        self._max_query_data_response_size = max_query_data_response_size
        total_stream_infos = 0

        for ts_descr in tracing_session_descriptors:
            # The running stream info count is the base stream ID of the
            # next tracing session state, so that stream IDs are unique
            # across tracing sessions.
            ts_state = _LttngLiveViewerSessionTracingSessionState(
                ts_descr, total_stream_infos
            )
            ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
            self._ts_states[ts_id] = ts_state
            total_stream_infos += len(ts_state.stream_infos)

            # Update session's stream states to have the new states
            self._stream_states.update(ts_state.data_stream_states)
            self._stream_states.update(ts_state.metadata_stream_states)

        # Command class -> bound handler method
        self._command_handlers = {
            _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
            _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
            _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
            _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
            _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
            _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
            _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
            _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
        }

    @property
    def viewer_session_id(self):
        return self._viewer_session_id

    # Returns the tracing session state having the ID
    # `tracing_session_id`, raising `UnexpectedInput` if there's none.
    def _get_tracing_session_state(self, tracing_session_id):
        if tracing_session_id not in self._ts_states:
            raise UnexpectedInput(
                "Unknown tracing session ID {}".format(tracing_session_id)
            )

        return self._ts_states[tracing_session_id]

    # Returns the (data or metadata) stream state having the ID
    # `stream_id`, raising `UnexpectedInput` if there's none.
    def _get_stream_state(self, stream_id):
        if stream_id not in self._stream_states:
            # The exception must be raised, not only instantiated:
            # otherwise the lookup below fails with a bare `KeyError`.
            raise UnexpectedInput("Unknown stream ID {}".format(stream_id))

        return self._stream_states[stream_id]

    # Dispatches `cmd` to its handler and returns the handler's reply.
    #
    # Raises `UnexpectedInput` if there's no handler for the exact type
    # of `cmd`.
    def handle_command(self, cmd):
        logging.info(
            "Handling command in viewer session: cmd-cls-name={}".format(
                cmd.__class__.__name__
            )
        )
        cmd_type = type(cmd)

        if cmd_type not in self._command_handlers:
            raise UnexpectedInput(
                "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
            )

        return self._command_handlers[cmd_type](cmd)

    def _handle_attach_to_tracing_session_command(self, cmd):
        fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
        logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if ts_state.is_attached:
            raise UnexpectedInput(
                "Cannot attach to tracing session `{}`: viewer is already attached".format(
                    info.name
                )
            )

        ts_state.is_attached = True
        status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
        return _LttngLiveViewerAttachToTracingSessionReply(
            status, ts_state.stream_infos
        )

    def _handle_detach_from_tracing_session_command(self, cmd):
        fmt = 'Handling "detach from tracing session" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))
        ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
        info = ts_state.tracing_session_descriptor.info

        if not ts_state.is_attached:
            raise UnexpectedInput(
                "Cannot detach to tracing session `{}`: viewer is not attached".format(
                    info.name
                )
            )

        ts_state.is_attached = False
        status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
        return _LttngLiveViewerDetachFromTracingSessionReply(status)

    def _handle_get_next_data_stream_index_entry_command(self, cmd):
        fmt = 'Handling "get next data stream index entry" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        stream_state = self._get_stream_state(cmd.stream_id)

        # Validate the stream state's type before reading
        # `_metadata_stream_id` below, so that a metadata stream ID
        # yields the `UnexpectedInput` error.
        if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
            raise UnexpectedInput(
                "Stream with ID {} is not a data stream".format(cmd.stream_id)
            )

        metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)

        if stream_state.cur_index_entry is None:
            # The viewer is done reading this stream
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP

            # Dummy data stream index entry to use with the `HUP` status
            # (the reply needs one, but the viewer ignores it)
            index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)

            return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
                status, index_entry, False, False
            )

        timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)

        # When the current entry reaches the timestamp of the next
        # metadata section, mark the metadata as not sent and move to
        # that section.
        if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        # The viewer only checks the `has_new_metadata` flag if the
        # reply's status is `OK`, so we need to provide an index here
        has_new_metadata = stream_state.tracing_session_state.has_new_metadata

        if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
        else:
            assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
            status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE

        # Build the reply from the current entry before advancing.
        reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
            status, stream_state.cur_index_entry, has_new_metadata, False
        )
        stream_state.goto_next_index_entry()
        return reply

    def _handle_get_data_stream_packet_data_command(self, cmd):
        fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
        logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
        stream_state = self._get_stream_state(cmd.stream_id)
        data_response_length = cmd.req_length

        if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
            raise UnexpectedInput(
                "Stream with ID {} is not a data stream".format(cmd.stream_id)
            )

        if stream_state.tracing_session_state.has_new_metadata:
            # Reply with an `ERROR` status, no data, and the third reply
            # argument set to `True`.
            status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
            return _LttngLiveViewerGetDataStreamPacketDataReply(
                status, bytes(), True, False
            )

        if self._max_query_data_response_size:
            # Enforce a server side limit on the query requested length.
            # To ensure that the transaction terminates, take the
            # minimum of both values.
            data_response_length = min(
                cmd.req_length, self._max_query_data_response_size
            )
            fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
            logging.info(fmt.format(cmd.req_length, data_response_length))

        data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
        status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
        return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)

    def _handle_get_metadata_stream_data_command(self, cmd):
        fmt = 'Handling "get metadata stream data" command: stream-id={}'
        logging.info(fmt.format(cmd.stream_id))
        metadata_stream_state = self._get_stream_state(cmd.stream_id)

        if (
            type(metadata_stream_state)
            is not _LttngLiveViewerSessionMetadataStreamState
        ):
            raise UnexpectedInput(
                "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
            )

        if metadata_stream_state.is_sent:
            # Nothing new since the last request
            status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
            return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())

        metadata_stream_state.is_sent = True
        status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
        metadata_section = metadata_stream_state.cur_section

        # If we are sending an empty section, ready the next one right away.
        if len(metadata_section.data) == 0:
            metadata_stream_state.is_sent = False
            metadata_stream_state.goto_next_section()

        fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
        logging.info(fmt.format(len(metadata_section.data)))
        return _LttngLiveViewerGetMetadataStreamDataContentReply(
            status, metadata_section.data
        )

    def _handle_get_new_stream_infos_command(self, cmd):
        fmt = 'Handling "get new stream infos" command: ts-id={}'
        logging.info(fmt.format(cmd.tracing_session_id))

        # As of this version, all the tracing session's stream infos are
        # always given to the viewer when sending the "attach to tracing
        # session" reply, so there's nothing new here. Return the `HUP`
        # status as, if we're handling this command, the viewer consumed
        # all the existing data streams.
        status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
        return _LttngLiveViewerGetNewStreamInfosReply(status, [])

    def _handle_get_tracing_session_infos_command(self, cmd):
        logging.info('Handling "get tracing session infos" command.')
        infos = [
            tss.tracing_session_descriptor.info for tss in self._ts_states.values()
        ]

        # Reply with the infos sorted by tracing session name.
        infos.sort(key=lambda info: info.name)
        return _LttngLiveViewerGetTracingSessionInfosReply(infos)

    def _handle_create_viewer_session_command(self, cmd):
        logging.info('Handling "create viewer session" command.')
        status = _LttngLiveViewerCreateViewerSessionReply.Status.OK

        # This does nothing here. In the LTTng relay daemon, it
        # allocates the viewer session's state.
        return _LttngLiveViewerCreateViewerSessionReply(status)
1477
1478
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
# to a temporary port file created in the directory of `port_filename`. It
# renames the temporary port file to `port_filename`.
#
# `tracing_session_descriptors` is a list of tracing session descriptors
# (`LttngTracingSessionDescriptor`) to serve.
#
# `max_query_data_response_size` is passed as is to the viewer session;
# `None` means no limit is logged.
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns.
#
# Protocol violations are reported by raising `UnexpectedInput`.
class LttngLiveServer:
    def __init__(
        self,
        port,
        port_filename,
        tracing_session_descriptors,
        max_query_data_response_size,
    ):
        logging.info("Server configuration:")

        logging.info("  Port file name: `{}`".format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                "  Maximum response data query size: `{}`".format(
                    max_query_data_response_size
                )
            )

        for ts_descr in tracing_session_descriptors:
            info = ts_descr.info
            fmt = '  TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info('    Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = tracing_session_descriptors
        self._max_query_data_response_size = max_query_data_response_size

        # Create the codec before the socket so that nothing can raise
        # between the socket's creation and the `try` block below.
        self._codec = _LttngLiveViewerProtocolCodec()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Binding and writing the port file are within the `try` block
        # so that the socket is always closed, even if they fail.
        try:
            # Port 0: OS assigns an unused port
            serv_addr = ("localhost", port if port is not None else 0)
            self._sock.bind(serv_addr)
            self._write_port_to_file(port_filename)
            self._listen()
        finally:
            self._sock.close()
            logging.info("Closed connection and socket.")

    # The TCP port to which the server's socket is bound.
    @property
    def _server_port(self):
        return self._sock.getsockname()[1]

    # Receives and returns the next complete command from the viewer.
    #
    # Returns `None` if the viewer closed the connection without having
    # sent any byte of a new command. Raises `UnexpectedInput` if it
    # closed the connection in the middle of a command or if the codec
    # fails to decode the received bytes.
    def _recv_command(self):
        data = bytes()

        while True:
            logging.info("Waiting for viewer command.")
            buf = self._conn.recv(128)

            if not buf:
                logging.info("Client closed connection.")

                if data:
                    raise UnexpectedInput(
                        "Client closed connection after having sent {} command bytes.".format(
                            len(data)
                        )
                    )

                return

            logging.info("Received data from viewer: length={}".format(len(buf)))

            data += buf

            try:
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise UnexpectedInput("Malformed command: {}".format(exc)) from exc

            # `None` means the codec has no complete command yet: keep
            # accumulating bytes.
            if cmd is not None:
                logging.info(
                    "Received command from viewer: cmd-cls-name={}".format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    # Encodes `reply` and sends it entirely to the viewer.
    def _send_reply(self, reply):
        data = self._codec.encode(reply)
        logging.info(
            "Sending reply to viewer: reply-cls-name={}, length={}".format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    # Handles the whole conversation with the connected viewer: the
    # initial "connect" command, then any command until the viewer
    # closes the connection.
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise UnexpectedInput(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    # Accepts a single viewer connection and serves it until it ends;
    # the connection is always closed afterwards.
    def _listen(self):
        logging.info("Listening: port={}".format(self._server_port))
        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Writes the server's port number (decimal, no newline) to
    # `port_filename` through a temporary file which is then renamed.
    def _write_port_to_file(self, port_filename):
        # Create the temporary file in the destination's directory:
        # `os.replace()` may fail when the source and the destination
        # are on different file systems.
        port_dir = os.path.dirname(os.path.abspath(port_filename))

        # Write the port number to a temporary file.
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, dir=port_dir
        ) as tmp_port_file:
            print(self._server_port, end="", file=tmp_port_file)

        # Rename temporary file to real file
        os.replace(tmp_port_file.name, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1656
1657
2b763e29
JG
1658def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
1659 # File format is:
1660 #
1661 # [
1662 # {
1663 # "name": "my-session",
1664 # "id": 17,
1665 # "hostname": "myhost",
1666 # "live-timer-freq": 1000000,
1667 # "client-count": 23,
1668 # "traces": [
1669 # {
1670 # "path": "lol"
1671 # },
1672 # {
71f56e5f
JG
1673 # "path": "meow/mix",
1674 # "beacons": {
1675 # "my_stream": [ 5235787, 728375283 ]
78169723
FD
1676 # },
1677 # "metadata-sections": [
1678 # {
1679 # "line": 1,
1680 # "timestamp": 0
1681 # }
1682 # ]
2b763e29
JG
1683 # }
1684 # ]
1685 # }
1686 # ]
f5567ea8 1687 with open(sessions_filename, "r") as sessions_file:
2b763e29
JG
1688 params = json.load(sessions_file)
1689
1690 sessions = []
1691
1692 for session in params:
f5567ea8
FD
1693 name = session["name"]
1694 tracing_session_id = session["id"]
1695 hostname = session["hostname"]
1696 live_timer_freq = session["live-timer-freq"]
1697 client_count = session["client-count"]
2b763e29
JG
1698 traces = []
1699
f5567ea8
FD
1700 for trace in session["traces"]:
1701 metadata_sections = trace.get("metadata-sections")
1702 beacons = trace.get("beacons")
1703 path = trace["path"]
2b763e29
JG
1704
1705 if not os.path.isabs(path):
1706 path = os.path.join(trace_path_prefix, path)
1707
78169723 1708 traces.append(LttngTrace(path, metadata_sections, beacons))
2b763e29
JG
1709
1710 sessions.append(
1711 LttngTracingSessionDescriptor(
1712 name,
1713 tracing_session_id,
1714 hostname,
1715 live_timer_freq,
1716 client_count,
1717 traces,
1718 )
1719 )
1720
1721 return sessions
584af91e
PP
1722
1723
1724def _loglevel_parser(string):
f5567ea8 1725 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
584af91e
PP
1726 if string not in loglevels:
1727 msg = "{} is not a valid loglevel".format(string)
1728 raise argparse.ArgumentTypeError(msg)
1729 return loglevels[string]
1730
1731
f5567ea8
FD
if __name__ == "__main__":
    logging.basicConfig(format="# %(asctime)-25s%(message)s")

    # The automatic help option is disabled so that the first parsing
    # pass below only deals with `--log-level`; `-h`/`--help` is added
    # manually once all the options are registered.
    arg_parser = argparse.ArgumentParser(
        description="LTTng-live protocol mocker", add_help=False
    )
    arg_parser.add_argument(
        "--log-level",
        default="warning",
        choices=["info", "warning"],
        help="The loglevel to be used.",
    )

    # First pass: configure logging as early as possible, keeping the
    # unrecognized arguments for the second pass.
    early_args, leftover_argv = arg_parser.parse_known_args()
    root_logger = logging.getLogger()
    root_logger.setLevel(_loglevel_parser(early_args.log_level))

    arg_parser.add_argument(
        "--port",
        help="The port to bind to. If missing, use an OS-assigned port..",
        type=int,
    )
    arg_parser.add_argument(
        "--port-filename",
        help="The final port file. This file is present when the server is ready to receive connection.",
        required=True,
    )
    arg_parser.add_argument(
        "--max-query-data-response-size",
        type=int,
        help="The maximum size of control data response in bytes",
    )
    arg_parser.add_argument(
        "--trace-path-prefix",
        type=str,
        help="Prefix to prepend to the trace paths of session configurations",
    )
    arg_parser.add_argument(
        "--sessions-filename",
        type=str,
        help="Path to a session configuration file",
    )
    arg_parser.add_argument(
        "-h",
        "--help",
        action="help",
        default=argparse.SUPPRESS,
        help="Show this help message and exit.",
    )

    # Second pass: parse what the first pass didn't consume.
    cli_args = arg_parser.parse_args(args=leftover_argv)

    try:
        session_descriptors = _session_descriptors_from_path(
            cli_args.sessions_filename,
            cli_args.trace_path_prefix,
        )

        # The constructor only returns once the viewer disconnects.
        LttngLiveServer(
            cli_args.port,
            cli_args.port_filename,
            session_descriptors,
            cli_args.max_query_data_response_size,
        )
    except UnexpectedInput as exc:
        # Report the protocol error on both the log and the standard
        # error stream, then exit with a failure status.
        logging.error(str(exc))
        print(exc, file=sys.stderr)
        sys.exit(1)
This page took 0.135135 seconds and 4 git commands to generate.