Tests: src.ctf.lttng-live: split metadata sections
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
... / ...
CommitLineData
1# SPDX-License-Identifier: MIT
2#
3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
4#
5
6import argparse
7import collections.abc
8import logging
9import os
10import os.path
11import re
12import socket
13import struct
14import sys
15import tempfile
16import json
17from collections import namedtuple
18
19
# Error raised when received input cannot be handled, for example an
# unknown command type or an unknown tracing session ID.
class UnexpectedInput(RuntimeError):
    pass
22
23
# Base of the viewer command objects: only holds the version value found
# in the command header.
class _LttngLiveViewerCommand:
    def __init__(self, version):
        self._version = version

    # Version value given at construction time.
    @property
    def version(self):
        return self._version
31
32
# Viewer "connect" command: viewer session ID and major/minor numbers
# taken from the command payload.
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    def __init__(self, version, viewer_session_id, major, minor):
        super().__init__(version)
        self._minor = minor
        self._major = major
        self._viewer_session_id = viewer_session_id

    # Minor number sent by the viewer.
    @property
    def minor(self):
        return self._minor

    # Major number sent by the viewer.
    @property
    def major(self):
        return self._major

    # Viewer session ID sent by the viewer.
    @property
    def viewer_session_id(self):
        return self._viewer_session_id
51
52
# Reply to a viewer "connect" command: viewer session ID and major/minor
# numbers to send back.
class _LttngLiveViewerConnectReply:
    def __init__(self, viewer_session_id, major, minor):
        self._minor = minor
        self._major = major
        self._viewer_session_id = viewer_session_id

    # Minor number to send.
    @property
    def minor(self):
        return self._minor

    # Major number to send.
    @property
    def major(self):
        return self._major

    # Viewer session ID to send.
    @property
    def viewer_session_id(self):
        return self._viewer_session_id
70
71
# Viewer command requesting the tracing session infos; it has no field
# other than the inherited version.
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    pass
74
75
# Description of one tracing session as sent within a "get tracing
# session infos" reply.
class _LttngLiveViewerTracingSessionInfo:
    def __init__(
        self,
        tracing_session_id,
        live_timer_freq,
        client_count,
        stream_count,
        hostname,
        name,
    ):
        # Numeric fields
        self._tracing_session_id = tracing_session_id
        self._live_timer_freq = live_timer_freq
        self._client_count = client_count
        self._stream_count = stream_count

        # String fields
        self._hostname = hostname
        self._name = name

    # Tracing session name.
    @property
    def name(self):
        return self._name

    # Hostname of the tracing session.
    @property
    def hostname(self):
        return self._hostname

    # Tracing session ID.
    @property
    def tracing_session_id(self):
        return self._tracing_session_id

    # Live timer frequency.
    @property
    def live_timer_freq(self):
        return self._live_timer_freq

    # Client count.
    @property
    def client_count(self):
        return self._client_count

    # Stream count.
    @property
    def stream_count(self):
        return self._stream_count
116
117
# Reply to a "get tracing session infos" command: wraps the tracing
# session info objects to send.
class _LttngLiveViewerGetTracingSessionInfosReply:
    def __init__(self, tracing_session_infos):
        self._tracing_session_infos = tracing_session_infos

    # Tracing session infos given at construction time.
    @property
    def tracing_session_infos(self):
        return self._tracing_session_infos
125
126
# Viewer command asking to attach to a tracing session, with the offset
# and seek type taken from the command payload.
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    # Known values of the seek type field.
    class SeekType:
        BEGINNING = 1
        LAST = 2

    def __init__(self, version, tracing_session_id, offset, seek_type):
        super().__init__(version)
        self._seek_type = seek_type
        self._offset = offset
        self._tracing_session_id = tracing_session_id

    # Seek type value sent by the viewer.
    @property
    def seek_type(self):
        return self._seek_type

    # Offset value sent by the viewer.
    @property
    def offset(self):
        return self._offset

    # ID of the tracing session to attach to.
    @property
    def tracing_session_id(self):
        return self._tracing_session_id
149
150
# Description of one stream (metadata or data) as sent to the viewer in
# stream info lists.
class _LttngLiveViewerStreamInfo:
    def __init__(self, id, trace_id, is_metadata, path, channel_name):
        # Identification
        self._id = id
        self._trace_id = trace_id
        self._is_metadata = is_metadata

        # Location
        self._path = path
        self._channel_name = channel_name

    # Channel name of the stream.
    @property
    def channel_name(self):
        return self._channel_name

    # Path of the stream.
    @property
    def path(self):
        return self._path

    # Whether or not this describes a metadata stream.
    @property
    def is_metadata(self):
        return self._is_metadata

    # ID of the trace which contains the stream.
    @property
    def trace_id(self):
        return self._trace_id

    # Stream ID.
    @property
    def id(self):
        return self._id
178
179
# Reply to an "attach to tracing session" command: a status and the
# stream infos of the tracing session.
class _LttngLiveViewerAttachToTracingSessionReply:
    # Known values of the status field.
    class Status:
        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(self, status, stream_infos):
        self._stream_infos = stream_infos
        self._status = status

    # Stream infos to send.
    @property
    def stream_infos(self):
        return self._stream_infos

    # Status to send.
    @property
    def status(self):
        return self._status
200
201
# Viewer command requesting the next index entry of a given data stream.
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    # ID of the targeted data stream.
    @property
    def stream_id(self):
        return self._stream_id
210
211
# Reply to a "get next data stream index entry" command: a status, the
# entry to send, and the two "has new ..." flags.
class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
    # Known values of the status field.
    class Status:
        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
        # Payload
        self._status = status
        self._index_entry = index_entry

        # Flags
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    # Whether or not the "new data stream" flag is set.
    @property
    def has_new_data_stream(self):
        return self._has_new_data_stream

    # Whether or not the "new metadata" flag is set.
    @property
    def has_new_metadata(self):
        return self._has_new_metadata

    # Index entry (or beacon entry) to send.
    @property
    def index_entry(self):
        return self._index_entry

    # Status to send.
    @property
    def status(self):
        return self._status
242
243
# Viewer command requesting `req_length` bytes of a given data stream
# starting at `offset`.
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version, stream_id, offset, req_length):
        super().__init__(version)
        self._req_length = req_length
        self._offset = offset
        self._stream_id = stream_id

    # Requested length.
    @property
    def req_length(self):
        return self._req_length

    # Requested offset.
    @property
    def offset(self):
        return self._offset

    # ID of the targeted data stream.
    @property
    def stream_id(self):
        return self._stream_id
