python: run isort on everything
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
584af91e 2#
0235b0db 3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
584af91e 4#
584af91e 5
584af91e 6import os
584af91e 7import re
5995b304
SM
8import sys
9import json
584af91e
PP
10import socket
11import struct
5995b304
SM
12import logging
13import os.path
14import argparse
584af91e 15import tempfile
5995b304 16import collections.abc
78169723 17from collections import namedtuple
584af91e
PP
18
19
class UnexpectedInput(RuntimeError):
    """Error raised on unexpected input, e.g. an unknown command type."""
22
23
class _LttngLiveViewerCommand:
    """Base class of the decoded LTTng live viewer commands."""

    def __init__(self, version):
        # Version field taken from the command header
        self._version = version

    @property
    def version(self):
        """Version value this command was built with."""
        return self._version
31
32
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    """Viewer command carrying a viewer session ID and a major/minor pair."""

    def __init__(self, version, viewer_session_id, major, minor):
        super().__init__(version)
        self._viewer_session_id = viewer_session_id
        self._major = major
        self._minor = minor

    @property
    def viewer_session_id(self):
        """Viewer session ID given at construction."""
        return self._viewer_session_id

    @property
    def major(self):
        """Major number given at construction."""
        return self._major

    @property
    def minor(self):
        """Minor number given at construction."""
        return self._minor
51
52
class _LttngLiveViewerConnectReply:
    """Reply object holding a viewer session ID and a major/minor pair."""

    def __init__(self, viewer_session_id, major, minor):
        self._viewer_session_id = viewer_session_id
        self._major = major
        self._minor = minor

    @property
    def viewer_session_id(self):
        """Viewer session ID given at construction."""
        return self._viewer_session_id

    @property
    def major(self):
        """Major number given at construction."""
        return self._major

    @property
    def minor(self):
        """Minor number given at construction."""
        return self._minor
70
71
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    """Viewer command without any payload besides the version."""
74
75
class _LttngLiveViewerTracingSessionInfo:
    """Read-only description of a single tracing session."""

    def __init__(
        self,
        tracing_session_id,
        live_timer_freq,
        client_count,
        stream_count,
        hostname,
        name,
    ):
        # Numeric fields
        self._tracing_session_id = tracing_session_id
        self._live_timer_freq = live_timer_freq
        self._client_count = client_count
        self._stream_count = stream_count

        # String fields
        self._hostname = hostname
        self._name = name

    @property
    def tracing_session_id(self):
        """Tracing session ID given at construction."""
        return self._tracing_session_id

    @property
    def live_timer_freq(self):
        """Live timer frequency given at construction."""
        return self._live_timer_freq

    @property
    def client_count(self):
        """Client count given at construction."""
        return self._client_count

    @property
    def stream_count(self):
        """Stream count given at construction."""
        return self._stream_count

    @property
    def hostname(self):
        """Hostname given at construction."""
        return self._hostname

    @property
    def name(self):
        """Tracing session name given at construction."""
        return self._name
116
117
class _LttngLiveViewerGetTracingSessionInfosReply:
    """Reply object wrapping a collection of tracing session infos."""

    def __init__(self, tracing_session_infos):
        self._tracing_session_infos = tracing_session_infos

    @property
    def tracing_session_infos(self):
        """Tracing session infos given at construction."""
        return self._tracing_session_infos
125
126
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    """Viewer command holding a tracing session ID, an offset and a seek type."""

    # Known seek type values
    class SeekType:
        BEGINNING = 1
        LAST = 2

    def __init__(self, version, tracing_session_id, offset, seek_type):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id
        self._offset = offset
        self._seek_type = seek_type

    @property
    def tracing_session_id(self):
        """Tracing session ID given at construction."""
        return self._tracing_session_id

    @property
    def offset(self):
        """Offset given at construction."""
        return self._offset

    @property
    def seek_type(self):
        """Seek type given at construction."""
        return self._seek_type
149
150
class _LttngLiveViewerStreamInfo:
    """Read-only description of a single (data or metadata) stream."""

    def __init__(self, id, trace_id, is_metadata, path, channel_name):
        self._id = id
        self._trace_id = trace_id
        self._is_metadata = is_metadata
        self._path = path
        self._channel_name = channel_name

    @property
    def id(self):
        """Stream ID given at construction."""
        return self._id

    @property
    def trace_id(self):
        """Trace ID given at construction."""
        return self._trace_id

    @property
    def is_metadata(self):
        """Metadata flag given at construction."""
        return self._is_metadata

    @property
    def path(self):
        """Stream path given at construction."""
        return self._path

    @property
    def channel_name(self):
        """Channel name given at construction."""
        return self._channel_name
178
179
class _LttngLiveViewerAttachToTracingSessionReply:
    """Reply object holding a status and a collection of stream infos."""

    # Known status values
    class Status:
        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(self, status, stream_infos):
        self._status = status
        self._stream_infos = stream_infos

    @property
    def status(self):
        """Status given at construction."""
        return self._status

    @property
    def stream_infos(self):
        """Stream infos given at construction."""
        return self._stream_infos
200
201
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    """Viewer command holding the ID of the targeted stream."""

    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self):
        """Stream ID given at construction."""
        return self._stream_id
210
211
class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
    """Reply object holding a status, an index entry and two "new" flags."""

    # Known status values
    class Status:
        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
        self._status = status
        self._index_entry = index_entry
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self):
        """Status given at construction."""
        return self._status

    @property
    def index_entry(self):
        """Index entry given at construction."""
        return self._index_entry

    @property
    def has_new_metadata(self):
        """"Has new metadata" flag given at construction."""
        return self._has_new_metadata

    @property
    def has_new_data_stream(self):
        """"Has new data stream" flag given at construction."""
        return self._has_new_data_stream
242
243
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    """Viewer command holding a stream ID, an offset and a requested length."""

    def __init__(self, version, stream_id, offset, req_length):
        super().__init__(version)
        self._stream_id = stream_id
        self._offset = offset
        self._req_length = req_length

    @property
    def stream_id(self):
        """Stream ID given at construction."""
        return self._stream_id

    @property
    def offset(self):
        """Offset given at construction."""
        return self._offset

    @property
    def req_length(self):
        """Requested length given at construction."""
        return self._req_length
