Tests: src.ctf.lttng-live: split metadata sections
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
584af91e 2#
0235b0db 3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
584af91e 4#
584af91e
PP
5
6import argparse
7import collections.abc
8import logging
9import os
10import os.path
11import re
12import socket
13import struct
14import sys
15import tempfile
2b763e29 16import json
78169723 17from collections import namedtuple
584af91e
PP
18
19
20class UnexpectedInput(RuntimeError):
21 pass
22
23
24class _LttngLiveViewerCommand:
25 def __init__(self, version):
26 self._version = version
27
28 @property
29 def version(self):
30 return self._version
31
32
33class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
34 def __init__(self, version, viewer_session_id, major, minor):
35 super().__init__(version)
36 self._viewer_session_id = viewer_session_id
37 self._major = major
38 self._minor = minor
39
40 @property
41 def viewer_session_id(self):
42 return self._viewer_session_id
43
44 @property
45 def major(self):
46 return self._major
47
48 @property
49 def minor(self):
50 return self._minor
51
52
53class _LttngLiveViewerConnectReply:
54 def __init__(self, viewer_session_id, major, minor):
55 self._viewer_session_id = viewer_session_id
56 self._major = major
57 self._minor = minor
58
59 @property
60 def viewer_session_id(self):
61 return self._viewer_session_id
62
63 @property
64 def major(self):
65 return self._major
66
67 @property
68 def minor(self):
69 return self._minor
70
71
72class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
73 pass
74
75
76class _LttngLiveViewerTracingSessionInfo:
77 def __init__(
78 self,
79 tracing_session_id,
80 live_timer_freq,
81 client_count,
82 stream_count,
83 hostname,
84 name,
85 ):
86 self._tracing_session_id = tracing_session_id
87 self._live_timer_freq = live_timer_freq
88 self._client_count = client_count
89 self._stream_count = stream_count
90 self._hostname = hostname
91 self._name = name
92
93 @property
94 def tracing_session_id(self):
95 return self._tracing_session_id
96
97 @property
98 def live_timer_freq(self):
99 return self._live_timer_freq
100
101 @property
102 def client_count(self):
103 return self._client_count
104
105 @property
106 def stream_count(self):
107 return self._stream_count
108
109 @property
110 def hostname(self):
111 return self._hostname
112
113 @property
114 def name(self):
115 return self._name
116
117
118class _LttngLiveViewerGetTracingSessionInfosReply:
119 def __init__(self, tracing_session_infos):
120 self._tracing_session_infos = tracing_session_infos
121
122 @property
123 def tracing_session_infos(self):
124 return self._tracing_session_infos
125
126
127class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
128 class SeekType:
129 BEGINNING = 1
130 LAST = 2
131
132 def __init__(self, version, tracing_session_id, offset, seek_type):
133 super().__init__(version)
134 self._tracing_session_id = tracing_session_id
135 self._offset = offset
136 self._seek_type = seek_type
137
138 @property
139 def tracing_session_id(self):
140 return self._tracing_session_id
141
142 @property
143 def offset(self):
144 return self._offset
145
146 @property
147 def seek_type(self):
148 return self._seek_type
149
150
151class _LttngLiveViewerStreamInfo:
152 def __init__(self, id, trace_id, is_metadata, path, channel_name):
153 self._id = id
154 self._trace_id = trace_id
155 self._is_metadata = is_metadata
156 self._path = path
157 self._channel_name = channel_name
158
159 @property
160 def id(self):
161 return self._id
162
163 @property
164 def trace_id(self):
165 return self._trace_id
166
167 @property
168 def is_metadata(self):
169 return self._is_metadata
170
171 @property
172 def path(self):
173 return self._path
174
175 @property
176 def channel_name(self):
177 return self._channel_name
178
179
180class _LttngLiveViewerAttachToTracingSessionReply:
181 class Status:
182 OK = 1
183 ALREADY = 2
184 UNKNOWN = 3
185 NOT_LIVE = 4
186 SEEK_ERROR = 5
187 NO_SESSION = 6
188
189 def __init__(self, status, stream_infos):
190 self._status = status
191 self._stream_infos = stream_infos
192
193 @property
194 def status(self):
195 return self._status
196
197 @property
198 def stream_infos(self):
199 return self._stream_infos
200
201
202class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
203 def __init__(self, version, stream_id):
204 super().__init__(version)
205 self._stream_id = stream_id
206
207 @property
208 def stream_id(self):
209 return self._stream_id
210
211
212class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
213 class Status:
214 OK = 1
215 RETRY = 2
216 HUP = 3
217 ERROR = 4
218 INACTIVE = 5
219 EOF = 6
220
221 def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
222 self._status = status
223 self._index_entry = index_entry
224 self._has_new_metadata = has_new_metadata
225 self._has_new_data_stream = has_new_data_stream
226
227 @property
228 def status(self):
229 return self._status
230
231 @property
232 def index_entry(self):
233 return self._index_entry
234
235 @property
236 def has_new_metadata(self):
237 return self._has_new_metadata
238
239 @property
240 def has_new_data_stream(self):
241 return self._has_new_data_stream
242
243
244class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
245 def __init__(self, version, stream_id, offset, req_length):
246 super().__init__(version)
247 self._stream_id = stream_id
248 self._offset = offset
249 self._req_length = req_length
250
251 @property
252 def stream_id(self):
253 return self._stream_id
254
255 @property
256 def offset(self):
257 return self._offset
258
259 @property
260 def req_length(self):
261 return self._req_length
262
263
264class _LttngLiveViewerGetDataStreamPacketDataReply:
265 class Status:
266 OK = 1
267 RETRY = 2
268 ERROR = 3
269 EOF = 4
270
271 def __init__(self, status, data, has_new_metadata, has_new_data_stream):
272 self._status = status
273 self._data = data
274 self._has_new_metadata = has_new_metadata
275 self._has_new_data_stream = has_new_data_stream
276
277 @property
278 def status(self):
279 return self._status
280
281 @property
282 def data(self):
283 return self._data
284
285 @property
286 def has_new_metadata(self):
287 return self._has_new_metadata
288
289 @property
290 def has_new_data_stream(self):
291 return self._has_new_data_stream
292
293
294class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
295 def __init__(self, version, stream_id):
296 super().__init__(version)
297 self._stream_id = stream_id
298
299 @property
300 def stream_id(self):
301 return self._stream_id
302
303
304class _LttngLiveViewerGetMetadataStreamDataContentReply:
305 class Status:
306 OK = 1
307 NO_NEW = 2
308 ERROR = 3
309
310 def __init__(self, status, data):
311 self._status = status
312 self._data = data
313
314 @property
315 def status(self):
316 return self._status
317
318 @property
319 def data(self):
320 return self._data
321
322
323class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
324 def __init__(self, version, tracing_session_id):
325 super().__init__(version)
326 self._tracing_session_id = tracing_session_id
327
328 @property
329 def tracing_session_id(self):
330 return self._tracing_session_id
331
332
333class _LttngLiveViewerGetNewStreamInfosReply:
334 class Status:
335 OK = 1
336 NO_NEW = 2
337 ERROR = 3
338 HUP = 4
339
340 def __init__(self, status, stream_infos):
341 self._status = status
342 self._stream_infos = stream_infos
343
344 @property
345 def status(self):
346 return self._status
347
348 @property
349 def stream_infos(self):
350 return self._stream_infos
351
352
353class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
354 pass
355
356
357class _LttngLiveViewerCreateViewerSessionReply:
358 class Status:
359 OK = 1
360 ERROR = 2
361
362 def __init__(self, status):
363 self._status = status
364
365 @property
366 def status(self):
367 return self._status
368
369
370class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
371 def __init__(self, version, tracing_session_id):
372 super().__init__(version)
373 self._tracing_session_id = tracing_session_id
374
375 @property
376 def tracing_session_id(self):
377 return self._tracing_session_id
378
379
380class _LttngLiveViewerDetachFromTracingSessionReply:
381 class Status:
382 OK = 1
383 UNKNOWN = 2
384 ERROR = 3
385
386 def __init__(self, status):
387 self._status = status
388
389 @property
390 def status(self):
391 return self._status
392
393
394# An LTTng live protocol codec can convert bytes to command objects and
395# reply objects to bytes.
396class _LttngLiveViewerProtocolCodec:
397 _COMMAND_HEADER_STRUCT_FMT = 'QII'
398 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
399
400 def __init__(self):
401 pass
402
403 def _unpack(self, fmt, data, offset=0):
404 fmt = '!' + fmt
405 return struct.unpack_from(fmt, data, offset)
406
407 def _unpack_payload(self, fmt, data):
408 return self._unpack(
409 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
410 )
411
412 def decode(self, data):
413 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
414 # Not enough data to read the command header
415 return
416
417 payload_size, cmd_type, version = self._unpack(
418 self._COMMAND_HEADER_STRUCT_FMT, data
419 )
420 logging.info(
421 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
422 payload_size, cmd_type, version
423 )
424 )
425
426 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
427 # Not enough data to read the whole command
428 return
429
430 if cmd_type == 1:
431 viewer_session_id, major, minor, conn_type = self._unpack_payload(
432 'QIII', data
433 )
434 return _LttngLiveViewerConnectCommand(
435 version, viewer_session_id, major, minor
436 )
437 elif cmd_type == 2:
438 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
439 elif cmd_type == 3:
440 tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
441 return _LttngLiveViewerAttachToTracingSessionCommand(
442 version, tracing_session_id, offset, seek_type
443 )
444 elif cmd_type == 4:
75882e97 445 (stream_id,) = self._unpack_payload('Q', data)
584af91e
PP
446 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
447 version, stream_id
448 )
449 elif cmd_type == 5:
450 stream_id, offset, req_length = self._unpack_payload('QQI', data)
451 return _LttngLiveViewerGetDataStreamPacketDataCommand(
452 version, stream_id, offset, req_length
453 )
454 elif cmd_type == 6:
75882e97 455 (stream_id,) = self._unpack_payload('Q', data)
584af91e
PP
456 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
457 elif cmd_type == 7:
75882e97 458 (tracing_session_id,) = self._unpack_payload('Q', data)
584af91e
PP
459 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
460 elif cmd_type == 8:
461 return _LttngLiveViewerCreateViewerSessionCommand(version)
462 elif cmd_type == 9:
75882e97 463 (tracing_session_id,) = self._unpack_payload('Q', data)
584af91e
PP
464 return _LttngLiveViewerDetachFromTracingSessionCommand(
465 version, tracing_session_id
466 )
467 else:
468 raise UnexpectedInput('Unknown command type {}'.format(cmd_type))
469
470 def _pack(self, fmt, *args):
471 # Force network byte order
472 return struct.pack('!' + fmt, *args)
473
474 def _encode_zero_padded_str(self, string, length):
475 data = string.encode()
476 return data.ljust(length, b'\x00')
477
478 def _encode_stream_info(self, info):
479 data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
480 data += self._encode_zero_padded_str(info.path, 4096)
481 data += self._encode_zero_padded_str(info.channel_name, 255)
482 return data
483
484 def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
485 flags = 0
486
487 if has_new_metadata:
488 flags |= 1
489
490 if has_new_data_streams:
491 flags |= 2
492
493 return flags
494
495 def encode(self, reply):
496 if type(reply) is _LttngLiveViewerConnectReply:
497 data = self._pack(
498 'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
499 )
500 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
501 data = self._pack('I', len(reply.tracing_session_infos))
502
503 for info in reply.tracing_session_infos:
504 data += self._pack(
505 'QIII',
506 info.tracing_session_id,
507 info.live_timer_freq,
508 info.client_count,
509 info.stream_count,
510 )
511 data += self._encode_zero_padded_str(info.hostname, 64)
512 data += self._encode_zero_padded_str(info.name, 255)
513 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
514 data = self._pack('II', reply.status, len(reply.stream_infos))
515
516 for info in reply.stream_infos:
517 data += self._encode_stream_info(info)
518 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
71f56e5f 519 index_format = 'QQQQQQQII'
584af91e
PP
520 entry = reply.index_entry
521 flags = self._get_has_new_stuff_flags(
522 reply.has_new_metadata, reply.has_new_data_stream
523 )
524
71f56e5f
JG
525 if type(entry) is _LttngDataStreamIndexEntry:
526 data = self._pack(
527 index_format,
528 entry.offset_bytes,
529 entry.total_size_bits,
530 entry.content_size_bits,
531 entry.timestamp_begin,
532 entry.timestamp_end,
533 entry.events_discarded,
534 entry.stream_class_id,
535 reply.status,
536 flags,
537 )
538 else:
539 assert type(entry) is _LttngDataStreamBeaconEntry
540 data = self._pack(
541 index_format,
542 0,
543 0,
544 0,
545 0,
546 entry.timestamp,
547 0,
548 entry.stream_class_id,
549 reply.status,
550 flags,
551 )
584af91e
PP
552 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
553 flags = self._get_has_new_stuff_flags(
554 reply.has_new_metadata, reply.has_new_data_stream
555 )
556 data = self._pack('III', reply.status, len(reply.data), flags)
557 data += reply.data
558 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
559 data = self._pack('QI', len(reply.data), reply.status)
560 data += reply.data
561 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
562 data = self._pack('II', reply.status, len(reply.stream_infos))
563
564 for info in reply.stream_infos:
565 data += self._encode_stream_info(info)
566 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
567 data = self._pack('I', reply.status)
568 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
569 data = self._pack('I', reply.status)
570 else:
571 raise ValueError(
572 'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
573 )
574
575 return data
576
577
578# An entry within the index of an LTTng data stream.
579class _LttngDataStreamIndexEntry:
580 def __init__(
581 self,
582 offset_bytes,
583 total_size_bits,
584 content_size_bits,
585 timestamp_begin,
586 timestamp_end,
587 events_discarded,
588 stream_class_id,
589 ):
590 self._offset_bytes = offset_bytes
591 self._total_size_bits = total_size_bits
592 self._content_size_bits = content_size_bits
593 self._timestamp_begin = timestamp_begin
594 self._timestamp_end = timestamp_end
595 self._events_discarded = events_discarded
596 self._stream_class_id = stream_class_id
597
598 @property
599 def offset_bytes(self):
600 return self._offset_bytes
601
602 @property
603 def total_size_bits(self):
604 return self._total_size_bits
605
606 @property
607 def total_size_bytes(self):
608 return self._total_size_bits // 8
609
610 @property
611 def content_size_bits(self):
612 return self._content_size_bits
613
614 @property
615 def content_size_bytes(self):
616 return self._content_size_bits // 8
617
618 @property
619 def timestamp_begin(self):
620 return self._timestamp_begin
621
622 @property
623 def timestamp_end(self):
624 return self._timestamp_end
625
626 @property
627 def events_discarded(self):
628 return self._events_discarded
629
630 @property
631 def stream_class_id(self):
632 return self._stream_class_id
633
634
71f56e5f
JG
635# An entry within the index of an LTTng data stream. While a stream beacon entry
636# is conceptually unrelated to an index, it is sent as a reply to a
637# LttngLiveViewerGetNextDataStreamIndexEntryCommand
638class _LttngDataStreamBeaconEntry:
639 def __init__(self, stream_class_id, timestamp):
640 self._stream_class_id = stream_class_id
641 self._timestamp = timestamp
642
643 @property
644 def timestamp(self):
645 return self._timestamp
646
647 @property
648 def stream_class_id(self):
649 return self._stream_class_id
650
651
78169723
FD
652def _get_entry_timestamp_begin(entry):
653 if type(entry) is _LttngDataStreamBeaconEntry:
654 return entry.timestamp
655 else:
656 assert type(entry) is _LttngDataStreamIndexEntry
657 return entry.timestamp_begin
658
659
584af91e
PP
660# The index of an LTTng data stream, a sequence of index entries.
661class _LttngDataStreamIndex(collections.abc.Sequence):
71f56e5f 662 def __init__(self, path, beacons):
584af91e
PP
663 self._path = path
664 self._build()
71f56e5f
JG
665
666 if beacons:
667 stream_class_id = self._entries[0].stream_class_id
668 beacons = [
669 _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
670 ]
671 self._add_beacons(beacons)
672
584af91e
PP
673 logging.info(
674 'Built data stream index entries: path="{}", count={}'.format(
675 path, len(self._entries)
676 )
677 )
678
679 def _build(self):
680 self._entries = []
681 assert os.path.isfile(self._path)
682
683 with open(self._path, 'rb') as f:
684 # Read header first
685 fmt = '>IIII'
686 size = struct.calcsize(fmt)
687 data = f.read(size)
688 assert len(data) == size
689 magic, index_major, index_minor, index_entry_length = struct.unpack(
690 fmt, data
691 )
692 assert magic == 0xC1F1DCC1
693
694 # Read index entries
695 fmt = '>QQQQQQQ'
696 size = struct.calcsize(fmt)
697
698 while True:
699 logging.debug(
700 'Decoding data stream index entry: path="{}", offset={}'.format(
701 self._path, f.tell()
702 )
703 )
704 data = f.read(size)
705
706 if not data:
707 # Done
708 break
709
710 assert len(data) == size
75882e97
FD
711 (
712 offset_bytes,
713 total_size_bits,
714 content_size_bits,
715 timestamp_begin,
716 timestamp_end,
717 events_discarded,
718 stream_class_id,
719 ) = struct.unpack(fmt, data)
584af91e
PP
720
721 self._entries.append(
722 _LttngDataStreamIndexEntry(
723 offset_bytes,
724 total_size_bits,
725 content_size_bits,
726 timestamp_begin,
727 timestamp_end,
728 events_discarded,
729 stream_class_id,
730 )
731 )
732
733 # Skip anything else before the next entry
734 f.seek(index_entry_length - size, os.SEEK_CUR)
735
71f56e5f
JG
736 def _add_beacons(self, beacons):
737 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
738 def sort_key(entry):
739 if type(entry) is _LttngDataStreamBeaconEntry:
740 return entry.timestamp
741 else:
742 return entry.timestamp_end
743
744 self._entries += beacons
745 self._entries.sort(key=sort_key)
746
584af91e
PP
747 def __getitem__(self, index):
748 return self._entries[index]
749
750 def __len__(self):
751 return len(self._entries)
752
753 @property
754 def path(self):
755 return self._path
756
757
758# An LTTng data stream.
759class _LttngDataStream:
71f56e5f 760 def __init__(self, path, beacons):
584af91e
PP
761 self._path = path
762 filename = os.path.basename(path)
763 match = re.match(r'(.*)_\d+', filename)
764 self._channel_name = match.group(1)
765 trace_dir = os.path.dirname(path)
766 index_path = os.path.join(trace_dir, 'index', filename + '.idx')
71f56e5f 767 self._index = _LttngDataStreamIndex(index_path, beacons)
584af91e
PP
768 assert os.path.isfile(path)
769 self._file = open(path, 'rb')
770 logging.info(
771 'Built data stream: path="{}", channel-name="{}"'.format(
772 path, self._channel_name
773 )
774 )
775
776 @property
777 def path(self):
778 return self._path
779
780 @property
781 def channel_name(self):
782 return self._channel_name
783
784 @property
785 def index(self):
786 return self._index
787
788 def get_data(self, offset_bytes, len_bytes):
789 self._file.seek(offset_bytes)
790 return self._file.read(len_bytes)
791
792
78169723
FD
793class _LttngMetadataStreamSection:
794 def __init__(self, timestamp, data):
795 self._timestamp = timestamp
796 if data is None:
797 self._data = bytes()
798 else:
799 self._data = data
800 logging.info(
801 'Built metadata stream section: ts={}, data-len={}'.format(
802 self._timestamp, len(self._data)
803 )
804 )
805
806 @property
807 def timestamp(self):
808 return self._timestamp
809
810 @property
811 def data(self):
812 return self._data
813
814
584af91e
PP
815# An LTTng metadata stream.
816class _LttngMetadataStream:
78169723
FD
817 def __init__(self, metadata_file_path, config_sections):
818 self._path = metadata_file_path
819 self._sections = config_sections
820 logging.info(
821 'Built metadata stream: path={}, section-len={}'.format(
822 self._path, len(self._sections)
823 )
824 )
584af91e
PP
825
826 @property
827 def path(self):
828 return self._path
829
830 @property
78169723
FD
831 def sections(self):
832 return self._sections
584af91e 833
78169723
FD
834
835LttngMetadataConfigSection = namedtuple(
836 'LttngMetadataConfigSection', ['line', 'timestamp', 'is_empty']
837)
838
839
840def _parse_metadata_sections_config(config_sections):
841 assert config_sections is not None
842 config_metadata_sections = []
843 append_empty_section = False
844 last_timestamp = 0
845 last_line = 0
846
847 for config_section in config_sections:
848 if config_section == 'empty':
849 # Found a empty section marker. Actually append the section at the
850 # timestamp of the next concrete section.
851 append_empty_section = True
852 else:
853 assert type(config_section) is dict
854 line = config_section.get('line')
855 ts = config_section.get('timestamp')
856
857 # Sections' timestamps and lines must both be increasing.
858 assert ts > last_timestamp
859 last_timestamp = ts
860 assert line > last_line
861 last_line = line
862
863 if append_empty_section:
864 config_metadata_sections.append(
865 LttngMetadataConfigSection(line, ts, True)
866 )
867 append_empty_section = False
868
869 config_metadata_sections.append(LttngMetadataConfigSection(line, ts, False))
870
871 return config_metadata_sections
872
873
874def _split_metadata_sections(metadata_file_path, raw_config_sections):
875 assert isinstance(raw_config_sections, collections.abc.Sequence)
876
877 parsed_sections = _parse_metadata_sections_config(raw_config_sections)
878
879 sections = []
880 with open(metadata_file_path, 'r') as metadata_file:
881 metadata_lines = [line for line in metadata_file]
882
883 config_metadata_sections_idx = 0
884 curr_metadata_section = bytearray()
885
886 for idx, line_content in enumerate(metadata_lines):
887 # Add one to the index to convert from the zero-indexing of the
888 # enumerate() function to the one-indexing used by humans when
889 # viewing a text file.
890 curr_line_number = idx + 1
891
892 # If there are no more sections, simply append the line.
893 if config_metadata_sections_idx + 1 >= len(parsed_sections):
894 curr_metadata_section += bytearray(line_content, 'utf8')
895 continue
896
897 next_section_line_number = parsed_sections[
898 config_metadata_sections_idx + 1
899 ].line
900
901 # If the next section begins at the current line, create a
902 # section with the metadata we gathered so far.
903 if curr_line_number >= next_section_line_number:
904
905 # Flushing the metadata of the current section.
906 sections.append(
907 _LttngMetadataStreamSection(
908 parsed_sections[config_metadata_sections_idx].timestamp,
909 bytes(curr_metadata_section),
910 )
911 )
912
913 # Move to the next section.
914 config_metadata_sections_idx += 1
915
916 # Clear old content and append current line for the next section.
917 curr_metadata_section.clear()
918 curr_metadata_section += bytearray(line_content, 'utf8')
919
920 # Append any empty sections.
921 while parsed_sections[config_metadata_sections_idx].is_empty:
922 sections.append(
923 _LttngMetadataStreamSection(
924 parsed_sections[config_metadata_sections_idx].timestamp, None
925 )
926 )
927 config_metadata_sections_idx += 1
928 else:
929 # Append line_content to the current metadata section.
930 curr_metadata_section += bytearray(line_content, 'utf8')
931
932 # We iterated over all the lines of the metadata file. Close the current section.
933 sections.append(
934 _LttngMetadataStreamSection(
935 parsed_sections[config_metadata_sections_idx].timestamp,
936 bytes(curr_metadata_section),
937 )
938 )
939
940 return sections
584af91e
PP
941
942
943# An LTTng trace, a sequence of LTTng data streams.
944class LttngTrace(collections.abc.Sequence):
78169723 945 def __init__(self, trace_dir, metadata_sections, beacons):
584af91e
PP
946 assert os.path.isdir(trace_dir)
947 self._path = trace_dir
78169723 948 self._create_metadata_stream(trace_dir, metadata_sections)
71f56e5f 949 self._create_data_streams(trace_dir, beacons)
584af91e
PP
950 logging.info('Built trace: path="{}"'.format(trace_dir))
951
71f56e5f 952 def _create_data_streams(self, trace_dir, beacons):
584af91e
PP
953 data_stream_paths = []
954
955 for filename in os.listdir(trace_dir):
956 path = os.path.join(trace_dir, filename)
957
958 if not os.path.isfile(path):
959 continue
960
961 if filename.startswith('.'):
962 continue
963
964 if filename == 'metadata':
965 continue
966
967 data_stream_paths.append(path)
968
969 data_stream_paths.sort()
970 self._data_streams = []
971
972 for data_stream_path in data_stream_paths:
71f56e5f
JG
973 stream_name = os.path.basename(data_stream_path)
974 this_stream_beacons = None
975
976 if beacons is not None and stream_name in beacons:
977 this_stream_beacons = beacons[stream_name]
978
979 self._data_streams.append(
980 _LttngDataStream(data_stream_path, this_stream_beacons)
981 )
584af91e 982
78169723
FD
983 def _create_metadata_stream(self, trace_dir, config_metadata_sections):
984 metadata_path = os.path.join(trace_dir, 'metadata')
985 metadata_sections = []
986
987 if config_metadata_sections is None:
988 with open(metadata_path, 'rb') as metadata_file:
989 metadata_sections.append(
990 _LttngMetadataStreamSection(0, metadata_file.read())
991 )
992 else:
993 metadata_sections = _split_metadata_sections(
994 metadata_path, config_metadata_sections
995 )
996
997 self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
998
584af91e
PP
999 @property
1000 def path(self):
1001 return self._path
1002
1003 @property
1004 def metadata_stream(self):
1005 return self._metadata_stream
1006
1007 def __getitem__(self, index):
1008 return self._data_streams[index]
1009
1010 def __len__(self):
1011 return len(self._data_streams)
1012
1013
1014# The state of a single data stream.
1015class _LttngLiveViewerSessionDataStreamState:
78169723 1016 def __init__(self, ts_state, info, data_stream, metadata_stream_id):
584af91e
PP
1017 self._ts_state = ts_state
1018 self._info = info
1019 self._data_stream = data_stream
78169723 1020 self._metadata_stream_id = metadata_stream_id
584af91e
PP
1021 self._cur_index_entry_index = 0
1022 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1023 logging.info(
1024 fmt.format(
1025 info.id,
1026 ts_state.tracing_session_descriptor.info.tracing_session_id,
1027 ts_state.tracing_session_descriptor.info.name,
1028 data_stream.path,
1029 )
1030 )
1031
1032 @property
1033 def tracing_session_state(self):
1034 return self._ts_state
1035
1036 @property
1037 def info(self):
1038 return self._info
1039
1040 @property
1041 def data_stream(self):
1042 return self._data_stream
1043
1044 @property
1045 def cur_index_entry(self):
1046 if self._cur_index_entry_index == len(self._data_stream.index):
1047 return
1048
1049 return self._data_stream.index[self._cur_index_entry_index]
1050
1051 def goto_next_index_entry(self):
1052 self._cur_index_entry_index += 1
1053
1054
1055# The state of a single metadata stream.
1056class _LttngLiveViewerSessionMetadataStreamState:
1057 def __init__(self, ts_state, info, metadata_stream):
1058 self._ts_state = ts_state
1059 self._info = info
1060 self._metadata_stream = metadata_stream
78169723
FD
1061 self._cur_metadata_stream_section_index = 0
1062 if len(metadata_stream.sections) > 1:
1063 self._next_metadata_stream_section_timestamp = metadata_stream.sections[
1064 1
1065 ].timestamp
1066 else:
1067 self._next_metadata_stream_section_timestamp = None
1068
584af91e
PP
1069 self._is_sent = False
1070 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
1071 logging.info(
1072 fmt.format(
1073 info.id,
1074 ts_state.tracing_session_descriptor.info.tracing_session_id,
1075 ts_state.tracing_session_descriptor.info.name,
1076 metadata_stream.path,
1077 )
1078 )
1079
1080 @property
1081 def trace_session_state(self):
1082 return self._trace_session_state
1083
1084 @property
1085 def info(self):
1086 return self._info
1087
1088 @property
1089 def metadata_stream(self):
1090 return self._metadata_stream
1091
1092 @property
1093 def is_sent(self):
1094 return self._is_sent
1095
1096 @is_sent.setter
1097 def is_sent(self, value):
1098 self._is_sent = value
1099
78169723
FD
1100 @property
1101 def cur_section(self):
1102 fmt = "Get current metadata section: section-idx={}"
1103 logging.info(fmt.format(self._cur_metadata_stream_section_index))
1104 if self._cur_metadata_stream_section_index == len(
1105 self._metadata_stream.sections
1106 ):
1107 return
1108
1109 return self._metadata_stream.sections[self._cur_metadata_stream_section_index]
1110
1111 def goto_next_section(self):
1112 self._cur_metadata_stream_section_index += 1
1113 if self.cur_section:
1114 self._next_metadata_stream_section_timestamp = self.cur_section.timestamp
1115 else:
1116 self._next_metadata_stream_section_timestamp = None
1117
1118 @property
1119 def next_section_timestamp(self):
1120 return self._next_metadata_stream_section_timestamp
1121
584af91e
PP
1122
1123# The state of a tracing session.
1124class _LttngLiveViewerSessionTracingSessionState:
1125 def __init__(self, tc_descr, base_stream_id):
1126 self._tc_descr = tc_descr
1127 self._stream_infos = []
1128 self._ds_states = {}
1129 self._ms_states = {}
1130 stream_id = base_stream_id
1131
1132 for trace in tc_descr.traces:
1133 trace_id = stream_id * 1000
1134
78169723
FD
1135 # Metadata stream -> stream info and metadata stream state
1136 info = _LttngLiveViewerStreamInfo(
1137 stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
1138 )
1139 self._stream_infos.append(info)
1140 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
1141 self, info, trace.metadata_stream
1142 )
1143 metadata_stream_id = stream_id
1144 stream_id += 1
1145
584af91e
PP
1146 # Data streams -> stream infos and data stream states
1147 for data_stream in trace:
1148 info = _LttngLiveViewerStreamInfo(
1149 stream_id,
1150 trace_id,
1151 False,
1152 data_stream.path,
1153 data_stream.channel_name,
1154 )
1155 self._stream_infos.append(info)
1156 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
78169723 1157 self, info, data_stream, metadata_stream_id
584af91e
PP
1158 )
1159 stream_id += 1
1160
584af91e
PP
1161 self._is_attached = False
1162 fmt = 'Built tracing session state: id={}, name="{}"'
1163 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
1164
1165 @property
1166 def tracing_session_descriptor(self):
1167 return self._tc_descr
1168
1169 @property
1170 def data_stream_states(self):
1171 return self._ds_states
1172
1173 @property
1174 def metadata_stream_states(self):
1175 return self._ms_states
1176
1177 @property
1178 def stream_infos(self):
1179 return self._stream_infos
1180
1181 @property
1182 def has_new_metadata(self):
1183 return any([not ms.is_sent for ms in self._ms_states.values()])
1184
1185 @property
1186 def is_attached(self):
1187 return self._is_attached
1188
1189 @is_attached.setter
1190 def is_attached(self, value):
1191 self._is_attached = value
1192
1193
78169723
FD
1194def needs_new_metadata_section(metadata_stream_state, latest_timestamp):
1195 if metadata_stream_state.next_section_timestamp is None:
1196 return False
1197
1198 if latest_timestamp >= metadata_stream_state.next_section_timestamp:
1199 return True
1200 else:
1201 return False
1202
1203
584af91e
PP
1204# An LTTng live viewer session manages a view on tracing sessions
1205# and replies to commands accordingly.
1206class _LttngLiveViewerSession:
1207 def __init__(
1208 self,
1209 viewer_session_id,
1210 tracing_session_descriptors,
1211 max_query_data_response_size,
1212 ):
1213 self._viewer_session_id = viewer_session_id
1214 self._ts_states = {}
1215 self._stream_states = {}
1216 self._max_query_data_response_size = max_query_data_response_size
1217 total_stream_infos = 0
1218
1219 for ts_descr in tracing_session_descriptors:
1220 ts_state = _LttngLiveViewerSessionTracingSessionState(
1221 ts_descr, total_stream_infos
1222 )
1223 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
1224 self._ts_states[ts_id] = ts_state
1225 total_stream_infos += len(ts_state.stream_infos)
1226
1227 # Update session's stream states to have the new states
1228 self._stream_states.update(ts_state.data_stream_states)
1229 self._stream_states.update(ts_state.metadata_stream_states)
1230
1231 self._command_handlers = {
1232 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
1233 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
1234 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
1235 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
1236 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
1237 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
1238 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
1239 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
1240 }
1241
1242 @property
1243 def viewer_session_id(self):
1244 return self._viewer_session_id
1245
1246 def _get_tracing_session_state(self, tracing_session_id):
1247 if tracing_session_id not in self._ts_states:
1248 raise UnexpectedInput(
1249 'Unknown tracing session ID {}'.format(tracing_session_id)
1250 )
1251
1252 return self._ts_states[tracing_session_id]
1253
1254 def _get_stream_state(self, stream_id):
1255 if stream_id not in self._stream_states:
1256 UnexpectedInput('Unknown stream ID {}'.format(stream_id))
1257
1258 return self._stream_states[stream_id]
1259
1260 def handle_command(self, cmd):
1261 logging.info(
1262 'Handling command in viewer session: cmd-cls-name={}'.format(
1263 cmd.__class__.__name__
1264 )
1265 )
1266 cmd_type = type(cmd)
1267
1268 if cmd_type not in self._command_handlers:
1269 raise UnexpectedInput(
1270 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
1271 )
1272
1273 return self._command_handlers[cmd_type](cmd)
1274
1275 def _handle_attach_to_tracing_session_command(self, cmd):
1276 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1277 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1278 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1279 info = ts_state.tracing_session_descriptor.info
1280
1281 if ts_state.is_attached:
1282 raise UnexpectedInput(
1283 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1284 info.name
1285 )
1286 )
1287
1288 ts_state.is_attached = True
1289 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1290 return _LttngLiveViewerAttachToTracingSessionReply(
1291 status, ts_state.stream_infos
1292 )
1293
1294 def _handle_detach_from_tracing_session_command(self, cmd):
1295 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1296 logging.info(fmt.format(cmd.tracing_session_id))
1297 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1298 info = ts_state.tracing_session_descriptor.info
1299
1300 if not ts_state.is_attached:
1301 raise UnexpectedInput(
1302 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1303 info.name
1304 )
1305 )
1306
1307 ts_state.is_attached = False
1308 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1309 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1310
1311 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1312 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1313 logging.info(fmt.format(cmd.stream_id))
1314 stream_state = self._get_stream_state(cmd.stream_id)
78169723 1315 metadata_stream_state = self._get_stream_state(stream_state._metadata_stream_id)
584af91e
PP
1316
1317 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1318 raise UnexpectedInput(
1319 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1320 )
1321
1322 if stream_state.cur_index_entry is None:
1323 # The viewer is done reading this stream
1324 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1325
1326 # Dummy data stream index entry to use with the `HUP` status
1327 # (the reply needs one, but the viewer ignores it)
1328 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1329
1330 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1331 status, index_entry, False, False
1332 )
1333
78169723
FD
1334 timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
1335
1336 if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
1337 metadata_stream_state.is_sent = False
1338 metadata_stream_state.goto_next_section()
1339
584af91e
PP
1340 # The viewer only checks the `has_new_metadata` flag if the
1341 # reply's status is `OK`, so we need to provide an index here
1342 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
71f56e5f
JG
1343 if type(stream_state.cur_index_entry) is _LttngDataStreamIndexEntry:
1344 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1345 else:
1346 assert type(stream_state.cur_index_entry) is _LttngDataStreamBeaconEntry
1347 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
1348
584af91e
PP
1349 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1350 status, stream_state.cur_index_entry, has_new_metadata, False
1351 )
1352 stream_state.goto_next_index_entry()
1353 return reply
1354
1355 def _handle_get_data_stream_packet_data_command(self, cmd):
1356 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1357 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1358 stream_state = self._get_stream_state(cmd.stream_id)
1359 data_response_length = cmd.req_length
1360
1361 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1362 raise UnexpectedInput(
1363 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1364 )
1365
1366 if stream_state.tracing_session_state.has_new_metadata:
1367 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1368 return _LttngLiveViewerGetDataStreamPacketDataReply(
1369 status, bytes(), True, False
1370 )
1371
1372 if self._max_query_data_response_size:
1373 # Enforce a server side limit on the query requested length.
1374 # To ensure that the transaction terminate take the minimum of both
1375 # value.
1376 data_response_length = min(
1377 cmd.req_length, self._max_query_data_response_size
1378 )
1379 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1380 logging.info(fmt.format(cmd.req_length, data_response_length))
1381
1382 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1383 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1384 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1385
1386 def _handle_get_metadata_stream_data_command(self, cmd):
1387 fmt = 'Handling "get metadata stream data" command: stream-id={}'
1388 logging.info(fmt.format(cmd.stream_id))
78169723 1389 metadata_stream_state = self._get_stream_state(cmd.stream_id)
584af91e 1390
78169723
FD
1391 if (
1392 type(metadata_stream_state)
1393 is not _LttngLiveViewerSessionMetadataStreamState
1394 ):
584af91e
PP
1395 raise UnexpectedInput(
1396 'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
1397 )
1398
78169723 1399 if metadata_stream_state.is_sent:
584af91e
PP
1400 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
1401 return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
1402
78169723 1403 metadata_stream_state.is_sent = True
584af91e 1404 status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
78169723
FD
1405 metadata_section = metadata_stream_state.cur_section
1406
1407 # If we are sending an empty section, ready the next one right away.
1408 if len(metadata_section.data) == 0:
1409 metadata_stream_state.is_sent = False
1410 metadata_stream_state.goto_next_section()
1411
1412 fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
1413 logging.info(fmt.format(len(metadata_section.data)))
584af91e 1414 return _LttngLiveViewerGetMetadataStreamDataContentReply(
78169723 1415 status, metadata_section.data
584af91e
PP
1416 )
1417
1418 def _handle_get_new_stream_infos_command(self, cmd):
1419 fmt = 'Handling "get new stream infos" command: ts-id={}'
1420 logging.info(fmt.format(cmd.tracing_session_id))
1421
1422 # As of this version, all the tracing session's stream infos are
1423 # always given to the viewer when sending the "attach to tracing
1424 # session" reply, so there's nothing new here. Return the `HUP`
1425 # status as, if we're handling this command, the viewer consumed
1426 # all the existing data streams.
1427 status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
1428 return _LttngLiveViewerGetNewStreamInfosReply(status, [])
1429
1430 def _handle_get_tracing_session_infos_command(self, cmd):
1431 logging.info('Handling "get tracing session infos" command.')
1432 infos = [
1433 tss.tracing_session_descriptor.info for tss in self._ts_states.values()
1434 ]
1435 infos.sort(key=lambda info: info.name)
1436 return _LttngLiveViewerGetTracingSessionInfosReply(infos)
1437
1438 def _handle_create_viewer_session_command(self, cmd):
1439 logging.info('Handling "create viewer session" command.')
1440 status = _LttngLiveViewerCreateViewerSessionReply.Status.OK
1441
1442 # This does nothing here. In the LTTng relay daemon, it
1443 # allocates the viewer session's state.
1444 return _LttngLiveViewerCreateViewerSessionReply(status)
1445
1446
1447# An LTTng live TCP server.
1448#
1449# On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1450# the decimal TCP port number to a temporary port file. It renames the
1451# temporary port file to `port_filename`.
1452#
1453# `tracing_session_descriptors` is a list of tracing session descriptors
1454# (`LttngTracingSessionDescriptor`) to serve.
1455#
1456# This server accepts a single viewer (client).
1457#
1458# When the viewer closes the connection, the server's constructor
1459# returns.
1460class LttngLiveServer:
1461 def __init__(
1462 self, port_filename, tracing_session_descriptors, max_query_data_response_size
1463 ):
1464 logging.info('Server configuration:')
1465
1466 logging.info(' Port file name: `{}`'.format(port_filename))
1467
1468 if max_query_data_response_size is not None:
1469 logging.info(
1470 ' Maximum response data query size: `{}`'.format(
1471 max_query_data_response_size
1472 )
1473 )
1474
1475 for ts_descr in tracing_session_descriptors:
1476 info = ts_descr.info
1477 fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
1478 logging.info(
1479 fmt.format(
1480 info.name,
1481 info.tracing_session_id,
1482 info.hostname,
1483 info.live_timer_freq,
1484 info.client_count,
1485 info.stream_count,
1486 )
1487 )
1488
1489 for trace in ts_descr.traces:
1490 logging.info(' Trace: path="{}"'.format(trace.path))
1491
1492 self._ts_descriptors = tracing_session_descriptors
1493 self._max_query_data_response_size = max_query_data_response_size
1494 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1495 self._codec = _LttngLiveViewerProtocolCodec()
1496
1497 # Port 0: OS assigns an unused port
1498 serv_addr = ('localhost', 0)
1499 self._sock.bind(serv_addr)
1500 self._write_port_to_file(port_filename)
1501
1502 try:
1503 self._listen()
1504 finally:
1505 self._sock.close()
1506 logging.info('Closed connection and socket.')
1507
1508 @property
1509 def _server_port(self):
1510 return self._sock.getsockname()[1]
1511
1512 def _recv_command(self):
1513 data = bytes()
1514
1515 while True:
1516 logging.info('Waiting for viewer command.')
1517 buf = self._conn.recv(128)
1518
1519 if not buf:
1520 logging.info('Client closed connection.')
1521
1522 if data:
1523 raise UnexpectedInput(
1524 'Client closed connection after having sent {} command bytes.'.format(
1525 len(data)
1526 )
1527 )
1528
1529 return
1530
1531 logging.info('Received data from viewer: length={}'.format(len(buf)))
1532
1533 data += buf
1534
1535 try:
1536 cmd = self._codec.decode(data)
1537 except struct.error as exc:
1538 raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc
1539
1540 if cmd is not None:
1541 logging.info(
1542 'Received command from viewer: cmd-cls-name={}'.format(
1543 cmd.__class__.__name__
1544 )
1545 )
1546 return cmd
1547
1548 def _send_reply(self, reply):
1549 data = self._codec.encode(reply)
1550 logging.info(
1551 'Sending reply to viewer: reply-cls-name={}, length={}'.format(
1552 reply.__class__.__name__, len(data)
1553 )
1554 )
1555 self._conn.sendall(data)
1556
1557 def _handle_connection(self):
1558 # First command must be "connect"
1559 cmd = self._recv_command()
1560
1561 if type(cmd) is not _LttngLiveViewerConnectCommand:
1562 raise UnexpectedInput(
1563 'First command is not "connect": cmd-cls-name={}'.format(
1564 cmd.__class__.__name__
1565 )
1566 )
1567
1568 # Create viewer session (arbitrary ID 23)
1569 logging.info(
1570 'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
1571 )
1572 viewer_session = _LttngLiveViewerSession(
1573 23, self._ts_descriptors, self._max_query_data_response_size
1574 )
1575
1576 # Send "connect" reply
1577 self._send_reply(
1578 _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
1579 )
1580
1581 # Make the viewer session handle the remaining commands
1582 while True:
1583 cmd = self._recv_command()
1584
1585 if cmd is None:
1586 # Connection closed (at an expected location within the
1587 # conversation)
1588 return
1589
1590 self._send_reply(viewer_session.handle_command(cmd))
1591
1592 def _listen(self):
1593 logging.info('Listening: port={}'.format(self._server_port))
1726ac08
JR
1594 # Backlog must be present for Python version < 3.5.
1595 # 128 is an arbitrary number since we expect only 1 connection anyway.
1596 self._sock.listen(128)
584af91e
PP
1597 self._conn, viewer_addr = self._sock.accept()
1598 logging.info(
1599 'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
1600 )
1601
1602 try:
1603 self._handle_connection()
1604 finally:
1605 self._conn.close()
1606
1607 def _write_port_to_file(self, port_filename):
1608 # Write the port number to a temporary file.
1609 with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_port_file:
1610 print(self._server_port, end='', file=tmp_port_file)
1611
1612 # Rename temporary file to real file
9c878ece 1613 os.replace(tmp_port_file.name, port_filename)
584af91e
PP
1614 logging.info(
1615 'Renamed port file: src-path="{}", dst-path="{}"'.format(
1616 tmp_port_file.name, port_filename
1617 )
1618 )
1619
1620
1621# A tracing session descriptor.
1622#
1623# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1624# objects).
1625class LttngTracingSessionDescriptor:
1626 def __init__(
1627 self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1628 ):
1629 for trace in traces:
1630 if name not in trace.path:
1631 fmt = 'Tracing session name must be part of every trace path (`{}` not found in `{}`)'
1632 raise ValueError(fmt.format(name, trace.path))
1633
1634 self._traces = traces
1635 stream_count = sum([len(t) + 1 for t in traces])
1636 self._info = _LttngLiveViewerTracingSessionInfo(
1637 tracing_session_id,
1638 live_timer_freq,
1639 client_count,
1640 stream_count,
1641 hostname,
1642 name,
1643 )
1644
1645 @property
1646 def traces(self):
1647 return self._traces
1648
1649 @property
1650 def info(self):
1651 return self._info
1652
1653
2b763e29
JG
1654def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
1655 # File format is:
1656 #
1657 # [
1658 # {
1659 # "name": "my-session",
1660 # "id": 17,
1661 # "hostname": "myhost",
1662 # "live-timer-freq": 1000000,
1663 # "client-count": 23,
1664 # "traces": [
1665 # {
1666 # "path": "lol"
1667 # },
1668 # {
71f56e5f
JG
1669 # "path": "meow/mix",
1670 # "beacons": {
1671 # "my_stream": [ 5235787, 728375283 ]
78169723
FD
1672 # },
1673 # "metadata-sections": [
1674 # {
1675 # "line": 1,
1676 # "timestamp": 0
1677 # }
1678 # ]
2b763e29
JG
1679 # }
1680 # ]
1681 # }
1682 # ]
1683 with open(sessions_filename, 'r') as sessions_file:
1684 params = json.load(sessions_file)
1685
1686 sessions = []
1687
1688 for session in params:
1689 name = session['name']
1690 tracing_session_id = session['id']
1691 hostname = session['hostname']
1692 live_timer_freq = session['live-timer-freq']
1693 client_count = session['client-count']
1694 traces = []
1695
1696 for trace in session['traces']:
78169723 1697 metadata_sections = trace.get('metadata-sections')
71f56e5f 1698 beacons = trace.get('beacons')
2b763e29
JG
1699 path = trace['path']
1700
1701 if not os.path.isabs(path):
1702 path = os.path.join(trace_path_prefix, path)
1703
78169723 1704 traces.append(LttngTrace(path, metadata_sections, beacons))
2b763e29
JG
1705
1706 sessions.append(
1707 LttngTracingSessionDescriptor(
1708 name,
1709 tracing_session_id,
1710 hostname,
1711 live_timer_freq,
1712 client_count,
1713 traces,
1714 )
1715 )
1716
1717 return sessions
584af91e
PP
1718
1719
1720def _loglevel_parser(string):
1721 loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
1722 if string not in loglevels:
1723 msg = "{} is not a valid loglevel".format(string)
1724 raise argparse.ArgumentTypeError(msg)
1725 return loglevels[string]
1726
1727
1728if __name__ == '__main__':
1729 logging.basicConfig(format='# %(asctime)-25s%(message)s')
1730 parser = argparse.ArgumentParser(
1731 description='LTTng-live protocol mocker', add_help=False
1732 )
1733 parser.add_argument(
1734 '--log-level',
1735 default='warning',
1736 choices=['info', 'warning'],
1737 help='The loglevel to be used.',
1738 )
1739
1740 loglevel_namespace, remaining_args = parser.parse_known_args()
1741 logging.getLogger().setLevel(_loglevel_parser(loglevel_namespace.log_level))
1742
1743 parser.add_argument(
1744 '--port-filename',
1745 help='The final port file. This file is present when the server is ready to receive connection.',
1746 required=True,
1747 )
1748 parser.add_argument(
1749 '--max-query-data-response-size',
1750 type=int,
1751 help='The maximum size of control data response in bytes',
1752 )
1753 parser.add_argument(
2b763e29
JG
1754 '--trace-path-prefix',
1755 type=str,
1756 help='Prefix to prepend to the trace paths of session configurations',
1757 )
1758 parser.add_argument(
776a2a25
PP
1759 '--sessions-filename',
1760 type=str,
1761 help='Path to a session configuration file',
584af91e
PP
1762 )
1763 parser.add_argument(
1764 '-h',
1765 '--help',
1766 action='help',
1767 default=argparse.SUPPRESS,
1768 help='Show this help message and exit.',
1769 )
1770
1771 args = parser.parse_args(args=remaining_args)
1772 try:
2b763e29 1773 sessions = _session_descriptors_from_path(
78169723
FD
1774 args.sessions_filename,
1775 args.trace_path_prefix,
584af91e 1776 )
2b763e29 1777 LttngLiveServer(args.port_filename, sessions, args.max_query_data_response_size)
584af91e
PP
1778 except UnexpectedInput as exc:
1779 logging.error(str(exc))
1780 print(exc, file=sys.stderr)
1781 sys.exit(1)
This page took 0.106598 seconds and 4 git commands to generate.