262
263
# Reply to a "get data stream packet data" command: a status, the data
# to send, and the two "has new ..." flags.
class _LttngLiveViewerGetDataStreamPacketDataReply:
    # Known values of the status field.
    class Status:
        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(self, status, data, has_new_metadata, has_new_data_stream):
        # Payload
        self._status = status
        self._data = data

        # Flags
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    # Whether or not the "new data stream" flag is set.
    @property
    def has_new_data_stream(self):
        return self._has_new_data_stream

    # Whether or not the "new metadata" flag is set.
    @property
    def has_new_metadata(self):
        return self._has_new_metadata

    # Data to send.
    @property
    def data(self):
        return self._data

    # Status to send.
    @property
    def status(self):
        return self._status
292
293
# Viewer command requesting data of a given metadata stream.
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    # ID of the targeted metadata stream.
    @property
    def stream_id(self):
        return self._stream_id
302
303
# Reply to a "get metadata stream data" command: a status and the
# metadata bytes to send.
class _LttngLiveViewerGetMetadataStreamDataContentReply:
    # Known values of the status field.
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status, data):
        self._data = data
        self._status = status

    # Data to send.
    @property
    def data(self):
        return self._data

    # Status to send.
    @property
    def status(self):
        return self._status
321
322
# Viewer command requesting the new stream infos of a given tracing
# session.
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    # ID of the targeted tracing session.
    @property
    def tracing_session_id(self):
        return self._tracing_session_id
331
332
# Reply to a "get new stream infos" command: a status and the stream
# infos to send.
class _LttngLiveViewerGetNewStreamInfosReply:
    # Known values of the status field.
    class Status:
        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(self, status, stream_infos):
        self._stream_infos = stream_infos
        self._status = status

    # Stream infos to send.
    @property
    def stream_infos(self):
        return self._stream_infos

    # Status to send.
    @property
    def status(self):
        return self._status
351
352
# Viewer command asking to create a viewer session; it has no field
# other than the inherited version.
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    pass
355
356
# Reply to a "create viewer session" command: only a status.
class _LttngLiveViewerCreateViewerSessionReply:
    # Known values of the status field.
    class Status:
        OK = 1
        ERROR = 2

    def __init__(self, status):
        self._status = status

    # Status to send.
    @property
    def status(self):
        return self._status
368
369
# Viewer command asking to detach from a given tracing session.
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    # ID of the tracing session to detach from.
    @property
    def tracing_session_id(self):
        return self._tracing_session_id
378
379
# Reply to a "detach from tracing session" command: only a status.
class _LttngLiveViewerDetachFromTracingSessionReply:
    # Known values of the status field.
    class Status:
        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status):
        self._status = status

    # Status to send.
    @property
    def status(self):
        return self._status
392
393
# Codec of the LTTng live viewer protocol: decodes received bytes into
# command objects and encodes reply objects into bytes. All the fields
# are packed and unpacked in network byte order.
class _LttngLiveViewerProtocolCodec:
    # Command header: payload size, command type, and version
    _COMMAND_HEADER_STRUCT_FMT = 'QII'
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)

    def __init__(self):
        pass

    # Unpacks `fmt` (network byte order forced) from `data` at `offset`.
    def _unpack(self, fmt, data, offset=0):
        return struct.unpack_from('!' + fmt, data, offset)

    # Unpacks `fmt` from the payload of the command found in `data`, that
    # is, from what immediately follows the command header.
    def _unpack_payload(self, fmt, data):
        payload_offset = _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
        return self._unpack(fmt, data, payload_offset)

    # Decodes the command found at the beginning of `data`.
    #
    # Returns the corresponding command object, or `None` if `data`
    # doesn't contain a complete command yet (header or payload).
    #
    # Raises `UnexpectedInput` when the command type is unknown.
    def decode(self, data):
        header_size = self._COMMAND_HEADER_SIZE_BYTES

        if len(data) < header_size:
            # Not enough data to read the command header
            return None

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
                payload_size, cmd_type, version
            )
        )

        if len(data) < header_size + payload_size:
            # Not enough data to read the whole command
            return None

        # Command type -> (payload format or `None`, command class)
        layouts = {
            1: ('QIII', _LttngLiveViewerConnectCommand),
            2: (None, _LttngLiveViewerGetTracingSessionInfosCommand),
            3: ('QQI', _LttngLiveViewerAttachToTracingSessionCommand),
            4: ('Q', _LttngLiveViewerGetNextDataStreamIndexEntryCommand),
            5: ('QQI', _LttngLiveViewerGetDataStreamPacketDataCommand),
            6: ('Q', _LttngLiveViewerGetMetadataStreamDataCommand),
            7: ('Q', _LttngLiveViewerGetNewStreamInfosCommand),
            8: (None, _LttngLiveViewerCreateViewerSessionCommand),
            9: ('Q', _LttngLiveViewerDetachFromTracingSessionCommand),
        }

        if cmd_type not in layouts:
            raise UnexpectedInput('Unknown command type {}'.format(cmd_type))

        payload_fmt, cmd_cls = layouts[cmd_type]

        if payload_fmt is None:
            # This command has no payload to decode
            return cmd_cls(version)

        fields = self._unpack_payload(payload_fmt, data)

        if cmd_cls is _LttngLiveViewerConnectCommand:
            # The last payload field (connection type) is decoded, but
            # the command object doesn't keep it.
            fields = fields[:3]

        return cmd_cls(version, *fields)

    # Packs `args` following `fmt`, forcing network byte order.
    def _pack(self, fmt, *args):
        return struct.pack('!' + fmt, *args)

    # Encodes `string` and pads the result on the right with null bytes
    # up to `length` bytes (a longer result is returned as is).
    def _encode_zero_padded_str(self, string, length):
        return string.encode().ljust(length, b'\x00')

    # Encodes a single stream info object: fixed fields followed by the
    # zero-padded path and channel name.
    def _encode_stream_info(self, info):
        fixed_part = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
        path_part = self._encode_zero_padded_str(info.path, 4096)
        channel_name_part = self._encode_zero_padded_str(info.channel_name, 255)
        return fixed_part + path_part + channel_name_part

    # Returns the flags field value: bit 0 means new metadata and bit 1
    # means new data streams.
    def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
        metadata_bit = 1 if has_new_metadata else 0
        data_streams_bit = 2 if has_new_data_streams else 0
        return metadata_bit | data_streams_bit

    # Encodes the reply object `reply` and returns the resulting bytes.
    #
    # Raises `ValueError` when the class of `reply` is unknown.
    def encode(self, reply):
        reply_type = type(reply)

        if reply_type is _LttngLiveViewerConnectReply:
            # NOTE(review): the trailing `2` is presumably the connection
            # type field -- confirm against the protocol definition.
            return self._pack(
                'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
            )

        if reply_type is _LttngLiveViewerGetTracingSessionInfosReply:
            chunks = [self._pack('I', len(reply.tracing_session_infos))]

            for ts_info in reply.tracing_session_infos:
                chunks.append(
                    self._pack(
                        'QIII',
                        ts_info.tracing_session_id,
                        ts_info.live_timer_freq,
                        ts_info.client_count,
                        ts_info.stream_count,
                    )
                )
                chunks.append(self._encode_zero_padded_str(ts_info.hostname, 64))
                chunks.append(self._encode_zero_padded_str(ts_info.name, 255))

            return b''.join(chunks)

        if reply_type in (
            _LttngLiveViewerAttachToTracingSessionReply,
            _LttngLiveViewerGetNewStreamInfosReply,
        ):
            # Both replies share the same layout: status, stream info
            # count, and then the stream infos themselves.
            chunks = [self._pack('II', reply.status, len(reply.stream_infos))]
            chunks.extend(
                self._encode_stream_info(stream_info)
                for stream_info in reply.stream_infos
            )
            return b''.join(chunks)

        if reply_type is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )

            if type(entry) is _LttngDataStreamIndexEntry:
                entry_fields = (
                    entry.offset_bytes,
                    entry.total_size_bits,
                    entry.content_size_bits,
                    entry.timestamp_begin,
                    entry.timestamp_end,
                    entry.events_discarded,
                    entry.stream_class_id,
                )
            else:
                # A beacon only provides its timestamp (sent in the end
                # timestamp slot) and its stream class ID.
                assert type(entry) is _LttngDataStreamBeaconEntry
                entry_fields = (0, 0, 0, 0, entry.timestamp, 0, entry.stream_class_id)

            return self._pack('QQQQQQQII', *(entry_fields + (reply.status, flags)))

        if reply_type is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            return self._pack('III', reply.status, len(reply.data), flags) + reply.data

        if reply_type is _LttngLiveViewerGetMetadataStreamDataContentReply:
            return self._pack('QI', len(reply.data), reply.status) + reply.data

        if reply_type in (
            _LttngLiveViewerCreateViewerSessionReply,
            _LttngLiveViewerDetachFromTracingSessionReply,
        ):
            # Status-only replies
            return self._pack('I', reply.status)

        raise ValueError(
            'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
        )