262
263
class _LttngLiveViewerGetDataStreamPacketDataReply:
    """Reply object holding a status, packet data and two "new" flags."""

    # Known status values
    class Status:
        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(self, status, data, has_new_metadata, has_new_data_stream):
        self._status = status
        self._data = data
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self):
        """Status given at construction."""
        return self._status

    @property
    def data(self):
        """Data given at construction."""
        return self._data

    @property
    def has_new_metadata(self):
        """"Has new metadata" flag given at construction."""
        return self._has_new_metadata

    @property
    def has_new_data_stream(self):
        """"Has new data stream" flag given at construction."""
        return self._has_new_data_stream
292
293
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    """Viewer command holding the ID of the targeted stream."""

    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self):
        """Stream ID given at construction."""
        return self._stream_id
302
303
class _LttngLiveViewerGetMetadataStreamDataContentReply:
    """Reply object holding a status and metadata stream data."""

    # Known status values
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status, data):
        self._status = status
        self._data = data

    @property
    def status(self):
        """Status given at construction."""
        return self._status

    @property
    def data(self):
        """Data given at construction."""
        return self._data
321
322
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    """Viewer command holding the ID of the targeted tracing session."""

    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        """Tracing session ID given at construction."""
        return self._tracing_session_id
331
332
class _LttngLiveViewerGetNewStreamInfosReply:
    """Reply object holding a status and a collection of stream infos."""

    # Known status values
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(self, status, stream_infos):
        self._status = status
        self._stream_infos = stream_infos

    @property
    def status(self):
        """Status given at construction."""
        return self._status

    @property
    def stream_infos(self):
        """Stream infos given at construction."""
        return self._stream_infos
351
352
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    """Viewer command without any payload besides the version."""
355
356
class _LttngLiveViewerCreateViewerSessionReply:
    """Reply object holding only a status."""

    # Known status values
    class Status:
        OK = 1
        ERROR = 2

    def __init__(self, status):
        self._status = status

    @property
    def status(self):
        """Status given at construction."""
        return self._status
368
369
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    """Viewer command holding the ID of the targeted tracing session."""

    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        """Tracing session ID given at construction."""
        return self._tracing_session_id
378
379
class _LttngLiveViewerDetachFromTracingSessionReply:
    """Reply object holding only a status."""

    # Known status values
    class Status:
        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status):
        self._status = status

    @property
    def status(self):
        """Status given at construction."""
        return self._status
392
393
# An LTTng live protocol codec can convert bytes to command objects and
# reply objects to bytes.
class _LttngLiveViewerProtocolCodec:
    # Command header fields, in order: payload size (64-bit), command
    # type (32-bit), and version (32-bit).
    _COMMAND_HEADER_STRUCT_FMT = "QII"

    # Computed with the same `!` (network, standard sizes) prefix as the
    # one used to unpack so that both always agree.
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize("!" + _COMMAND_HEADER_STRUCT_FMT)

    def __init__(self):
        pass

    def _unpack(self, fmt, data, offset=0):
        # Force network byte order
        return struct.unpack_from("!" + fmt, data, offset)

    def _unpack_payload(self, fmt, data):
        # The payload immediately follows the command header
        return self._unpack(fmt, data, self._COMMAND_HEADER_SIZE_BYTES)

    # Decodes the command found at the beginning of `data`.
    #
    # Returns the decoded command object, or `None` if `data` does not
    # contain a complete command yet.
    #
    # Raises `UnexpectedInput` if the command type is unknown.
    def decode(self, data):
        if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
            # Not enough data to read the command header
            return None

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            "Decoded command header: payload-size={}, cmd-type={}, version={}".format(
                payload_size, cmd_type, version
            )
        )

        if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
            # Not enough data to read the whole command
            return None

        if cmd_type == 1:
            # The last payload field (connection type) is decoded, but
            # not used.
            viewer_session_id, major, minor, _ = self._unpack_payload("QIII", data)
            return _LttngLiveViewerConnectCommand(
                version, viewer_session_id, major, minor
            )
        elif cmd_type == 2:
            return _LttngLiveViewerGetTracingSessionInfosCommand(version)
        elif cmd_type == 3:
            tracing_session_id, offset, seek_type = self._unpack_payload("QQI", data)
            return _LttngLiveViewerAttachToTracingSessionCommand(
                version, tracing_session_id, offset, seek_type
            )
        elif cmd_type == 4:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                version, stream_id
            )
        elif cmd_type == 5:
            stream_id, offset, req_length = self._unpack_payload("QQI", data)
            return _LttngLiveViewerGetDataStreamPacketDataCommand(
                version, stream_id, offset, req_length
            )
        elif cmd_type == 6:
            (stream_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
        elif cmd_type == 7:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
        elif cmd_type == 8:
            return _LttngLiveViewerCreateViewerSessionCommand(version)
        elif cmd_type == 9:
            (tracing_session_id,) = self._unpack_payload("Q", data)
            return _LttngLiveViewerDetachFromTracingSessionCommand(
                version, tracing_session_id
            )
        else:
            raise UnexpectedInput("Unknown command type {}".format(cmd_type))

    def _pack(self, fmt, *args):
        # Force network byte order
        return struct.pack("!" + fmt, *args)

    # Encodes `string` and pads it with zeros so that the returned bytes
    # object is exactly `length` bytes long.
    #
    # Raises `ValueError` if the encoded string does not fit: returning
    # more than `length` bytes would shift every following field of the
    # reply.
    def _encode_zero_padded_str(self, string, length):
        data = string.encode()

        if len(data) > length:
            raise ValueError(
                "Encoded string is {} bytes long, but the field is only {} bytes long".format(
                    len(data), length
                )
            )

        return data.ljust(length, b"\x00")

    def _encode_stream_info(self, info):
        data = self._pack("QQI", info.id, info.trace_id, int(info.is_metadata))
        data += self._encode_zero_padded_str(info.path, 4096)
        data += self._encode_zero_padded_str(info.channel_name, 255)
        return data

    def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
        # Bit 0: new metadata; bit 1: new data streams
        flags = 0

        if has_new_metadata:
            flags |= 1

        if has_new_data_streams:
            flags |= 2

        return flags

    # Encodes the reply object `reply` and returns the resulting bytes.
    #
    # Raises `ValueError` if the class of `reply` is unknown.
    def encode(self, reply):
        if type(reply) is _LttngLiveViewerConnectReply:
            data = self._pack(
                "QIII", reply.viewer_session_id, reply.major, reply.minor, 2
            )
        elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
            data = self._pack("I", len(reply.tracing_session_infos))

            for info in reply.tracing_session_infos:
                data += self._pack(
                    "QIII",
                    info.tracing_session_id,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
                data += self._encode_zero_padded_str(info.hostname, 64)
                data += self._encode_zero_padded_str(info.name, 255)
        elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
            data = self._pack("II", reply.status, len(reply.stream_infos))

            for info in reply.stream_infos:
                data += self._encode_stream_info(info)
        elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            index_format = "QQQQQQQII"
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )

            if type(entry) is _LttngDataStreamIndexEntry:
                data = self._pack(
                    index_format,
                    entry.offset_bytes,
                    entry.total_size_bits,
                    entry.content_size_bits,
                    entry.timestamp_begin,
                    entry.timestamp_end,
                    entry.events_discarded,
                    entry.stream_class_id,
                    reply.status,
                    reply_flags := flags,
                ) if False else self._pack(
                    index_format,
                    entry.offset_bytes,
                    entry.total_size_bits,
                    entry.content_size_bits,
                    entry.timestamp_begin,
                    entry.timestamp_end,
                    entry.events_discarded,
                    entry.stream_class_id,
                    reply.status,
                    flags,
                )
            else:
                # A beacon only carries a timestamp (sent in the "end
                # timestamp" slot) and a stream class ID: other index
                # fields are zero.
                assert type(entry) is _LttngDataStreamBeaconEntry
                data = self._pack(
                    index_format,
                    0,
                    0,
                    0,
                    0,
                    entry.timestamp,
                    0,
                    entry.stream_class_id,
                    reply.status,
                    flags,
                )
        elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            data = self._pack("III", reply.status, len(reply.data), flags)
            data += reply.data
        elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
            data = self._pack("QI", len(reply.data), reply.status)
            data += reply.data
        elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
            data = self._pack("II", reply.status, len(reply.stream_infos))

            for info in reply.stream_infos:
                data += self._encode_stream_info(info)
        elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
            data = self._pack("I", reply.status)
        elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
            data = self._pack("I", reply.status)
        else:
            raise ValueError(
                "Unknown reply object with class `{}`".format(reply.__class__.__name__)
            )

        return data
576
577
# An entry within the index of an LTTng data stream.
class _LttngDataStreamIndexEntry:
    def __init__(
        self,
        offset_bytes,
        total_size_bits,
        content_size_bits,
        timestamp_begin,
        timestamp_end,
        events_discarded,
        stream_class_id,
    ):
        # Location and sizes
        self._offset_bytes = offset_bytes
        self._total_size_bits = total_size_bits
        self._content_size_bits = content_size_bits

        # Time range
        self._timestamp_begin = timestamp_begin
        self._timestamp_end = timestamp_end

        # Other fields
        self._events_discarded = events_discarded
        self._stream_class_id = stream_class_id

    @property
    def offset_bytes(self):
        """Offset (bytes) given at construction."""
        return self._offset_bytes

    @property
    def total_size_bits(self):
        """Total size (bits) given at construction."""
        return self._total_size_bits

    @property
    def total_size_bytes(self):
        """Total size converted to whole bytes (floor division by 8)."""
        return self._total_size_bits // 8

    @property
    def content_size_bits(self):
        """Content size (bits) given at construction."""
        return self._content_size_bits

    @property
    def content_size_bytes(self):
        """Content size converted to whole bytes (floor division by 8)."""
        return self._content_size_bits // 8

    @property
    def timestamp_begin(self):
        """Beginning timestamp given at construction."""
        return self._timestamp_begin

    @property
    def timestamp_end(self):
        """End timestamp given at construction."""
        return self._timestamp_end

    @property
    def events_discarded(self):
        """Discarded event count given at construction."""
        return self._events_discarded

    @property
    def stream_class_id(self):
        """Stream class ID given at construction."""
        return self._stream_class_id
633
634
71f56e5f
JG
# An entry within the index of an LTTng data stream. While a stream beacon entry
# is conceptually unrelated to an index, it is sent as a reply to a
# LttngLiveViewerGetNextDataStreamIndexEntryCommand
class _LttngDataStreamBeaconEntry:
    def __init__(self, stream_class_id, timestamp):
        self._stream_class_id = stream_class_id
        self._timestamp = timestamp

    @property
    def timestamp(self):
        """Timestamp given at construction."""
        return self._timestamp

    @property
    def stream_class_id(self):
        """Stream class ID given at construction."""
        return self._stream_class_id
650
651
78169723
FD
# Returns the beginning timestamp of `entry`: the timestamp of a beacon
# entry or the beginning timestamp of an index entry.
def _get_entry_timestamp_begin(entry):
    if type(entry) is not _LttngDataStreamBeaconEntry:
        assert type(entry) is _LttngDataStreamIndexEntry
        return entry.timestamp_begin

    return entry.timestamp
658
659
584af91e
PP
# The index of an LTTng data stream, a sequence of index entries.
class _LttngDataStreamIndex(collections.abc.Sequence):
    def __init__(self, path, beacons):
        self._path = path
        self._build()

        if beacons:
            # All the beacon entries get the stream class ID of the
            # first index entry.
            first_stream_class_id = self._entries[0].stream_class_id
            self._add_beacons(
                [
                    _LttngDataStreamBeaconEntry(first_stream_class_id, ts)
                    for ts in beacons
                ]
            )

        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    # Reads the index file and fills `self._entries` with one index
    # entry object per entry found.
    def _build(self):
        self._entries = []
        assert os.path.isfile(self._path)
        header_struct = struct.Struct(">IIII")
        entry_struct = struct.Struct(">QQQQQQQ")

        with open(self._path, "rb") as index_file:
            # Read header first
            header = index_file.read(header_struct.size)
            assert len(header) == header_struct.size
            magic, _, _, index_entry_length = header_struct.unpack(header)
            assert magic == 0xC1F1DCC1

            # Number of bytes to skip after the decoded fields to reach
            # the next entry
            skip_size = index_entry_length - entry_struct.size

            # Read index entries
            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, index_file.tell()
                    )
                )
                raw_entry = index_file.read(entry_struct.size)

                if not raw_entry:
                    # Done
                    break

                assert len(raw_entry) == entry_struct.size

                # The unpacked fields are in constructor parameter order
                self._entries.append(
                    _LttngDataStreamIndexEntry(*entry_struct.unpack(raw_entry))
                )

                # Skip anything else before the next entry
                index_file.seek(skip_size, os.SEEK_CUR)

    # Merges `beacons` into the entries, keeping them sorted.
    def _add_beacons(self, beacons):
        # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
        def sort_key(entry):
            is_beacon = type(entry) is _LttngDataStreamBeaconEntry
            return entry.timestamp if is_beacon else entry.timestamp_end

        self._entries += beacons
        self._entries.sort(key=sort_key)

    def __getitem__(self, index):
        return self._entries[index]

    def __len__(self):
        return len(self._entries)

    @property
    def path(self):
        """Path of the index file."""
        return self._path