576
577
# A single entry of an LTTng data stream index, holding the values read
# from one record of the index file.
class _LttngDataStreamIndexEntry:
    def __init__(
        self,
        offset_bytes,
        total_size_bits,
        content_size_bits,
        timestamp_begin,
        timestamp_end,
        events_discarded,
        stream_class_id,
    ):
        # Location and sizes
        self._offset_bytes = offset_bytes
        self._total_size_bits = total_size_bits
        self._content_size_bits = content_size_bits

        # Time range
        self._timestamp_begin = timestamp_begin
        self._timestamp_end = timestamp_end

        # Other
        self._events_discarded = events_discarded
        self._stream_class_id = stream_class_id

    # Offset (bytes).
    @property
    def offset_bytes(self):
        return self._offset_bytes

    # Total size (bits).
    @property
    def total_size_bits(self):
        return self._total_size_bits

    # Total size (whole bytes, rounded down).
    @property
    def total_size_bytes(self):
        return self.total_size_bits // 8

    # Content size (bits).
    @property
    def content_size_bits(self):
        return self._content_size_bits

    # Content size (whole bytes, rounded down).
    @property
    def content_size_bytes(self):
        return self.content_size_bits // 8

    # Beginning timestamp.
    @property
    def timestamp_begin(self):
        return self._timestamp_begin

    # End timestamp.
    @property
    def timestamp_end(self):
        return self._timestamp_end

    # Discarded event count.
    @property
    def events_discarded(self):
        return self._events_discarded

    # Stream class ID.
    @property
    def stream_class_id(self):
        return self._stream_class_id
633
634
# A beacon entry stored within the index of an LTTng data stream. Although
# a stream beacon is conceptually unrelated to an index, it is sent as a
# reply to a "get next data stream index entry" command, which is why it
# lives amongst the index entries.
class _LttngDataStreamBeaconEntry:
    def __init__(self, stream_class_id, timestamp):
        self._timestamp = timestamp
        self._stream_class_id = stream_class_id

    # Stream class ID.
    @property
    def stream_class_id(self):
        return self._stream_class_id

    # Beacon timestamp.
    @property
    def timestamp(self):
        return self._timestamp
650
651
# Returns the beginning timestamp of `entry`: the timestamp of a beacon
# entry, or the `timestamp_begin` property of an index entry.
def _get_entry_timestamp_begin(entry):
    if type(entry) is not _LttngDataStreamBeaconEntry:
        # Only two kinds of entries exist
        assert type(entry) is _LttngDataStreamIndexEntry
        return entry.timestamp_begin

    return entry.timestamp
658
659
# The index of an LTTng data stream: a sequence of index entries read from
# an index file, with optional beacon entries inserted amongst them.
class _LttngDataStreamIndex(collections.abc.Sequence):
    # Builds the index from the index file `path`.
    #
    # `beacons`, if not empty or `None`, contains the timestamps of the
    # beacon entries to add; they get the stream class ID of the first
    # index entry.
    def __init__(self, path, beacons):
        self._path = path
        self._build()

        if beacons:
            first_stream_class_id = self._entries[0].stream_class_id
            self._add_beacons(
                [
                    _LttngDataStreamBeaconEntry(first_stream_class_id, beacon_ts)
                    for beacon_ts in beacons
                ]
            )

        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    # Reads the whole index file to fill `self._entries`.
    def _build(self):
        header_fmt = '>IIII'
        entry_fmt = '>QQQQQQQ'
        self._entries = []
        assert os.path.isfile(self._path)

        with open(self._path, 'rb') as index_file:
            # File header first: magic, major, minor, and entry length
            header_size = struct.calcsize(header_fmt)
            header = index_file.read(header_size)
            assert len(header) == header_size
            magic, _, _, index_entry_length = struct.unpack(header_fmt, header)
            assert magic == 0xC1F1DCC1

            # Then the index entries, until the end of the file
            entry_size = struct.calcsize(entry_fmt)

            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, index_file.tell()
                    )
                )
                raw_entry = index_file.read(entry_size)

                if not raw_entry:
                    # Done
                    break

                assert len(raw_entry) == entry_size

                # The unpacked fields are in the same order as the
                # parameters of the entry's constructor.
                self._entries.append(
                    _LttngDataStreamIndexEntry(*struct.unpack(entry_fmt, raw_entry))
                )

                # An entry within the file may be longer than what we
                # decode: skip anything else before the next entry.
                index_file.seek(index_entry_length - entry_size, os.SEEK_CUR)

    # Adds the beacon entries `beacons` to the index entries, keeping
    # everything sorted by (end) timestamp.
    def _add_beacons(self, beacons):
        # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
        def entry_sort_key(entry):
            if type(entry) is _LttngDataStreamBeaconEntry:
                return entry.timestamp

            return entry.timestamp_end

        self._entries.extend(beacons)
        self._entries.sort(key=entry_sort_key)

    def __getitem__(self, index):
        return self._entries[index]

    def __len__(self):
        return len(self._entries)

    # Path of the index file.
    @property
    def path(self):
        return self._path