756
757
# An LTTng data stream.
class _LttngDataStream:
    def __init__(self, path, beacons):
        self._path = path
        trace_dir, filename = os.path.split(path)

        # The channel name is what precedes the last `_<digits>` part of
        # the file name.
        self._channel_name = re.match(r"(.*)_\d+", filename).group(1)

        # The index file is `index/<file name>.idx` within the trace
        # directory.
        self._index = _LttngDataStreamIndex(
            os.path.join(trace_dir, "index", filename + ".idx"), beacons
        )
        assert os.path.isfile(path)
        self._file = open(path, "rb")
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    @property
    def path(self):
        """Path of the data stream file."""
        return self._path

    @property
    def channel_name(self):
        """Channel name extracted from the data stream file name."""
        return self._channel_name

    @property
    def index(self):
        """Index object of this data stream."""
        return self._index

    # Returns up to `len_bytes` bytes of the data stream file, starting
    # at the offset `offset_bytes`.
    def get_data(self, offset_bytes, len_bytes):
        stream_file = self._file
        stream_file.seek(offset_bytes)
        return stream_file.read(len_bytes)
791
792
78169723
FD
class _LttngMetadataStreamSection:
    """Section of a metadata stream: a timestamp with its data."""

    def __init__(self, timestamp, data):
        self._timestamp = timestamp

        # No data means an empty section
        self._data = b"" if data is None else data
        logging.info(
            "Built metadata stream section: ts={}, data-len={}".format(
                self._timestamp, len(self._data)
            )
        )

    @property
    def timestamp(self):
        """Timestamp given at construction."""
        return self._timestamp

    @property
    def data(self):
        """Data of this section (empty bytes if none was given)."""
        return self._data
813
814
584af91e
PP
# An LTTng metadata stream.
class _LttngMetadataStream:
    def __init__(self, metadata_file_path, config_sections):
        self._path = metadata_file_path
        self._sections = config_sections
        section_count = len(self._sections)
        logging.info(
            "Built metadata stream: path={}, section-len={}".format(
                self._path, section_count
            )
        )

    @property
    def path(self):
        """Path of the metadata file."""
        return self._path

    @property
    def sections(self):
        """Sections given at construction."""
        return self._sections
584af91e 833
78169723
FD
834
# A parsed metadata section configuration: the line at which the section
# begins, its timestamp, and whether or not it's an empty section.
LttngMetadataConfigSection = namedtuple(
    "LttngMetadataConfigSection", "line timestamp is_empty"
)
838
839
# Converts the raw metadata section configuration `config_sections` to a
# list of `LttngMetadataConfigSection` objects.
#
# Each item of `config_sections` is either the string `empty` or a
# dictionary having the `line` and `timestamp` keys.
def _parse_metadata_sections_config(config_sections):
    assert config_sections is not None
    parsed_sections = []
    has_pending_empty_section = False
    prev_timestamp = 0
    prev_line = 0

    for raw_section in config_sections:
        if raw_section == "empty":
            # Found an empty section marker. Actually append the section
            # at the timestamp of the next concrete section.
            has_pending_empty_section = True
            continue

        assert type(raw_section) is dict
        line = raw_section.get("line")
        ts = raw_section.get("timestamp")

        # Sections' timestamps and lines must both be increasing.
        assert ts > prev_timestamp
        prev_timestamp = ts
        assert line > prev_line
        prev_line = line

        if has_pending_empty_section:
            parsed_sections.append(LttngMetadataConfigSection(line, ts, True))
            has_pending_empty_section = False

        parsed_sections.append(LttngMetadataConfigSection(line, ts, False))

    return parsed_sections
872
873
# Splits the content of the metadata file `metadata_file_path` into
# metadata stream sections as described by `raw_config_sections`, and
# returns the list of created sections.
def _split_metadata_sections(metadata_file_path, raw_config_sections):
    assert isinstance(raw_config_sections, collections.abc.Sequence)

    parsed_sections = _parse_metadata_sections_config(raw_config_sections)

    sections = []
    with open(metadata_file_path, "r") as metadata_file:
        metadata_lines = metadata_file.readlines()

    section_idx = 0
    section_data = bytearray()

    # Start at one to use the one-indexing used by humans when viewing a
    # text file.
    for line_number, line_content in enumerate(metadata_lines, start=1):
        encoded_line = line_content.encode("utf8")

        # If there are no more sections, simply append the line.
        if section_idx + 1 >= len(parsed_sections):
            section_data += encoded_line
            continue

        # If the next section doesn't begin at the current line yet,
        # the line belongs to the current section.
        if line_number < parsed_sections[section_idx + 1].line:
            section_data += encoded_line
            continue

        # The next section begins here: flush the metadata gathered so
        # far as the current section.
        sections.append(
            _LttngMetadataStreamSection(
                parsed_sections[section_idx].timestamp, bytes(section_data)
            )
        )

        # Move to the next section, which starts with the current line.
        section_idx += 1
        section_data.clear()
        section_data += encoded_line

        # Append any empty sections.
        while parsed_sections[section_idx].is_empty:
            sections.append(
                _LttngMetadataStreamSection(
                    parsed_sections[section_idx].timestamp, None
                )
            )
            section_idx += 1

    # We iterated over all the lines of the metadata file. Close the
    # current section.
    sections.append(
        _LttngMetadataStreamSection(
            parsed_sections[section_idx].timestamp, bytes(section_data)
        )
    )

    return sections
584af91e
PP
940
941
# An LTTng trace, a sequence of LTTng data streams.
class LttngTrace(collections.abc.Sequence):
    def __init__(self, trace_dir, metadata_sections, beacons):
        assert os.path.isdir(trace_dir)
        self._path = trace_dir
        self._create_metadata_stream(trace_dir, metadata_sections)
        self._create_data_streams(trace_dir, beacons)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    # Creates one data stream per regular file of `trace_dir`, except
    # for hidden files and for the `metadata` file.
    def _create_data_streams(self, trace_dir, beacons):
        candidate_paths = []

        for filename in os.listdir(trace_dir):
            if filename.startswith(".") or filename == "metadata":
                continue

            path = os.path.join(trace_dir, filename)

            if os.path.isfile(path):
                candidate_paths.append(path)

        self._data_streams = []

        for data_stream_path in sorted(candidate_paths):
            # Beacons are looked up by data stream file name
            stream_name = os.path.basename(data_stream_path)
            has_beacons = beacons is not None and stream_name in beacons
            self._data_streams.append(
                _LttngDataStream(
                    data_stream_path, beacons[stream_name] if has_beacons else None
                )
            )

    # Creates the metadata stream from the `metadata` file of
    # `trace_dir`: a single section having the timestamp 0 without
    # `config_metadata_sections`, or the configured sections otherwise.
    def _create_metadata_stream(self, trace_dir, config_metadata_sections):
        metadata_path = os.path.join(trace_dir, "metadata")

        if config_metadata_sections is not None:
            metadata_sections = _split_metadata_sections(
                metadata_path, config_metadata_sections
            )
        else:
            with open(metadata_path, "rb") as metadata_file:
                metadata_sections = [
                    _LttngMetadataStreamSection(0, metadata_file.read())
                ]

        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)

    @property
    def path(self):
        """Path of the trace directory."""
        return self._path

    @property
    def metadata_stream(self):
        """Metadata stream of this trace."""
        return self._metadata_stream

    def __getitem__(self, index):
        return self._data_streams[index]

    def __len__(self):
        return len(self._data_streams)
1011
1012
# The state of a single data stream.
class _LttngLiveViewerSessionDataStreamState:
    def __init__(self, ts_state, info, data_stream, metadata_stream_id):
        self._ts_state = ts_state
        self._info = info
        self._data_stream = data_stream
        self._metadata_stream_id = metadata_stream_id

        # Start at the first index entry
        self._cur_index_entry_index = 0
        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id,
                ts_info.tracing_session_id,
                ts_info.name,
                data_stream.path,
            )
        )

    @property
    def tracing_session_state(self):
        """Tracing session state given at construction."""
        return self._ts_state

    @property
    def info(self):
        """Stream info given at construction."""
        return self._info

    @property
    def data_stream(self):
        """Data stream given at construction."""
        return self._data_stream

    @property
    def cur_index_entry(self):
        """Current index entry, or `None` once past the last one."""
        index = self._data_stream.index

        if self._cur_index_entry_index == len(index):
            return None

        return index[self._cur_index_entry_index]

    def goto_next_index_entry(self):
        """Makes the next index entry the current one."""
        self._cur_index_entry_index += 1
1052
1053
# The state of a single metadata stream.
class _LttngLiveViewerSessionMetadataStreamState:
    def __init__(self, ts_state, info, metadata_stream):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream

        # Start at the first section: the "next" section is therefore
        # the second one, if any.
        self._cur_metadata_stream_section_index = 0

        if len(metadata_stream.sections) > 1:
            self._next_metadata_stream_section_timestamp = metadata_stream.sections[
                1
            ].timestamp
        else:
            self._next_metadata_stream_section_timestamp = None

        self._is_sent = False
        fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
        logging.info(
            fmt.format(
                info.id,
                ts_state.tracing_session_descriptor.info.tracing_session_id,
                ts_state.tracing_session_descriptor.info.name,
                metadata_stream.path,
            )
        )

    @property
    def trace_session_state(self):
        """Tracing session state given at construction."""
        # The attribute set by __init__() is `_ts_state`: there's no
        # `_trace_session_state` attribute.
        return self._ts_state

    @property
    def tracing_session_state(self):
        """Same as `trace_session_state` (name used by data stream states)."""
        return self._ts_state

    @property
    def info(self):
        """Stream info given at construction."""
        return self._info

    @property
    def metadata_stream(self):
        """Metadata stream given at construction."""
        return self._metadata_stream

    @property
    def is_sent(self):
        """Whether or not this stream is marked as sent."""
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value):
        self._is_sent = value

    @property
    def cur_section(self):
        """Current metadata stream section, or `None` once past the last one."""
        fmt = "Get current metadata section: section-idx={}"
        logging.info(fmt.format(self._cur_metadata_stream_section_index))
        if self._cur_metadata_stream_section_index == len(
            self._metadata_stream.sections
        ):
            return None

        return self._metadata_stream.sections[self._cur_metadata_stream_section_index]

    def goto_next_section(self):
        """Makes the next section current and updates the next section timestamp."""
        self._cur_metadata_stream_section_index += 1

        # Read the property once: each access logs a message
        section = self.cur_section

        if section:
            self._next_metadata_stream_section_timestamp = section.timestamp
        else:
            self._next_metadata_stream_section_timestamp = None

    @property
    def next_section_timestamp(self):
        """Timestamp of the next section, or `None` if there's none."""
        return self._next_metadata_stream_section_timestamp