756
757
# An LTTng data stream: a data stream file together with its index.
class _LttngDataStream:
    # Builds a data stream from the data stream file `path`.
    #
    # The channel name is what precedes the `_<digits>` part of the file
    # name, and the index file is `index/<file name>.idx` within the
    # directory of `path`. `beacons` is passed as is to the index.
    def __init__(self, path, beacons):
        self._path = path
        trace_dir, filename = os.path.dirname(path), os.path.basename(path)
        self._channel_name = re.match(r'(.*)_\d+', filename).group(1)
        self._index = _LttngDataStreamIndex(
            os.path.join(trace_dir, 'index', filename + '.idx'), beacons
        )
        assert os.path.isfile(path)

        # Kept open: get_data() reads from it on demand
        self._file = open(path, 'rb')
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    # Path of the data stream file.
    @property
    def path(self):
        return self._path

    # Channel name (from the data stream file name).
    @property
    def channel_name(self):
        return self._channel_name

    # Index of this data stream.
    @property
    def index(self):
        return self._index

    # Returns up to `len_bytes` bytes of the data stream file starting at
    # the offset `offset_bytes`.
    def get_data(self, offset_bytes, len_bytes):
        stream_file = self._file
        stream_file.seek(offset_bytes)
        return stream_file.read(len_bytes)
791
792
# A section of an LTTng metadata stream: some metadata bytes (possibly
# none) associated to a timestamp.
class _LttngMetadataStreamSection:
    # `data` may be `None`, in which case the section is empty.
    def __init__(self, timestamp, data):
        self._timestamp = timestamp
        self._data = bytes() if data is None else data
        logging.info(
            'Built metadata stream section: ts={}, data-len={}'.format(
                self._timestamp, len(self._data)
            )
        )

    # Data of this section.
    @property
    def data(self):
        return self._data

    # Timestamp of this section.
    @property
    def timestamp(self):
        return self._timestamp
813
814
# An LTTng metadata stream: the path of a metadata file and the sections
# it is split into.
class _LttngMetadataStream:
    def __init__(self, metadata_file_path, config_sections):
        self._sections = config_sections
        self._path = metadata_file_path
        logging.info(
            'Built metadata stream: path={}, section-len={}'.format(
                self._path, len(self._sections)
            )
        )

    # Sections of this metadata stream.
    @property
    def sections(self):
        return self._sections

    # Path of the metadata file.
    @property
    def path(self):
        return self._path
833
834
# A parsed metadata section configuration entry: the line at which the
# section begins, its timestamp, and whether or not it is an empty section.
LttngMetadataConfigSection = namedtuple(
    'LttngMetadataConfigSection',
    ('line', 'timestamp', 'is_empty'),
)
838
839
# Converts the raw metadata sections configuration `config_sections` to a
# list of `LttngMetadataConfigSection` tuples.
#
# Each item of `config_sections` is either the string `empty` or a
# dictionary having the `line` and `timestamp` keys. The lines and the
# timestamps of successive dictionaries must be strictly increasing.
#
# An `empty` marker yields an empty section having the line and timestamp
# of the next concrete section, inserted right before it. Consecutive
# markers yield a single empty section, and a trailing marker yields
# nothing.
def _parse_metadata_sections_config(config_sections):
    assert config_sections is not None
    parsed_sections = []
    empty_section_pending = False
    prev_timestamp = 0
    prev_line = 0

    for raw_section in config_sections:
        if raw_section == 'empty':
            # Found an empty section marker: the empty section is
            # actually appended when reaching the next concrete section.
            empty_section_pending = True
            continue

        assert type(raw_section) is dict
        line = raw_section.get('line')
        ts = raw_section.get('timestamp')

        # Sections' timestamps and lines must both be increasing.
        assert ts > prev_timestamp
        prev_timestamp = ts
        assert line > prev_line
        prev_line = line

        if empty_section_pending:
            parsed_sections.append(LttngMetadataConfigSection(line, ts, True))
            empty_section_pending = False

        parsed_sections.append(LttngMetadataConfigSection(line, ts, False))

    return parsed_sections