1120
584af91e
PP
1121
# The state of a tracing session.
class _LttngLiveViewerSessionTracingSessionState:
    def __init__(self, tc_descr, base_stream_id):
        self._tc_descr = tc_descr
        self._stream_infos = []
        self._ds_states = {}
        self._ms_states = {}
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # The trace ID derives from the ID of the trace's metadata
            # stream, which is its first stream.
            metadata_stream_id = next_stream_id
            trace_id = metadata_stream_id * 1000
            next_stream_id += 1

            # Metadata stream -> stream info and metadata stream state
            metadata_info = _LttngLiveViewerStreamInfo(
                metadata_stream_id,
                trace_id,
                True,
                trace.metadata_stream.path,
                "metadata",
            )
            self._stream_infos.append(metadata_info)
            self._ms_states[
                metadata_stream_id
            ] = _LttngLiveViewerSessionMetadataStreamState(
                self, metadata_info, trace.metadata_stream
            )

            # Data streams -> stream infos and data stream states
            for data_stream in trace:
                data_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(data_info)
                self._ds_states[
                    next_stream_id
                ] = _LttngLiveViewerSessionDataStreamState(
                    self, data_info, data_stream, metadata_stream_id
                )
                next_stream_id += 1

        self._is_attached = False
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                tc_descr.info.tracing_session_id, tc_descr.info.name
            )
        )

    @property
    def tracing_session_descriptor(self):
        """Tracing session descriptor given at construction."""
        return self._tc_descr

    @property
    def data_stream_states(self):
        """Data stream states, by stream ID."""
        return self._ds_states

    @property
    def metadata_stream_states(self):
        """Metadata stream states, by stream ID."""
        return self._ms_states

    @property
    def stream_infos(self):
        """Infos of all the streams, in creation order."""
        return self._stream_infos

    @property
    def has_new_metadata(self):
        """Whether or not at least one metadata stream isn't marked as sent."""
        return any(not ms_state.is_sent for ms_state in self._ms_states.values())

    @property
    def is_attached(self):
        """Whether or not this tracing session is marked as attached."""
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value):
        self._is_attached = value
1191
1192
78169723
FD
# Returns whether `latest_timestamp` reached the next section timestamp
# of `metadata_stream_state`.
#
# Always returns `False` when the state has no next section timestamp
# (`None`).
def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
    next_timestamp = metadata_stream_state.next_section_timestamp
    return next_timestamp is not None and latest_timestamp >= next_timestamp