872
873
# Splits the content of the metadata file `metadata_file_path` into
# metadata stream sections following the raw sections configuration
# `raw_config_sections` (a sequence accepted by
# _parse_metadata_sections_config()).
#
# Returns a list of `_LttngMetadataStreamSection` objects: a section
# contains the lines from its own configured line up to, but excluding,
# the line of the next concrete section. An empty configured section
# becomes a section without data.
def _split_metadata_sections(metadata_file_path, raw_config_sections):
    assert isinstance(raw_config_sections, collections.abc.Sequence)

    parsed_sections = _parse_metadata_sections_config(raw_config_sections)

    # The lines are encoded back to UTF-8 below: decode them explicitly as
    # UTF-8 too (instead of relying on the locale's encoding) and disable
    # the newline translation so that the sections, once concatenated,
    # contain exactly the bytes of the file.
    with open(metadata_file_path, 'r', encoding='utf8', newline='') as metadata_file:
        metadata_lines = list(metadata_file)

    sections = []
    config_metadata_sections_idx = 0
    curr_metadata_section = bytearray()

    # Line numbers start at one, like what humans see when viewing a text
    # file.
    for curr_line_number, line_content in enumerate(metadata_lines, start=1):
        encoded_line = line_content.encode('utf8')

        # If there are no more sections, simply append the line.
        if config_metadata_sections_idx + 1 >= len(parsed_sections):
            curr_metadata_section += encoded_line
            continue

        next_section_line_number = parsed_sections[
            config_metadata_sections_idx + 1
        ].line

        if curr_line_number < next_section_line_number:
            # Still within the current section.
            curr_metadata_section += encoded_line
            continue

        # The next section begins at the current line: create a section
        # with the metadata gathered so far.
        sections.append(
            _LttngMetadataStreamSection(
                parsed_sections[config_metadata_sections_idx].timestamp,
                bytes(curr_metadata_section),
            )
        )

        # Move to the next section, which starts with the current line.
        config_metadata_sections_idx += 1
        curr_metadata_section = bytearray(encoded_line)

        # Append any empty sections. An empty section is always followed
        # by a concrete one, so this loop stays within bounds.
        while parsed_sections[config_metadata_sections_idx].is_empty:
            sections.append(
                _LttngMetadataStreamSection(
                    parsed_sections[config_metadata_sections_idx].timestamp, None
                )
            )
            config_metadata_sections_idx += 1

    # We iterated over all the lines of the metadata file. Close the
    # current section.
    sections.append(
        _LttngMetadataStreamSection(
            parsed_sections[config_metadata_sections_idx].timestamp,
            bytes(curr_metadata_section),
        )
    )

    return sections
941
942
# An LTTng trace: a metadata stream and a sequence of LTTng data streams
# built from the content of a trace directory.
class LttngTrace(collections.abc.Sequence):
    # Builds a trace from the directory `trace_dir`.
    #
    # `metadata_sections` is the raw metadata sections configuration, or
    # `None` to use the whole metadata file as a single section.
    #
    # `beacons`, if not `None`, maps data stream file names to the beacon
    # timestamps of the corresponding data stream.
    def __init__(self, trace_dir, metadata_sections, beacons):
        assert os.path.isdir(trace_dir)
        self._path = trace_dir
        self._create_metadata_stream(trace_dir, metadata_sections)
        self._create_data_streams(trace_dir, beacons)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    # Creates one data stream per regular file of `trace_dir`, except for
    # the hidden files and the `metadata` file, in sorted path order.
    def _create_data_streams(self, trace_dir, beacons):
        data_stream_paths = []

        for filename in os.listdir(trace_dir):
            if filename.startswith('.') or filename == 'metadata':
                continue

            candidate_path = os.path.join(trace_dir, filename)

            if os.path.isfile(candidate_path):
                data_stream_paths.append(candidate_path)

        self._data_streams = []

        for data_stream_path in sorted(data_stream_paths):
            stream_name = os.path.basename(data_stream_path)

            if beacons is not None and stream_name in beacons:
                stream_beacons = beacons[stream_name]
            else:
                stream_beacons = None

            self._data_streams.append(
                _LttngDataStream(data_stream_path, stream_beacons)
            )

    # Creates the metadata stream from the `metadata` file of `trace_dir`.
    def _create_metadata_stream(self, trace_dir, config_metadata_sections):
        metadata_path = os.path.join(trace_dir, 'metadata')

        if config_metadata_sections is not None:
            metadata_sections = _split_metadata_sections(
                metadata_path, config_metadata_sections
            )
        else:
            # No configuration: a single section, at timestamp 0, with the
            # whole content of the file.
            with open(metadata_path, 'rb') as metadata_file:
                metadata_sections = [
                    _LttngMetadataStreamSection(0, metadata_file.read())
                ]

        self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)

    # Path of the trace directory.
    @property
    def path(self):
        return self._path

    # Metadata stream of this trace.
    @property
    def metadata_stream(self):
        return self._metadata_stream

    def __getitem__(self, index):
        return self._data_streams[index]

    def __len__(self):
        return len(self._data_streams)
1012
1013
# The state of a single data stream within a viewer session: which index
# entry is the current one.
class _LttngLiveViewerSessionDataStreamState:
    def __init__(self, ts_state, info, data_stream, metadata_stream_id):
        self._ts_state = ts_state
        self._info = info
        self._data_stream = data_stream
        self._metadata_stream_id = metadata_stream_id

        # Start with the first index entry
        self._cur_index_entry_index = 0

        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id,
                ts_info.tracing_session_id,
                ts_info.name,
                data_stream.path,
            )
        )

    # State of the tracing session which contains this data stream.
    @property
    def tracing_session_state(self):
        return self._ts_state

    # Stream info of this data stream.
    @property
    def info(self):
        return self._info

    # The data stream itself.
    @property
    def data_stream(self):
        return self._data_stream

    # Current index entry, or `None` once past the last entry.
    @property
    def cur_index_entry(self):
        stream_index = self._data_stream.index

        if self._cur_index_entry_index == len(stream_index):
            return None

        return stream_index[self._cur_index_entry_index]

    # Makes the next index entry the current one.
    def goto_next_index_entry(self):
        self._cur_index_entry_index += 1
1053
1054
# The state of a single metadata stream within a viewer session: which
# section is the current one, the timestamp of the upcoming section, and
# whether or not the stream is marked as sent.
class _LttngLiveViewerSessionMetadataStreamState:
    def __init__(self, ts_state, info, metadata_stream):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream
        self._cur_metadata_stream_section_index = 0

        # The first section is the current one: the upcoming section, if
        # any, is the second one.
        if len(metadata_stream.sections) > 1:
            self._next_metadata_stream_section_timestamp = metadata_stream.sections[
                1
            ].timestamp
        else:
            self._next_metadata_stream_section_timestamp = None

        self._is_sent = False
        fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
        logging.info(
            fmt.format(
                info.id,
                ts_state.tracing_session_descriptor.info.tracing_session_id,
                ts_state.tracing_session_descriptor.info.name,
                metadata_stream.path,
            )
        )

    # State of the tracing session which contains this metadata stream.
    @property
    def trace_session_state(self):
        # The constructor stores the tracing session state as `_ts_state`:
        # this used to return a `_trace_session_state` attribute which is
        # never set, therefore always raising `AttributeError`.
        return self._ts_state

    # Stream info of this metadata stream.
    @property
    def info(self):
        return self._info

    # The metadata stream itself.
    @property
    def metadata_stream(self):
        return self._metadata_stream

    # Whether or not this metadata stream is marked as sent.
    @property
    def is_sent(self):
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value):
        self._is_sent = value

    # Current section, or `None` once past the last section.
    #
    # Each access logs the current section index.
    @property
    def cur_section(self):
        fmt = "Get current metadata section: section-idx={}"
        logging.info(fmt.format(self._cur_metadata_stream_section_index))
        if self._cur_metadata_stream_section_index == len(
            self._metadata_stream.sections
        ):
            return

        return self._metadata_stream.sections[self._cur_metadata_stream_section_index]

    # Makes the next section the current one and updates the upcoming
    # section timestamp to the timestamp of this new current section
    # (`None` once past the last section).
    def goto_next_section(self):
        self._cur_metadata_stream_section_index += 1
        if self.cur_section:
            self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
        else:
            self._next_metadata_stream_section_timestamp = None

    # Timestamp of the upcoming section, or `None` if there's none.
    @property
    def next_section_timestamp(self):
        return self._next_metadata_stream_section_timestamp
1121
1122
# The state of a tracing session within a viewer session: the stream
# infos and stream states of all its traces, and whether or not the
# viewer is attached to it.
class _LttngLiveViewerSessionTracingSessionState:
    # Builds the state of the tracing session described by `tc_descr`.
    #
    # Stream IDs are allocated sequentially starting at `base_stream_id`:
    # for each trace, the metadata stream first and then its data streams.
    # The ID of a trace is the ID of its metadata stream times 1000.
    def __init__(self, tc_descr, base_stream_id):
        self._tc_descr = tc_descr
        self._stream_infos = []
        self._ds_states = {}
        self._ms_states = {}
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # Metadata stream -> stream info and metadata stream state
            metadata_stream_id = next_stream_id
            next_stream_id += 1
            trace_id = metadata_stream_id * 1000
            ms_info = _LttngLiveViewerStreamInfo(
                metadata_stream_id,
                trace_id,
                True,
                trace.metadata_stream.path,
                'metadata',
            )
            self._stream_infos.append(ms_info)
            self._ms_states[
                metadata_stream_id
            ] = _LttngLiveViewerSessionMetadataStreamState(
                self, ms_info, trace.metadata_stream
            )

            # Data streams -> stream infos and data stream states
            for data_stream in trace:
                ds_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(ds_info)
                self._ds_states[next_stream_id] = _LttngLiveViewerSessionDataStreamState(
                    self, ds_info, data_stream, metadata_stream_id
                )
                next_stream_id += 1

        self._is_attached = False
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                tc_descr.info.tracing_session_id, tc_descr.info.name
            )
        )

    # Descriptor of the tracing session.
    @property
    def tracing_session_descriptor(self):
        return self._tc_descr

    # Data stream states (stream ID -> state).
    @property
    def data_stream_states(self):
        return self._ds_states

    # Metadata stream states (stream ID -> state).
    @property
    def metadata_stream_states(self):
        return self._ms_states

    # Stream infos of all the streams, in stream ID order.
    @property
    def stream_infos(self):
        return self._stream_infos

    # Whether or not at least one metadata stream isn't marked as sent.
    @property
    def has_new_metadata(self):
        return not all(ms_state.is_sent for ms_state in self._ms_states.values())

    # Whether or not the viewer is attached to this tracing session.
    @property
    def is_attached(self):
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value):
        self._is_attached = value
1192
1193
# Returns whether or not `latest_timestamp` reached the timestamp of the
# upcoming section of `metadata_stream_state`.
#
# Always returns `False` when there's no upcoming section.
def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
    next_section_ts = metadata_stream_state.next_section_timestamp

    if next_section_ts is None:
        return False

    return True if latest_timestamp >= next_section_ts else False