1201
1202
584af91e
PP
1203# An LTTng live viewer session manages a view on tracing sessions
1204# and replies to commands accordingly.
1205class _LttngLiveViewerSession:
1206 def __init__(
1207 self,
1208 viewer_session_id,
1209 tracing_session_descriptors,
1210 max_query_data_response_size,
1211 ):
1212 self._viewer_session_id = viewer_session_id
1213 self._ts_states = {}
1214 self._stream_states = {}
1215 self._max_query_data_response_size = max_query_data_response_size
1216 total_stream_infos = 0
1217
1218 for ts_descr in tracing_session_descriptors:
1219 ts_state = _LttngLiveViewerSessionTracingSessionState(
1220 ts_descr, total_stream_infos
1221 )
1222 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1223 self._ts_states[ts_id] = ts_state
1224 total_stream_infos += len(ts_state.stream_infos)
1225
1226 # Update session's stream states to have the new states
1227 self._stream_states.update(ts_state.data_stream_states)
1228 self._stream_states.update(ts_state.metadata_stream_states)
1229
1230 self._command_handlers = {
1231 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1232 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1233 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1234 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1235 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1236 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1237 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1238 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
1239 }
1240
1241 @property
1242 def viewer_session_id(self):
1243 return self._viewer_session_id
1244
1245 def _get_tracing_session_state(self, tracing_session_id):
1246 if tracing_session_id not in self._ts_states:
1247 raise UnexpectedInput(
f5567ea8 1248 "Unknown tracing session ID {}".format(tracing_session_id)
584af91e
PP
1249 )
1250
1251 return self._ts_states[tracing_session_id]
1252
1253 def _get_stream_state(self, stream_id):
1254 if stream_id not in self._stream_states:
f5567ea8 1255 UnexpectedInput("Unknown stream ID {}".format(stream_id))
584af91e
PP
1256
1257 return self._stream_states[stream_id]
1258
1259 def handle_command(self, cmd):
1260 logging.info(
f5567ea8 1261 "Handling command in viewer session: cmd-cls-name={}".format(
584af91e
PP
1262 cmd.__class__.__name__
1263 )
1264 )
1265 cmd_type = type(cmd)
1266
1267 if cmd_type not in self._command_handlers:
1268 raise UnexpectedInput(
f5567ea8 1269 "Unexpected command: cmd-cls-name={}".format(cmd.__class__.__name__)
584af91e
PP
1270 )
1271
1272 return self._command_handlers[cmd_type](cmd)
1273
1274 def _handle_attach_to_tracing_session_command(self, cmd):
1275 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1276 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1277 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1278 info = ts_state.tracing_session_descriptor.info
1279
1280 if ts_state.is_attached:
1281 raise UnexpectedInput(
f5567ea8 1282 "Cannot attach to tracing session `{}`: viewer is already attached".format(
584af91e
PP
1283 info.name
1284 )
1285 )
1286
1287 ts_state.is_attached = True
1288 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1289 return _LttngLiveViewerAttachToTracingSessionReply(
1290 status, ts_state.stream_infos
1291 )
1292
1293 def _handle_detach_from_tracing_session_command(self, cmd):
1294 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1295 logging.info(fmt.format(cmd.tracing_session_id))
1296 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1297 info = ts_state.tracing_session_descriptor.info
1298
1299 if not ts_state.is_attached:
1300 raise UnexpectedInput(
f5567ea8 1301 "Cannot detach to tracing session `{}`: viewer is not attached".format(
584af91e
PP
1302 info.name
1303 )
1304 )
1305
1306 ts_state.is_attached = False
1307 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1308 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1309
1310 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1311 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1312 logging.info(fmt.format(cmd.stream_id))
1313 stream_state = self._get_stream_state(cmd.stream_id)
78169723 1314 metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
584af91e
PP
1315
1316 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1317 raise UnexpectedInput(
f5567ea8 1318 "Stream with ID {} is not a data stream".format(cmd.stream_id)
584af91e
PP
1319 )
1320
1321 if stream_state.cur_index_entry is None:
1322 # The viewer is done reading this stream
1323 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1324
1325 # Dummy data stream index entry to use with the `HUP` status
1326 # (the reply needs one, but the viewer ignores it)
1327 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1328
1329 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1330 status, index_entry, False, False
1331 )
1332
78169723
FD
1333 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1334
1335 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
1336 metadata_stream_state.is_sent = False
1337 metadata_stream_state.goto_next_section()
1338
584af91e
PP
1339 # The viewer only checks the `has_new_metadata` flag if the
1340 # reply's status is `OK`, so we need to provide an index here
1341 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
71f56e5f
JG
1342 if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
1343 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1344 else:
1345 assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
1346 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1347
584af91e
PP
1348 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1349 status, stream_state.cur_index_entry, has_new_metadata, False
1350 )
1351 stream_state.goto_next_index_entry()
1352 return reply
1353
1354 def _handle_get_data_stream_packet_data_command(self, cmd):
1355 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1356 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1357 stream_state = self._get_stream_state(cmd.stream_id)
1358 data_response_length = cmd.req_length
1359
1360 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1361 raise UnexpectedInput(
f5567ea8 1362 "Stream with ID {} is not a data stream".format(cmd.stream_id)
584af91e
PP
1363 )
1364
1365 if stream_state.tracing_session_state.has_new_metadata:
1366 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1367 return _LttngLiveViewerGetDataStreamPacketDataReply(
1368 status, bytes(), True, False
1369 )
1370
1371 if self._max_query_data_response_size:
1372 # Enforce a server side limit on the query requested length.
1373 # To ensure that the transaction terminate take the minimum of both
1374 # value.
1375 data_response_length = min(
1376 cmd.req_length, self._max_query_data_response_size
1377 )
1378 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1379 logging.info(fmt.format(cmd.req_length, data_response_length))
1380
1381 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1382 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1383 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1384
1385 def _handle_get_metadata_stream_data_command(self, cmd):
1386 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1387 logging.info(fmt.format(cmd.stream_id))
78169723 1388 metadata_stream_state = self._get_stream_state(cmd.stream_id)
584af91e 1389
78169723
FD
1390 if (
1391 type(metadata_stream_state)
1392 is not _LttngLiveViewerSessionMetadataStreamState
1393 ):
584af91e 1394 raise UnexpectedInput(
f5567ea8 1395 "Stream with ID {} is not a metadata stream".format(cmd.stream_id)
584af91e
PP
1396 )
1397
78169723 1398 if metadata_stream_state.is_sent:
584af91e
PP
1399 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1400 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1401
78169723 1402 metadata_stream_state.is_sent = True
584af91e 1403 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
78169723
FD
1404 metadata_section = metadata_stream_state.cur_section
1405
1406 # If we are sending an empty section, ready the next one right away.
1407 if len(metadata_section.data) == 0:
1408 metadata_stream_state.is_sent = False
1409 metadata_stream_state.goto_next_section()
1410
1411 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1412 logging.info(fmt.format(len(metadata_section.data)))
584af91e 1413 return _LttngLiveViewerGetMetadataStreamDataContentReply(
78169723 1414 status, metadata_section.data
584af91e
PP
1415 )
1416
1417 def _handle_get_new_stream_infos_command(self, cmd):
1418 fmt = 'Handling "get new stream infos" command: ts-id={}'
1419 logging.info(fmt.format(cmd.tracing_session_id))
1420
1421 # As of this version, all the tracing session's stream infos are
1422 # always given to the viewer when sending the "attach to tracing
1423 # session" reply, so there's nothing new here. Return the `HUP`
1424 # status as, if we're handling this command, the viewer consumed
1425 # all the existing data streams.
1426 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1427 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1428
1429 def _handle_get_tracing_session_infos_command(self, cmd):
1430 logging.info('Handling "get tracing session infos" command.')
1431 infos = [
1432 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1433 ]
1434 infos.sort(key=lambda info: info.name)
1435 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1436
1437 def _handle_create_viewer_session_command(self, cmd):
1438 logging.info('Handling "create viewer session" command.')
1439 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1440
1441 # This does nothing here. In the LTTng relay daemon, it
1442 # allocates the viewer session's state.
1443 return _LttngLiveViewerCreateViewerSessionReply(status)
1444
1445
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` on the TCP port `port` if not `None`, or
# on an OS-assigned TCP port otherwise. It writes the decimal TCP port number
# to a temporary port file created in the directory of `port_filename`. It
# renames the temporary port file to `port_filename`.
#
# `tracing_session_descriptors` is a list of tracing session descriptors
# (`LttngTracingSessionDescriptor`) to serve.
#
# `max_query_data_response_size` is forwarded to the viewer session
# (`None` means no limit is logged/configured).
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns.
class LttngLiveServer:
    def __init__(
        self,
        port,
        port_filename,
        tracing_session_descriptors,
        max_query_data_response_size,
    ):
        logging.info("Server configuration:")

        logging.info(" Port file name: `{}`".format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                " Maximum response data query size: `{}`".format(
                    max_query_data_response_size
                )
            )

        for ts_descr in tracing_session_descriptors:
            info = ts_descr.info
            fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info(' Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = tracing_session_descriptors
        self._max_query_data_response_size = max_query_data_response_size

        # Create the codec before the socket so that a failure here
        # doesn't leave an open socket behind.
        self._codec = _LttngLiveViewerProtocolCodec()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Port 0: OS assigns an unused port
        serv_addr = ("localhost", port if port is not None else 0)

        # Everything using the socket runs within the `try` block so
        # that the socket is closed even if binding or writing the port
        # file fails.
        try:
            self._sock.bind(serv_addr)
            self._write_port_to_file(port_filename)
            self._listen()
        finally:
            self._sock.close()
            logging.info("Closed connection and socket.")

    # TCP port to which the server socket is bound.
    @property
    def _server_port(self):
        return self._sock.getsockname()[1]

    # Receives and returns the next complete viewer command.
    #
    # Returns `None` if the viewer closes the connection between two
    # commands. Raises `UnexpectedInput` if the viewer closes the
    # connection in the middle of a command or sends a malformed one.
    def _recv_command(self):
        data = bytes()

        while True:
            logging.info("Waiting for viewer command.")
            buf = self._conn.recv(128)

            if not buf:
                logging.info("Client closed connection.")

                if data:
                    raise UnexpectedInput(
                        "Client closed connection after having sent {} command bytes.".format(
                            len(data)
                        )
                    )

                return

            logging.info("Received data from viewer: length={}".format(len(buf)))

            data += buf

            try:
                # The codec returns `None` as long as `data` doesn't
                # hold a complete command.
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise UnexpectedInput("Malformed command: {}".format(exc)) from exc

            if cmd is not None:
                logging.info(
                    "Received command from viewer: cmd-cls-name={}".format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    # Encodes `reply` and sends it entirely to the viewer.
    def _send_reply(self, reply):
        data = self._codec.encode(reply)
        logging.info(
            "Sending reply to viewer: reply-cls-name={}, length={}".format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    # Handles the whole conversation with the connected viewer.
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise UnexpectedInput(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            "LTTng live viewer connected: version={}.{}".format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    # Accepts a single viewer and serves it until the conversation ends.
    def _listen(self):
        logging.info("Listening: port={}".format(self._server_port))
        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            "Accepted viewer: addr={}:{}".format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Writes the server's TCP port number (decimal, no newline) to the
    # file `port_filename`.
    def _write_port_to_file(self, port_filename):
        # Write the port number to a temporary file.
        #
        # Create it in the directory of the final file: os.replace() may
        # fail when the source and the destination are on different
        # file systems, which can be the case with the default temporary
        # directory.
        port_dir = os.path.dirname(os.path.abspath(port_filename))

        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, dir=port_dir
        ) as tmp_port_file:
            print(self._server_port, end="", file=tmp_port_file)

        # Rename temporary file to real file
        os.replace(tmp_port_file.name, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1623
1624
# A tracing session descriptor.
#
# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
# objects). The path of each trace must contain the tracing session
# name `name`, otherwise the constructor raises `ValueError`.
#
# The stream count of the resulting tracing session info is, for each
# trace, its length plus one.
class LttngTracingSessionDescriptor:
    def __init__(
        self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
    ):
        for candidate in traces:
            if name in candidate.path:
                continue

            raise ValueError(
                "Tracing session name must be part of every trace path (`{}` not found in `{}`)".format(
                    name, candidate.path
                )
            )

        self._traces = traces
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            sum(1 + len(candidate) for candidate in traces),
            hostname,
            name,
        )

    @property
    def traces(self):
        return self._traces

    @property
    def info(self):
        return self._info
1656
1657
2b763e29
JG
1658def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
1659 # File format is:
1660 #
1661 # [
1662 # {
1663 # "name": "my-session",
1664 # "id": 17,
1665 # "hostname": "myhost",
1666 # "live-timer-freq": 1000000,
1667 # "client-count": 23,
1668 # "traces": [
1669 # {
1670 # "path": "lol"
1671 # },
1672 # {
71f56e5f
JG
1673 # "path": "meow/mix",
1674 # "beacons": {
1675 # "my_stream": [ 5235787, 728375283 ]
78169723
FD
1676 # },
1677 # "metadata-sections": [
1678 # {
1679 # "line": 1,
1680 # "timestamp": 0
1681 # }
1682 # ]
2b763e29
JG
1683 # }
1684 # ]
1685 # }
1686 # ]
f5567ea8 1687 with open(sessions_filename, "r") as sessions_file:
2b763e29
JG
1688 params = json.load(sessions_file)
1689
1690 sessions = []
1691
1692 for session in params:
f5567ea8
FD
1693 name = session["name"]
1694 tracing_session_id = session["id"]
1695 hostname = session["hostname"]
1696 live_timer_freq = session["live-timer-freq"]
1697 client_count = session["client-count"]
2b763e29
JG
1698 traces = []
1699
f5567ea8
FD
1700 for trace in session["traces"]:
1701 metadata_sections = trace.get("metadata-sections")
1702 beacons = trace.get("beacons")
1703 path = trace["path"]
2b763e29
JG
1704
1705 if not os.path.isabs(path):
1706 path = os.path.join(trace_path_prefix, path)
1707
78169723 1708 traces.append(LttngTrace(path, metadata_sections, beacons))
2b763e29
JG
1709
1710 sessions.append(
1711 LttngTracingSessionDescriptor(
1712 name,
1713 tracing_session_id,
1714 hostname,
1715 live_timer_freq,
1716 client_count,
1717 traces,
1718 )
1719 )
1720
1721 return sessions
584af91e
PP
1722
1723
1724def _loglevel_parser(string):
f5567ea8 1725 loglevels = {"info": logging.INFO, "warning": logging.WARNING}
584af91e
PP
1726 if string not in loglevels:
1727 msg = "{} is not a valid loglevel".format(string)
1728 raise argparse.ArgumentTypeError(msg)
1729 return loglevels[string]
1730
1731
f5567ea8
FD
if __name__ == "__main__":
    logging.basicConfig(format="# %(asctime)-25s%(message)s")

    # Help is disabled at first so that the initial, partial parsing
    # below (log level only) doesn't consume `-h`/`--help`: the help
    # option is added manually once all the options are known.
    parser = argparse.ArgumentParser(
        description="LTTng-live protocol mocker", add_help=False
    )
    parser.add_argument(
        "--log-level",
        default="warning",
        choices=["info", "warning"],
        help="The loglevel to be used.",
    )

    # Configure logging as soon as possible, before handling the
    # remaining arguments.
    loglevel_namespace, remaining_args = parser.parse_known_args()
    logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))

    parser.add_argument(
        "--port",
        help="The port to bind to. If missing, use an OS-assigned port..",
        type=int,
    )
    parser.add_argument(
        "--port-filename",
        help="The final port file. This file is present when the server is ready to receive connection.",
        required=True,
    )
    parser.add_argument(
        "--max-query-data-response-size",
        type=int,
        help="The maximum size of control data response in bytes",
    )
    parser.add_argument(
        "--trace-path-prefix",
        type=str,
        help="Prefix to prepend to the trace paths of session configurations",
    )

    # Required: the file name is passed directly to open(), so a missing
    # option would otherwise end with a `TypeError` traceback instead of
    # a usage error.
    parser.add_argument(
        "--sessions-filename",
        type=str,
        help="Path to a session configuration file",
        required=True,
    )
    parser.add_argument(
        "-h",
        "--help",
        action="help",
        default=argparse.SUPPRESS,
        help="Show this help message and exit.",
    )

    args = parser.parse_args(args=remaining_args)

    try:
        sessions = _session_descriptors_from_path(
            args.sessions_filename,
            args.trace_path_prefix,
        )

        # The constructor only returns once the viewer closes the
        # connection.
        LttngLiveServer(
            args.port, args.port_filename, sessions, args.max_query_data_response_size
        )
    except UnexpectedInput as exc:
        logging.error(str(exc))
        print(exc, file=sys.stderr)
        sys.exit(1)
This page took 0.243815 seconds and 4 git commands to generate.