1202
1203
1204# An LTTng live viewer session manages a view on tracing sessions
1205# and replies to commands accordingly.
1206class _LttngLiveViewerSession:
1207 def __init__(
1208 self,
1209 viewer_session_id,
1210 tracing_session_descriptors,
1211 max_query_data_response_size,
1212 ):
1213 self._viewer_session_id = viewer_session_id
1214 self._ts_states = {}
1215 self._stream_states = {}
1216 self._max_query_data_response_size = max_query_data_response_size
1217 total_stream_infos = 0
1218
1219 for ts_descr in tracing_session_descriptors:
1220 ts_state = _LttngLiveViewerSessionTracingSessionState(
1221 ts_descr, total_stream_infos
1222 )
1223 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1224 self._ts_states[ts_id] = ts_state
1225 total_stream_infos += len(ts_state.stream_infos)
1226
1227 # Update session's stream states to have the new states
1228 self._stream_states.update(ts_state.data_stream_states)
1229 self._stream_states.update(ts_state.metadata_stream_states)
1230
1231 self._command_handlers = {
1232 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1233 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1234 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1235 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1236 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1237 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1238 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1239 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
1240 }
1241
1242 @property
1243 def viewer_session_id(self):
1244 return self._viewer_session_id
1245
1246 def _get_tracing_session_state(self, tracing_session_id):
1247 if tracing_session_id not in self._ts_states:
1248 raise UnexpectedInput(
1249 'Unknown tracing session ID {}'.format(tracing_session_id)
1250 )
1251
1252 return self._ts_states[tracing_session_id]
1253
1254 def _get_stream_state(self, stream_id):
1255 if stream_id not in self._stream_states:
1256 UnexpectedInput('Unknown stream ID {}'.format(stream_id))
1257
1258 return self._stream_states[stream_id]
1259
1260 def handle_command(self, cmd):
1261 logging.info(
1262 'Handling command in viewer session: cmd-cls-name={}'.format(
1263 cmd.__class__.__name__
1264 )
1265 )
1266 cmd_type = type(cmd)
1267
1268 if cmd_type not in self._command_handlers:
1269 raise UnexpectedInput(
1270 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
1271 )
1272
1273 return self._command_handlers[cmd_type](cmd)
1274
1275 def _handle_attach_to_tracing_session_command(self, cmd):
1276 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1277 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1278 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1279 info = ts_state.tracing_session_descriptor.info
1280
1281 if ts_state.is_attached:
1282 raise UnexpectedInput(
1283 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1284 info.name
1285 )
1286 )
1287
1288 ts_state.is_attached = True
1289 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1290 return _LttngLiveViewerAttachToTracingSessionReply(
1291 status, ts_state.stream_infos
1292 )
1293
1294 def _handle_detach_from_tracing_session_command(self, cmd):
1295 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1296 logging.info(fmt.format(cmd.tracing_session_id))
1297 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1298 info = ts_state.tracing_session_descriptor.info
1299
1300 if not ts_state.is_attached:
1301 raise UnexpectedInput(
1302 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1303 info.name
1304 )
1305 )
1306
1307 ts_state.is_attached = False
1308 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1309 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1310
1311 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1312 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1313 logging.info(fmt.format(cmd.stream_id))
1314 stream_state = self._get_stream_state(cmd.stream_id)
1315 metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
1316
1317 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1318 raise UnexpectedInput(
1319 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1320 )
1321
1322 if stream_state.cur_index_entry is None:
1323 # The viewer is done reading this stream
1324 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1325
1326 # Dummy data stream index entry to use with the `HUP` status
1327 # (the reply needs one, but the viewer ignores it)
1328 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1329
1330 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1331 status, index_entry, False, False
1332 )
1333
1334 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1335
1336 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
1337 metadata_stream_state.is_sent = False
1338 metadata_stream_state.goto_next_section()
1339
1340 # The viewer only checks the `has_new_metadata` flag if the
1341 # reply's status is `OK`, so we need to provide an index here
1342 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1343 if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
1344 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1345 else:
1346 assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
1347 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1348
1349 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1350 status, stream_state.cur_index_entry, has_new_metadata, False
1351 )
1352 stream_state.goto_next_index_entry()
1353 return reply
1354
1355 def _handle_get_data_stream_packet_data_command(self, cmd):
1356 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1357 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1358 stream_state = self._get_stream_state(cmd.stream_id)
1359 data_response_length = cmd.req_length
1360
1361 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1362 raise UnexpectedInput(
1363 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1364 )
1365
1366 if stream_state.tracing_session_state.has_new_metadata:
1367 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1368 return _LttngLiveViewerGetDataStreamPacketDataReply(
1369 status, bytes(), True, False
1370 )
1371
1372 if self._max_query_data_response_size:
1373 # Enforce a server side limit on the query requested length.
1374 # To ensure that the transaction terminate take the minimum of both
1375 # value.
1376 data_response_length = min(
1377 cmd.req_length, self._max_query_data_response_size
1378 )
1379 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1380 logging.info(fmt.format(cmd.req_length, data_response_length))
1381
1382 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1383 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1384 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1385
1386 def _handle_get_metadata_stream_data_command(self, cmd):
1387 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1388 logging.info(fmt.format(cmd.stream_id))
1389 metadata_stream_state = self._get_stream_state(cmd.stream_id)
1390
1391 if (
1392 type(metadata_stream_state)
1393 is not _LttngLiveViewerSessionMetadataStreamState
1394 ):
1395 raise UnexpectedInput(
1396 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
1397 )
1398
1399 if metadata_stream_state.is_sent:
1400 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1401 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1402
1403 metadata_stream_state.is_sent = True
1404 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
1405 metadata_section = metadata_stream_state.cur_section
1406
1407 # If we are sending an empty section, ready the next one right away.
1408 if len(metadata_section.data) == 0:
1409 metadata_stream_state.is_sent = False
1410 metadata_stream_state.goto_next_section()
1411
1412 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1413 logging.info(fmt.format(len(metadata_section.data)))
1414 return _LttngLiveViewerGetMetadataStreamDataContentReply(
1415 status, metadata_section.data
1416 )
1417
1418 def _handle_get_new_stream_infos_command(self, cmd):
1419 fmt = 'Handling "get new stream infos" command: ts-id={}'
1420 logging.info(fmt.format(cmd.tracing_session_id))
1421
1422 # As of this version, all the tracing session's stream infos are
1423 # always given to the viewer when sending the "attach to tracing
1424 # session" reply, so there's nothing new here. Return the `HUP`
1425 # status as, if we're handling this command, the viewer consumed
1426 # all the existing data streams.
1427 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1428 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1429
1430 def _handle_get_tracing_session_infos_command(self, cmd):
1431 logging.info('Handling "get tracing session infos" command.')
1432 infos = [
1433 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1434 ]
1435 infos.sort(key=lambda info: info.name)
1436 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1437
1438 def _handle_create_viewer_session_command(self, cmd):
1439 logging.info('Handling "create viewer session" command.')
1440 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1441
1442 # This does nothing here. In the LTTng relay daemon, it
1443 # allocates the viewer session's state.
1444 return _LttngLiveViewerCreateViewerSessionReply(status)
1445
1446
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
# the decimal TCP port number to a temporary port file. It renames the
# temporary port file to `port_filename`.
#
# `tracing_session_descriptors` is a list of tracing session descriptors
# (`LttngTracingSessionDescriptor`) to serve.
#
# `max_query_data_response_size` is handed as is to the viewer session
# (`None` means it's only not logged here).
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns.
class LttngLiveServer:
    def __init__(
        self, port_filename, tracing_session_descriptors, max_query_data_response_size
    ):
        logging.info('Server configuration:')

        logging.info(' Port file name: `{}`'.format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                ' Maximum response data query size: `{}`'.format(
                    max_query_data_response_size
                )
            )

        for ts_descr in tracing_session_descriptors:
            info = ts_descr.info
            fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info(' Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = tracing_session_descriptors
        self._max_query_data_response_size = max_query_data_response_size

        # Create the codec before the socket so that the socket is the
        # last acquired resource and is always covered by the `finally`
        # clause below.
        self._codec = _LttngLiveViewerProtocolCodec()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            # Port 0: OS assigns an unused port
            serv_addr = ('localhost', 0)
            self._sock.bind(serv_addr)
            self._write_port_to_file(port_filename)
            self._listen()
        finally:
            # Also reached when binding or writing the port file fails,
            # so that the socket never leaks.
            self._sock.close()
            logging.info('Closed connection and socket.')

    # TCP port which the listening socket is bound to.
    @property
    def _server_port(self):
        return self._sock.getsockname()[1]

    # Receives bytes from the viewer until the codec decodes a complete
    # command, and returns this command.
    #
    # Returns `None` if the viewer closes the connection before sending
    # any byte of a new command.
    #
    # Raises `UnexpectedInput` if the viewer closes the connection in
    # the middle of a command, or if decoding fails with
    # `struct.error`.
    def _recv_command(self):
        data = bytes()

        while True:
            logging.info('Waiting for viewer command.')
            buf = self._conn.recv(128)

            if not buf:
                logging.info('Client closed connection.')

                if data:
                    raise UnexpectedInput(
                        'Client closed connection after having sent {} command bytes.'.format(
                            len(data)
                        )
                    )

                return

            logging.info('Received data from viewer: length={}'.format(len(buf)))

            data += buf

            try:
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc

            # `None` here: keep receiving (presumably the command is
            # still incomplete)
            if cmd is not None:
                logging.info(
                    'Received command from viewer: cmd-cls-name={}'.format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    # Encodes `reply` with the codec and sends all the resulting bytes
    # to the viewer.
    def _send_reply(self, reply):
        data = self._codec.encode(reply)
        logging.info(
            'Sending reply to viewer: reply-cls-name={}, length={}'.format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    # Runs the whole conversation with the accepted viewer: expects a
    # "connect" command, creates a viewer session, and then makes this
    # session handle each following command until the viewer closes
    # the connection.
    #
    # Raises `UnexpectedInput` if the first command isn't "connect"
    # (this includes a connection closed before any command).
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise UnexpectedInput(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    # Listens, accepts a single viewer, handles its connection, and
    # closes this connection whatever the outcome.
    def _listen(self):
        logging.info('Listening: port={}'.format(self._server_port))
        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Writes the decimal server port to a temporary file, and then
    # renames this file to `port_filename`, so that `port_filename`
    # never exists with partial contents.
    def _write_port_to_file(self, port_filename):
        # Create the temporary file in the directory of the final file:
        # os.replace() may fail when the source and the destination are
        # on different file systems, which can be the case with the
        # default temporary directory.
        port_dir = os.path.dirname(os.path.abspath(port_filename))

        # Write the port number to a temporary file.
        with tempfile.NamedTemporaryFile(
            mode='w', delete=False, dir=port_dir
        ) as tmp_port_file:
            print(self._server_port, end='', file=tmp_port_file)

        # Rename temporary file to real file
        os.replace(tmp_port_file.name, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1619
1620
# Describes one tracing session to serve.
#
# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
# objects); the path of each one must contain `name`, otherwise the
# constructor raises `ValueError`.
class LttngTracingSessionDescriptor:
    def __init__(
        self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
    ):
        # Find the first trace of which the path doesn't contain the
        # tracing session name, if any
        offending_trace = next((t for t in traces if name not in t.path), None)

        if offending_trace is not None:
            fmt = 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
            raise ValueError(fmt.format(name, offending_trace.path))

        self._traces = traces

        # Each trace counts for its own length plus one (presumably its
        # data streams plus its metadata stream)
        stream_count = sum(1 + len(trace) for trace in traces)
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            stream_count,
            hostname,
            name,
        )

    # Traces given at construction time.
    @property
    def traces(self):
        return self._traces

    # Tracing session info built at construction time.
    @property
    def info(self):
        return self._info
1652
1653
1654def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
1655 # File format is:
1656 #
1657 # [
1658 # {
1659 # "name": "my-session",
1660 # "id": 17,
1661 # "hostname": "myhost",
1662 # "live-timer-freq": 1000000,
1663 # "client-count": 23,
1664 # "traces": [
1665 # {
1666 # "path": "lol"
1667 # },
1668 # {
1669 # "path": "meow/mix",
1670 # "beacons": {
1671 # "my_stream": [ 5235787, 728375283 ]
1672 # },
1673 # "metadata-sections": [
1674 # {
1675 # "line": 1,
1676 # "timestamp": 0
1677 # }
1678 # ]
1679 # }
1680 # ]
1681 # }
1682 # ]
1683 with open(sessions_filename, 'r') as sessions_file:
1684 params = json.load(sessions_file)
1685
1686 sessions = []
1687
1688 for session in params:
1689 name = session['name']
1690 tracing_session_id = session['id']
1691 hostname = session['hostname']
1692 live_timer_freq = session['live-timer-freq']
1693 client_count = session['client-count']
1694 traces = []
1695
1696 for trace in session['traces']:
1697 metadata_sections = trace.get('metadata-sections')
1698 beacons = trace.get('beacons')
1699 path = trace['path']
1700
1701 if not os.path.isabs(path):
1702 path = os.path.join(trace_path_prefix, path)
1703
1704 traces.append(LttngTrace(path, metadata_sections, beacons))
1705
1706 sessions.append(
1707 LttngTracingSessionDescriptor(
1708 name,
1709 tracing_session_id,
1710 hostname,
1711 live_timer_freq,
1712 client_count,
1713 traces,
1714 )
1715 )
1716
1717 return sessions
1718
1719
1720def _loglevel_parser(string):
1721 loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
1722 if string not in loglevels:
1723 msg = "{} is not a valid loglevel".format(string)
1724 raise argparse.ArgumentTypeError(msg)
1725 return loglevels[string]
1726
1727
# Script entry point: parses the command-line arguments, loads the
# tracing session descriptors, and runs the server until the viewer
# closes the connection.
#
# Exits with status 1 on `UnexpectedInput`.
if __name__ == '__main__':
    logging.basicConfig(format='# %(asctime)-25s%(message)s')

    # The help option is added last (see below) so that the first
    # parsing pass doesn't handle it before all the options exist.
    parser = argparse.ArgumentParser(
        description='LTTng-live protocol mocker', add_help=False
    )
    parser.add_argument(
        '--log-level',
        default='warning',
        choices=['info', 'warning'],
        help='The loglevel to be used.',
    )

    # First pass: only handle `--log-level` to configure logging as
    # soon as possible; keep everything else for the second pass.
    loglevel_namespace, remaining_args = parser.parse_known_args()
    logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))

    parser.add_argument(
        '--port-filename',
        help='The final port file. This file is present when the server is ready to receive connection.',
        required=True,
    )
    parser.add_argument(
        '--max-query-data-response-size',
        type=int,
        help='The maximum size of control data response in bytes',
    )
    parser.add_argument(
        '--trace-path-prefix',
        type=str,
        help='Prefix to prepend to the trace paths of session configurations',
    )

    # Required: the file name is opened unconditionally below, so a
    # missing option must be a usage error, not a `TypeError`.
    parser.add_argument(
        '--sessions-filename',
        type=str,
        help='Path to a session configuration file',
        required=True,
    )
    parser.add_argument(
        '-h',
        '--help',
        action='help',
        default=argparse.SUPPRESS,
        help='Show this help message and exit.',
    )

    # Second pass: all the options are known now
    args = parser.parse_args(args=remaining_args)

    try:
        sessions = _session_descriptors_from_path(
            args.sessions_filename,
            args.trace_path_prefix,
        )

        # The constructor only returns once the viewer is gone
        LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size)
    except UnexpectedInput as exc:
        logging.error(str(exc))
        print(exc, file=sys.stderr)
        sys.exit(1)
This page took 0.074024 seconds and 4 git commands to generate.