Fix: src.ctf.lttng-live: consider empty metadata packet as retry
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
584af91e 2#
0235b0db 3# Copyright (C) 2019 Philippe Proulx <pproulx@efficios.com>
584af91e 4#
584af91e
PP
5
6import argparse
7import collections.abc
8import logging
9import os
10import os.path
11import re
12import socket
13import struct
14import sys
15import tempfile
2b763e29 16import json
584af91e
PP
17
18
class UnexpectedInput(RuntimeError):
    """Raised when the server gets a command or an ID it does not expect."""
21
22
23class _LttngLiveViewerCommand:
24 def __init__(self, version):
25 self._version = version
26
27 @property
28 def version(self):
29 return self._version
30
31
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    """Decoded "connect" command (command type 1)."""

    def __init__(self, version, viewer_session_id, major, minor):
        super().__init__(version)
        self._viewer_session_id, self._major, self._minor = (
            viewer_session_id,
            major,
            minor,
        )

    @property
    def viewer_session_id(self):
        """Viewer session ID carried by the command."""
        return self._viewer_session_id

    @property
    def major(self):
        """Major number carried by the command."""
        return self._major

    @property
    def minor(self):
        """Minor number carried by the command."""
        return self._minor
50
51
52class _LttngLiveViewerConnectReply:
53 def __init__(self, viewer_session_id, major, minor):
54 self._viewer_session_id = viewer_session_id
55 self._major = major
56 self._minor = minor
57
58 @property
59 def viewer_session_id(self):
60 return self._viewer_session_id
61
62 @property
63 def major(self):
64 return self._major
65
66 @property
67 def minor(self):
68 return self._minor
69
70
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    """Decoded "get tracing session infos" command (command type 2).

    Carries nothing besides the header's version.
    """
73
74
75class _LttngLiveViewerTracingSessionInfo:
76 def __init__(
77 self,
78 tracing_session_id,
79 live_timer_freq,
80 client_count,
81 stream_count,
82 hostname,
83 name,
84 ):
85 self._tracing_session_id = tracing_session_id
86 self._live_timer_freq = live_timer_freq
87 self._client_count = client_count
88 self._stream_count = stream_count
89 self._hostname = hostname
90 self._name = name
91
92 @property
93 def tracing_session_id(self):
94 return self._tracing_session_id
95
96 @property
97 def live_timer_freq(self):
98 return self._live_timer_freq
99
100 @property
101 def client_count(self):
102 return self._client_count
103
104 @property
105 def stream_count(self):
106 return self._stream_count
107
108 @property
109 def hostname(self):
110 return self._hostname
111
112 @property
113 def name(self):
114 return self._name
115
116
117class _LttngLiveViewerGetTracingSessionInfosReply:
118 def __init__(self, tracing_session_infos):
119 self._tracing_session_infos = tracing_session_infos
120
121 @property
122 def tracing_session_infos(self):
123 return self._tracing_session_infos
124
125
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    """Decoded "attach to tracing session" command (command type 3)."""

    class SeekType:
        """Known values of the command's seek type field."""

        BEGINNING = 1
        LAST = 2

    def __init__(self, version, tracing_session_id, offset, seek_type):
        super().__init__(version)
        self._tracing_session_id, self._offset, self._seek_type = (
            tracing_session_id,
            offset,
            seek_type,
        )

    @property
    def tracing_session_id(self):
        """ID of the tracing session to attach to."""
        return self._tracing_session_id

    @property
    def offset(self):
        """Offset value carried by the command."""
        return self._offset

    @property
    def seek_type(self):
        """Seek type value carried by the command (see `SeekType`)."""
        return self._seek_type
148
149
150class _LttngLiveViewerStreamInfo:
151 def __init__(self, id, trace_id, is_metadata, path, channel_name):
152 self._id = id
153 self._trace_id = trace_id
154 self._is_metadata = is_metadata
155 self._path = path
156 self._channel_name = channel_name
157
158 @property
159 def id(self):
160 return self._id
161
162 @property
163 def trace_id(self):
164 return self._trace_id
165
166 @property
167 def is_metadata(self):
168 return self._is_metadata
169
170 @property
171 def path(self):
172 return self._path
173
174 @property
175 def channel_name(self):
176 return self._channel_name
177
178
179class _LttngLiveViewerAttachToTracingSessionReply:
180 class Status:
181 OK = 1
182 ALREADY = 2
183 UNKNOWN = 3
184 NOT_LIVE = 4
185 SEEK_ERROR = 5
186 NO_SESSION = 6
187
188 def __init__(self, status, stream_infos):
189 self._status = status
190 self._stream_infos = stream_infos
191
192 @property
193 def status(self):
194 return self._status
195
196 @property
197 def stream_infos(self):
198 return self._stream_infos
199
200
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    """Decoded "get next data stream index entry" command (command type 4)."""

    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self):
        """ID of the stream the command targets."""
        return self._stream_id
209
210
211class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
212 class Status:
213 OK = 1
214 RETRY = 2
215 HUP = 3
216 ERROR = 4
217 INACTIVE = 5
218 EOF = 6
219
220 def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
221 self._status = status
222 self._index_entry = index_entry
223 self._has_new_metadata = has_new_metadata
224 self._has_new_data_stream = has_new_data_stream
225
226 @property
227 def status(self):
228 return self._status
229
230 @property
231 def index_entry(self):
232 return self._index_entry
233
234 @property
235 def has_new_metadata(self):
236 return self._has_new_metadata
237
238 @property
239 def has_new_data_stream(self):
240 return self._has_new_data_stream
241
242
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    """Decoded "get data stream packet data" command (command type 5)."""

    def __init__(self, version, stream_id, offset, req_length):
        super().__init__(version)
        self._stream_id, self._offset, self._req_length = (
            stream_id,
            offset,
            req_length,
        )

    @property
    def stream_id(self):
        """ID of the stream the command targets."""
        return self._stream_id

    @property
    def offset(self):
        """Offset value carried by the command."""
        return self._offset

    @property
    def req_length(self):
        """Requested length value carried by the command."""
        return self._req_length
261
262
263class _LttngLiveViewerGetDataStreamPacketDataReply:
264 class Status:
265 OK = 1
266 RETRY = 2
267 ERROR = 3
268 EOF = 4
269
270 def __init__(self, status, data, has_new_metadata, has_new_data_stream):
271 self._status = status
272 self._data = data
273 self._has_new_metadata = has_new_metadata
274 self._has_new_data_stream = has_new_data_stream
275
276 @property
277 def status(self):
278 return self._status
279
280 @property
281 def data(self):
282 return self._data
283
284 @property
285 def has_new_metadata(self):
286 return self._has_new_metadata
287
288 @property
289 def has_new_data_stream(self):
290 return self._has_new_data_stream
291
292
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    """Decoded "get metadata stream data" command (command type 6)."""

    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self):
        """ID of the stream the command targets."""
        return self._stream_id
301
302
303class _LttngLiveViewerGetMetadataStreamDataContentReply:
304 class Status:
305 OK = 1
306 NO_NEW = 2
307 ERROR = 3
308
309 def __init__(self, status, data):
310 self._status = status
311 self._data = data
312
313 @property
314 def status(self):
315 return self._status
316
317 @property
318 def data(self):
319 return self._data
320
321
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    """Decoded "get new stream infos" command (command type 7)."""

    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        """ID of the tracing session the command targets."""
        return self._tracing_session_id
330
331
332class _LttngLiveViewerGetNewStreamInfosReply:
333 class Status:
334 OK = 1
335 NO_NEW = 2
336 ERROR = 3
337 HUP = 4
338
339 def __init__(self, status, stream_infos):
340 self._status = status
341 self._stream_infos = stream_infos
342
343 @property
344 def status(self):
345 return self._status
346
347 @property
348 def stream_infos(self):
349 return self._stream_infos
350
351
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    """Decoded "create viewer session" command (command type 8).

    Carries nothing besides the header's version.
    """
354
355
356class _LttngLiveViewerCreateViewerSessionReply:
357 class Status:
358 OK = 1
359 ERROR = 2
360
361 def __init__(self, status):
362 self._status = status
363
364 @property
365 def status(self):
366 return self._status
367
368
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    """Decoded "detach from tracing session" command (command type 9)."""

    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        """ID of the tracing session to detach from."""
        return self._tracing_session_id
377
378
379class _LttngLiveViewerDetachFromTracingSessionReply:
380 class Status:
381 OK = 1
382 UNKNOWN = 2
383 ERROR = 3
384
385 def __init__(self, status):
386 self._status = status
387
388 @property
389 def status(self):
390 return self._status
391
392
393# An LTTng live protocol codec can convert bytes to command objects and
394# reply objects to bytes.
395class _LttngLiveViewerProtocolCodec:
396 _COMMAND_HEADER_STRUCT_FMT = 'QII'
397 _COMMAND_HEADER_SIZE_BYTES = struct.calcsize(_COMMAND_HEADER_STRUCT_FMT)
398
399 def __init__(self):
400 pass
401
402 def _unpack(self, fmt, data, offset=0):
403 fmt = '!' + fmt
404 return struct.unpack_from(fmt, data, offset)
405
406 def _unpack_payload(self, fmt, data):
407 return self._unpack(
408 fmt, data, _LttngLiveViewerProtocolCodec._COMMAND_HEADER_SIZE_BYTES
409 )
410
411 def decode(self, data):
412 if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
413 # Not enough data to read the command header
414 return
415
416 payload_size, cmd_type, version = self._unpack(
417 self._COMMAND_HEADER_STRUCT_FMT, data
418 )
419 logging.info(
420 'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
421 payload_size, cmd_type, version
422 )
423 )
424
425 if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
426 # Not enough data to read the whole command
427 return
428
429 if cmd_type == 1:
430 viewer_session_id, major, minor, conn_type = self._unpack_payload(
431 'QIII', data
432 )
433 return _LttngLiveViewerConnectCommand(
434 version, viewer_session_id, major, minor
435 )
436 elif cmd_type == 2:
437 return _LttngLiveViewerGetTracingSessionInfosCommand(version)
438 elif cmd_type == 3:
439 tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
440 return _LttngLiveViewerAttachToTracingSessionCommand(
441 version, tracing_session_id, offset, seek_type
442 )
443 elif cmd_type == 4:
75882e97 444 (stream_id,) = self._unpack_payload('Q', data)
584af91e
PP
445 return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
446 version, stream_id
447 )
448 elif cmd_type == 5:
449 stream_id, offset, req_length = self._unpack_payload('QQI', data)
450 return _LttngLiveViewerGetDataStreamPacketDataCommand(
451 version, stream_id, offset, req_length
452 )
453 elif cmd_type == 6:
75882e97 454 (stream_id,) = self._unpack_payload('Q', data)
584af91e
PP
455 return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
456 elif cmd_type == 7:
75882e97 457 (tracing_session_id,) = self._unpack_payload('Q', data)
584af91e
PP
458 return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
459 elif cmd_type == 8:
460 return _LttngLiveViewerCreateViewerSessionCommand(version)
461 elif cmd_type == 9:
75882e97 462 (tracing_session_id,) = self._unpack_payload('Q', data)
584af91e
PP
463 return _LttngLiveViewerDetachFromTracingSessionCommand(
464 version, tracing_session_id
465 )
466 else:
467 raise UnexpectedInput('Unknown command type {}'.format(cmd_type))
468
469 def _pack(self, fmt, *args):
470 # Force network byte order
471 return struct.pack('!' + fmt, *args)
472
473 def _encode_zero_padded_str(self, string, length):
474 data = string.encode()
475 return data.ljust(length, b'\x00')
476
477 def _encode_stream_info(self, info):
478 data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
479 data += self._encode_zero_padded_str(info.path, 4096)
480 data += self._encode_zero_padded_str(info.channel_name, 255)
481 return data
482
483 def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
484 flags = 0
485
486 if has_new_metadata:
487 flags |= 1
488
489 if has_new_data_streams:
490 flags |= 2
491
492 return flags
493
494 def encode(self, reply):
495 if type(reply) is _LttngLiveViewerConnectReply:
496 data = self._pack(
497 'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
498 )
499 elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
500 data = self._pack('I', len(reply.tracing_session_infos))
501
502 for info in reply.tracing_session_infos:
503 data += self._pack(
504 'QIII',
505 info.tracing_session_id,
506 info.live_timer_freq,
507 info.client_count,
508 info.stream_count,
509 )
510 data += self._encode_zero_padded_str(info.hostname, 64)
511 data += self._encode_zero_padded_str(info.name, 255)
512 elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
513 data = self._pack('II', reply.status, len(reply.stream_infos))
514
515 for info in reply.stream_infos:
516 data += self._encode_stream_info(info)
517 elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
71f56e5f 518 index_format = 'QQQQQQQII'
584af91e
PP
519 entry = reply.index_entry
520 flags = self._get_has_new_stuff_flags(
521 reply.has_new_metadata, reply.has_new_data_stream
522 )
523
71f56e5f
JG
524 if type(entry) is _LttngDataStreamIndexEntry:
525 data = self._pack(
526 index_format,
527 entry.offset_bytes,
528 entry.total_size_bits,
529 entry.content_size_bits,
530 entry.timestamp_begin,
531 entry.timestamp_end,
532 entry.events_discarded,
533 entry.stream_class_id,
534 reply.status,
535 flags,
536 )
537 else:
538 assert type(entry) is _LttngDataStreamBeaconEntry
539 data = self._pack(
540 index_format,
541 0,
542 0,
543 0,
544 0,
545 entry.timestamp,
546 0,
547 entry.stream_class_id,
548 reply.status,
549 flags,
550 )
584af91e
PP
551 elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
552 flags = self._get_has_new_stuff_flags(
553 reply.has_new_metadata, reply.has_new_data_stream
554 )
555 data = self._pack('III', reply.status, len(reply.data), flags)
556 data += reply.data
557 elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
558 data = self._pack('QI', len(reply.data), reply.status)
559 data += reply.data
560 elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
561 data = self._pack('II', reply.status, len(reply.stream_infos))
562
563 for info in reply.stream_infos:
564 data += self._encode_stream_info(info)
565 elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
566 data = self._pack('I', reply.status)
567 elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
568 data = self._pack('I', reply.status)
569 else:
570 raise ValueError(
571 'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
572 )
573
574 return data
575
576
577# An entry within the index of an LTTng data stream.
578class _LttngDataStreamIndexEntry:
579 def __init__(
580 self,
581 offset_bytes,
582 total_size_bits,
583 content_size_bits,
584 timestamp_begin,
585 timestamp_end,
586 events_discarded,
587 stream_class_id,
588 ):
589 self._offset_bytes = offset_bytes
590 self._total_size_bits = total_size_bits
591 self._content_size_bits = content_size_bits
592 self._timestamp_begin = timestamp_begin
593 self._timestamp_end = timestamp_end
594 self._events_discarded = events_discarded
595 self._stream_class_id = stream_class_id
596
597 @property
598 def offset_bytes(self):
599 return self._offset_bytes
600
601 @property
602 def total_size_bits(self):
603 return self._total_size_bits
604
605 @property
606 def total_size_bytes(self):
607 return self._total_size_bits // 8
608
609 @property
610 def content_size_bits(self):
611 return self._content_size_bits
612
613 @property
614 def content_size_bytes(self):
615 return self._content_size_bits // 8
616
617 @property
618 def timestamp_begin(self):
619 return self._timestamp_begin
620
621 @property
622 def timestamp_end(self):
623 return self._timestamp_end
624
625 @property
626 def events_discarded(self):
627 return self._events_discarded
628
629 @property
630 def stream_class_id(self):
631 return self._stream_class_id
632
633
71f56e5f
JG
634# An entry within the index of an LTTng data stream. While a stream beacon entry
635# is conceptually unrelated to an index, it is sent as a reply to a
636# LttngLiveViewerGetNextDataStreamIndexEntryCommand
637class _LttngDataStreamBeaconEntry:
638 def __init__(self, stream_class_id, timestamp):
639 self._stream_class_id = stream_class_id
640 self._timestamp = timestamp
641
642 @property
643 def timestamp(self):
644 return self._timestamp
645
646 @property
647 def stream_class_id(self):
648 return self._stream_class_id
649
650
584af91e
PP
651# The index of an LTTng data stream, a sequence of index entries.
652class _LttngDataStreamIndex(collections.abc.Sequence):
71f56e5f 653 def __init__(self, path, beacons):
584af91e
PP
654 self._path = path
655 self._build()
71f56e5f
JG
656
657 if beacons:
658 stream_class_id = self._entries[0].stream_class_id
659 beacons = [
660 _LttngDataStreamBeaconEntry(stream_class_id, ts) for ts in beacons
661 ]
662 self._add_beacons(beacons)
663
584af91e
PP
664 logging.info(
665 'Built data stream index entries: path="{}", count={}'.format(
666 path, len(self._entries)
667 )
668 )
669
670 def _build(self):
671 self._entries = []
672 assert os.path.isfile(self._path)
673
674 with open(self._path, 'rb') as f:
675 # Read header first
676 fmt = '>IIII'
677 size = struct.calcsize(fmt)
678 data = f.read(size)
679 assert len(data) == size
680 magic, index_major, index_minor, index_entry_length = struct.unpack(
681 fmt, data
682 )
683 assert magic == 0xC1F1DCC1
684
685 # Read index entries
686 fmt = '>QQQQQQQ'
687 size = struct.calcsize(fmt)
688
689 while True:
690 logging.debug(
691 'Decoding data stream index entry: path="{}", offset={}'.format(
692 self._path, f.tell()
693 )
694 )
695 data = f.read(size)
696
697 if not data:
698 # Done
699 break
700
701 assert len(data) == size
75882e97
FD
702 (
703 offset_bytes,
704 total_size_bits,
705 content_size_bits,
706 timestamp_begin,
707 timestamp_end,
708 events_discarded,
709 stream_class_id,
710 ) = struct.unpack(fmt, data)
584af91e
PP
711
712 self._entries.append(
713 _LttngDataStreamIndexEntry(
714 offset_bytes,
715 total_size_bits,
716 content_size_bits,
717 timestamp_begin,
718 timestamp_end,
719 events_discarded,
720 stream_class_id,
721 )
722 )
723
724 # Skip anything else before the next entry
725 f.seek(index_entry_length - size, os.SEEK_CUR)
726
71f56e5f
JG
727 def _add_beacons(self, beacons):
728 # Assumes entries[n + 1].timestamp_end >= entries[n].timestamp_begin
729 def sort_key(entry):
730 if type(entry) is _LttngDataStreamBeaconEntry:
731 return entry.timestamp
732 else:
733 return entry.timestamp_end
734
735 self._entries += beacons
736 self._entries.sort(key=sort_key)
737
584af91e
PP
738 def __getitem__(self, index):
739 return self._entries[index]
740
741 def __len__(self):
742 return len(self._entries)
743
744 @property
745 def path(self):
746 return self._path
747
748
# An LTTng data stream.
class _LttngDataStream:
    """Data stream file at `path`, with its index.

    The channel name is the part of the file name before the last
    `_<digits>`. The index file is expected at
    `<directory of path>/index/<file name>.idx`.

    `beacons` is forwarded to the index.
    """

    def __init__(self, path, beacons):
        self._path = path
        trace_dir, filename = os.path.split(path)
        self._channel_name = re.match(r'(.*)_\d+', filename).group(1)
        self._index = _LttngDataStreamIndex(
            os.path.join(trace_dir, 'index', filename + '.idx'), beacons
        )
        assert os.path.isfile(path)

        # Kept open for get_data()
        self._file = open(path, 'rb')
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    @property
    def path(self):
        """Path of the data stream file."""
        return self._path

    @property
    def channel_name(self):
        """Channel name extracted from the file name."""
        return self._channel_name

    @property
    def index(self):
        """Index of this data stream."""
        return self._index

    def get_data(self, offset_bytes, len_bytes):
        """Read up to `len_bytes` bytes of the file, from `offset_bytes`."""
        stream_file = self._file
        stream_file.seek(offset_bytes)
        return stream_file.read(len_bytes)
782
783
784# An LTTng metadata stream.
785class _LttngMetadataStream:
786 def __init__(self, path):
787 self._path = path
788 logging.info('Built metadata stream: path="{}"'.format(path))
789
790 @property
791 def path(self):
792 return self._path
793
794 @property
795 def data(self):
796 assert os.path.isfile(self._path)
797
798 with open(self._path, 'rb') as f:
799 return f.read()
800
801
# An LTTng trace, a sequence of LTTng data streams.
class LttngTrace(collections.abc.Sequence):
    """Trace found in the directory `trace_dir`.

    `beacons` is either `None` or a container which maps a data stream
    file name to the beacons of this data stream.
    """

    def __init__(self, trace_dir, beacons):
        assert os.path.isdir(trace_dir)
        self._path = trace_dir
        metadata_path = os.path.join(trace_dir, 'metadata')
        self._metadata_stream = _LttngMetadataStream(metadata_path)
        self._create_data_streams(trace_dir, beacons)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    def _create_data_streams(self, trace_dir, beacons):
        """Build one data stream per eligible file of `trace_dir`.

        An eligible file is a regular file of which the name does not
        start with `.` and is not `metadata`. Data streams are created
        in sorted path order.
        """
        candidates = (
            (filename, os.path.join(trace_dir, filename))
            for filename in os.listdir(trace_dir)
        )
        data_stream_paths = sorted(
            path
            for filename, path in candidates
            if os.path.isfile(path)
            and not filename.startswith('.')
            and filename != 'metadata'
        )
        self._data_streams = []

        for data_stream_path in data_stream_paths:
            stream_name = os.path.basename(data_stream_path)

            # Beacons of this specific stream, if any
            has_beacons = beacons is not None and stream_name in beacons
            stream_beacons = beacons[stream_name] if has_beacons else None
            self._data_streams.append(
                _LttngDataStream(data_stream_path, stream_beacons)
            )

    @property
    def path(self):
        """Path of the trace directory."""
        return self._path

    @property
    def metadata_stream(self):
        """Metadata stream of the trace."""
        return self._metadata_stream

    def __getitem__(self, index):
        return self._data_streams[index]

    def __len__(self):
        return len(self._data_streams)
857
858
859# The state of a single data stream.
860class _LttngLiveViewerSessionDataStreamState:
861 def __init__(self, ts_state, info, data_stream):
862 self._ts_state = ts_state
863 self._info = info
864 self._data_stream = data_stream
865 self._cur_index_entry_index = 0
866 fmt = 'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
867 logging.info(
868 fmt.format(
869 info.id,
870 ts_state.tracing_session_descriptor.info.tracing_session_id,
871 ts_state.tracing_session_descriptor.info.name,
872 data_stream.path,
873 )
874 )
875
876 @property
877 def tracing_session_state(self):
878 return self._ts_state
879
880 @property
881 def info(self):
882 return self._info
883
884 @property
885 def data_stream(self):
886 return self._data_stream
887
888 @property
889 def cur_index_entry(self):
890 if self._cur_index_entry_index == len(self._data_stream.index):
891 return
892
893 return self._data_stream.index[self._cur_index_entry_index]
894
895 def goto_next_index_entry(self):
896 self._cur_index_entry_index += 1
897
898
899# The state of a single metadata stream.
900class _LttngLiveViewerSessionMetadataStreamState:
901 def __init__(self, ts_state, info, metadata_stream):
902 self._ts_state = ts_state
903 self._info = info
904 self._metadata_stream = metadata_stream
905 self._is_sent = False
906 fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
907 logging.info(
908 fmt.format(
909 info.id,
910 ts_state.tracing_session_descriptor.info.tracing_session_id,
911 ts_state.tracing_session_descriptor.info.name,
912 metadata_stream.path,
913 )
914 )
915
916 @property
917 def trace_session_state(self):
918 return self._trace_session_state
919
920 @property
921 def info(self):
922 return self._info
923
924 @property
925 def metadata_stream(self):
926 return self._metadata_stream
927
928 @property
929 def is_sent(self):
930 return self._is_sent
931
932 @is_sent.setter
933 def is_sent(self, value):
934 self._is_sent = value
935
936
937# The state of a tracing session.
938class _LttngLiveViewerSessionTracingSessionState:
939 def __init__(self, tc_descr, base_stream_id):
940 self._tc_descr = tc_descr
941 self._stream_infos = []
942 self._ds_states = {}
943 self._ms_states = {}
944 stream_id = base_stream_id
945
946 for trace in tc_descr.traces:
947 trace_id = stream_id * 1000
948
949 # Data streams -> stream infos and data stream states
950 for data_stream in trace:
951 info = _LttngLiveViewerStreamInfo(
952 stream_id,
953 trace_id,
954 False,
955 data_stream.path,
956 data_stream.channel_name,
957 )
958 self._stream_infos.append(info)
959 self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
960 self, info, data_stream
961 )
962 stream_id += 1
963
964 # Metadata stream -> stream info and metadata stream state
965 info = _LttngLiveViewerStreamInfo(
966 stream_id, trace_id, True, trace.metadata_stream.path, 'metadata'
967 )
968 self._stream_infos.append(info)
969 self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
970 self, info, trace.metadata_stream
971 )
972 stream_id += 1
973
974 self._is_attached = False
975 fmt = 'Built tracing session state: id={}, name="{}"'
976 logging.info(fmt.format(tc_descr.info.tracing_session_id, tc_descr.info.name))
977
978 @property
979 def tracing_session_descriptor(self):
980 return self._tc_descr
981
982 @property
983 def data_stream_states(self):
984 return self._ds_states
985
986 @property
987 def metadata_stream_states(self):
988 return self._ms_states
989
990 @property
991 def stream_infos(self):
992 return self._stream_infos
993
994 @property
995 def has_new_metadata(self):
996 return any([not ms.is_sent for ms in self._ms_states.values()])
997
998 @property
999 def is_attached(self):
1000 return self._is_attached
1001
1002 @is_attached.setter
1003 def is_attached(self, value):
1004 self._is_attached = value
1005
1006
1007# An LTTng live viewer session manages a view on tracing sessions
1008# and replies to commands accordingly.
1009class _LttngLiveViewerSession:
def __init__(
    self,
    viewer_session_id,
    tracing_session_descriptors,
    max_query_data_response_size,
):
    """Build the states of all the tracing sessions and the handler table.

    A falsy `max_query_data_response_size` means that packet data
    responses are not limited.
    """
    self._viewer_session_id = viewer_session_id
    self._max_query_data_response_size = max_query_data_response_size
    self._ts_states = {}
    self._stream_states = {}

    # Base stream ID of the next tracing session: the count of streams
    # of all the previous ones.
    next_base_stream_id = 0

    for descriptor in tracing_session_descriptors:
        state = _LttngLiveViewerSessionTracingSessionState(
            descriptor, next_base_stream_id
        )
        session_id = state.tracing_session_descriptor.info.tracing_session_id
        self._ts_states[session_id] = state
        next_base_stream_id += len(state.stream_infos)

        # Data and metadata stream states share one map, by stream ID
        self._stream_states.update(state.data_stream_states)
        self._stream_states.update(state.metadata_stream_states)

    # Command class -> method which handles it
    self._command_handlers = dict(
        (
            (
                _LttngLiveViewerAttachToTracingSessionCommand,
                self._handle_attach_to_tracing_session_command,
            ),
            (
                _LttngLiveViewerCreateViewerSessionCommand,
                self._handle_create_viewer_session_command,
            ),
            (
                _LttngLiveViewerDetachFromTracingSessionCommand,
                self._handle_detach_from_tracing_session_command,
            ),
            (
                _LttngLiveViewerGetDataStreamPacketDataCommand,
                self._handle_get_data_stream_packet_data_command,
            ),
            (
                _LttngLiveViewerGetMetadataStreamDataCommand,
                self._handle_get_metadata_stream_data_command,
            ),
            (
                _LttngLiveViewerGetNewStreamInfosCommand,
                self._handle_get_new_stream_infos_command,
            ),
            (
                _LttngLiveViewerGetNextDataStreamIndexEntryCommand,
                self._handle_get_next_data_stream_index_entry_command,
            ),
            (
                _LttngLiveViewerGetTracingSessionInfosCommand,
                self._handle_get_tracing_session_infos_command,
            ),
        )
    )
1044
@property
def viewer_session_id(self):
    """ID of this viewer session, as given at construction."""
    return self._viewer_session_id
1048
1049 def _get_tracing_session_state(self, tracing_session_id):
1050 if tracing_session_id not in self._ts_states:
1051 raise UnexpectedInput(
1052 'Unknown tracing session ID {}'.format(tracing_session_id)
1053 )
1054
1055 return self._ts_states[tracing_session_id]
1056
1057 def _get_stream_state(self, stream_id):
1058 if stream_id not in self._stream_states:
1059 UnexpectedInput('Unknown stream ID {}'.format(stream_id))
1060
1061 return self._stream_states[stream_id]
1062
def handle_command(self, cmd):
    """Dispatch `cmd` to its handler and return the handler's reply.

    The handler is selected by the exact type of `cmd`.

    Raises `UnexpectedInput` when no handler exists for this type.
    """
    cmd_cls_name = cmd.__class__.__name__
    logging.info(
        'Handling command in viewer session: cmd-cls-name={}'.format(cmd_cls_name)
    )
    handlers = self._command_handlers
    cmd_type = type(cmd)

    if cmd_type in handlers:
        return handlers[cmd_type](cmd)

    raise UnexpectedInput(
        'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
    )
1077
def _handle_attach_to_tracing_session_command(self, cmd):
    """Attach the viewer to a tracing session; reply with its stream infos.

    Raises `UnexpectedInput` when the tracing session ID is unknown or
    when the viewer is already attached to this tracing session.
    """
    logging.info(
        'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'.format(
            cmd.tracing_session_id, cmd.offset, cmd.seek_type
        )
    )
    ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
    info = ts_state.tracing_session_descriptor.info

    if ts_state.is_attached:
        raise UnexpectedInput(
            'Cannot attach to tracing session `{}`: viewer is already attached'.format(
                info.name
            )
        )

    ts_state.is_attached = True
    return _LttngLiveViewerAttachToTracingSessionReply(
        _LttngLiveViewerAttachToTracingSessionReply.Status.OK,
        ts_state.stream_infos,
    )
1096
def _handle_detach_from_tracing_session_command(self, cmd):
    """Detach the viewer from a tracing session; reply with an `OK` status.

    Raises `UnexpectedInput` when the tracing session ID is unknown or
    when the viewer is not attached to this tracing session.
    """
    logging.info(
        'Handling "detach from tracing session" command: ts-id={}'.format(
            cmd.tracing_session_id
        )
    )
    ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
    info = ts_state.tracing_session_descriptor.info

    if not ts_state.is_attached:
        raise UnexpectedInput(
            'Cannot detach to tracing session `{}`: viewer is not attached'.format(
                info.name
            )
        )

    ts_state.is_attached = False
    return _LttngLiveViewerDetachFromTracingSessionReply(
        _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
    )
1113
def _handle_get_next_data_stream_index_entry_command(self, cmd):
    """Reply with the current entry of a data stream, then move to the next.

    The status is `HUP` once all the entries were replied, `OK` for an
    index entry, and `INACTIVE` for a beacon entry.

    Raises `UnexpectedInput` when the stream is not a data stream.
    """
    logging.info(
        'Handling "get next data stream index entry" command: stream-id={}'.format(
            cmd.stream_id
        )
    )
    stream_state = self._get_stream_state(cmd.stream_id)

    if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
        raise UnexpectedInput(
            'Stream with ID {} is not a data stream'.format(cmd.stream_id)
        )

    reply_cls = _LttngLiveViewerGetNextDataStreamIndexEntryReply
    entry = stream_state.cur_index_entry

    if entry is None:
        # The viewer is done reading this stream. The reply needs a
        # data stream index entry, but the viewer ignores it with the
        # `HUP` status: use a dummy one.
        return reply_cls(
            reply_cls.Status.HUP,
            _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0),
            False,
            False,
        )

    # The viewer only checks the `has_new_metadata` flag if the
    # reply's status is `OK`, so we need to provide an index here
    has_new_metadata = stream_state.tracing_session_state.has_new_metadata

    if type(entry) is _LttngDataStreamIndexEntry:
        status = reply_cls.Status.OK
    else:
        assert type(entry) is _LttngDataStreamBeaconEntry
        status = reply_cls.Status.INACTIVE

    reply = reply_cls(status, entry, has_new_metadata, False)
    stream_state.goto_next_index_entry()
    return reply
1150
def _handle_get_data_stream_packet_data_command(self, cmd):
    """Reply with bytes of a data stream file.

    The reply has the `ERROR` status, no data, and the "has new
    metadata" flag set while the tracing session has metadata which is
    not sent yet.

    Raises `UnexpectedInput` when the stream is not a data stream.
    """
    logging.info(
        'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'.format(
            cmd.stream_id, cmd.offset, cmd.req_length
        )
    )
    stream_state = self._get_stream_state(cmd.stream_id)

    if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
        raise UnexpectedInput(
            'Stream with ID {} is not a data stream'.format(cmd.stream_id)
        )

    reply_cls = _LttngLiveViewerGetDataStreamPacketDataReply

    if stream_state.tracing_session_state.has_new_metadata:
        return reply_cls(reply_cls.Status.ERROR, bytes(), True, False)

    response_length = cmd.req_length
    max_length = self._max_query_data_response_size

    if max_length:
        # Server-side limit on the length of a response: reply with
        # the smaller of the requested length and this limit.
        response_length = min(cmd.req_length, max_length)
        logging.info(
            'Limiting "get data stream packet data" command: req-length={} actual response size={}'.format(
                cmd.req_length, response_length
            )
        )

    data = stream_state.data_stream.get_data(cmd.offset, response_length)
    return reply_cls(reply_cls.Status.OK, data, False, False)
1181
def _handle_get_metadata_stream_data_command(self, cmd):
    # Replies with the content of the metadata stream having the ID
    # `cmd.stream_id` the first time it's requested, and with the
    # `NO_NEW` status (empty content) every time after that.
    #
    # Raises `UnexpectedInput` if the ID doesn't designate a metadata
    # stream.
    logging.info(
        'Handling "get metadata stream data" command: stream-id={}'.format(
            cmd.stream_id
        )
    )
    state = self._get_stream_state(cmd.stream_id)

    if type(state) is not _LttngLiveViewerSessionMetadataStreamState:
        raise UnexpectedInput(
            'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
        )

    reply_cls = _LttngLiveViewerGetMetadataStreamDataContentReply

    if not state.is_sent:
        # First request: mark the stream as sent and hand over all of
        # its data at once.
        state.is_sent = True
        return reply_cls(reply_cls.Status.OK, state.metadata_stream.data)

    return reply_cls(reply_cls.Status.NO_NEW, bytes())
1201
def _handle_get_new_stream_infos_command(self, cmd):
    # Always replies with the `HUP` status and an empty list of stream
    # infos.
    logging.info(
        'Handling "get new stream infos" command: ts-id={}'.format(
            cmd.tracing_session_id
        )
    )

    # Every stream info of the tracing session is already part of the
    # "attach to tracing session" reply, so there's never anything new
    # to announce. A viewer sending this command has consumed all the
    # existing data streams, hence `HUP`.
    reply_cls = _LttngLiveViewerGetNewStreamInfosReply
    return reply_cls(reply_cls.Status.HUP, [])
1213
def _handle_get_tracing_session_infos_command(self, cmd):
    # Replies with the infos of all the known tracing sessions, sorted
    # by tracing session name.
    logging.info('Handling "get tracing session infos" command.')
    sorted_infos = sorted(
        (state.tracing_session_descriptor.info for state in self._ts_states.values()),
        key=lambda info: info.name,
    )
    return _LttngLiveViewerGetTracingSessionInfosReply(sorted_infos)
1221
def _handle_create_viewer_session_command(self, cmd):
    # Acknowledges the command with the `OK` status.
    #
    # There's no state to set up here, whereas the LTTng relay daemon
    # allocates the viewer session's state at this point.
    logging.info('Handling "create viewer session" command.')
    reply_cls = _LttngLiveViewerCreateViewerSessionReply
    return reply_cls(reply_cls.Status.OK)
1229
1230
1231# An LTTng live TCP server.
1232#
1233# On creation, it binds to `localhost` with an OS-assigned TCP port. It writes
1234# the decimal TCP port number to a temporary port file. It renames the
1235# temporary port file to `port_filename`.
1236#
1237# `tracing_session_descriptors` is a list of tracing session descriptors
1238# (`LttngTracingSessionDescriptor`) to serve.
1239#
1240# This server accepts a single viewer (client).
1241#
1242# When the viewer closes the connection, the server's constructor
1243# returns.
class LttngLiveServer:
    def __init__(
        self, port_filename, tracing_session_descriptors, max_query_data_response_size
    ):
        # Logs the configuration, binds a listening socket, publishes
        # its port to `port_filename`, and then serves a single viewer.
        # Only returns once the connection is over; the socket is
        # always closed before returning or raising.
        logging.info('Server configuration:')

        logging.info(' Port file name: `{}`'.format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                ' Maximum response data query size: `{}`'.format(
                    max_query_data_response_size
                )
            )

        for ts_descr in tracing_session_descriptors:
            info = ts_descr.info
            fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info(' Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = tracing_session_descriptors
        self._max_query_data_response_size = max_query_data_response_size
        self._codec = _LttngLiveViewerProtocolCodec()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Everything which follows the socket's creation is guarded so
        # that a failure to bind or to write the port file doesn't leak
        # the socket.
        try:
            # Port 0: OS assigns an unused port
            serv_addr = ('localhost', 0)
            self._sock.bind(serv_addr)
            self._write_port_to_file(port_filename)
            self._listen()
        finally:
            self._sock.close()
            logging.info('Closed connection and socket.')

    @property
    def _server_port(self):
        # TCP port of the server socket (second item of the socket's
        # own address).
        return self._sock.getsockname()[1]

    def _recv_command(self):
        # Reads from the viewer connection until the codec manages to
        # decode a complete command, and returns this command.
        #
        # Returns `None` if the viewer closes the connection before
        # sending any byte of a new command. Raises `UnexpectedInput` if
        # the viewer closes the connection in the middle of a command,
        # or if the received bytes are malformed.
        data = bytes()

        while True:
            logging.info('Waiting for viewer command.')
            buf = self._conn.recv(128)

            if not buf:
                # An empty read means the peer closed the connection
                logging.info('Client closed connection.')

                if data:
                    raise UnexpectedInput(
                        'Client closed connection after having sent {} command bytes.'.format(
                            len(data)
                        )
                    )

                return

            logging.info('Received data from viewer: length={}'.format(len(buf)))

            data += buf

            try:
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc

            # The codec returns `None` as long as the accumulated bytes
            # don't form a complete command: keep reading in that case.
            if cmd is not None:
                logging.info(
                    'Received command from viewer: cmd-cls-name={}'.format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    def _send_reply(self, reply):
        # Encodes `reply` with the codec and sends all the resulting
        # bytes to the viewer.
        data = self._codec.encode(reply)
        logging.info(
            'Sending reply to viewer: reply-cls-name={}, length={}'.format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    def _handle_connection(self):
        # Runs the whole conversation with the connected viewer: the
        # mandatory initial "connect" exchange, then one reply per
        # received command until the viewer closes the connection.
        #
        # Raises `UnexpectedInput` if the first command isn't "connect".

        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise UnexpectedInput(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply (the server announces version 2.10)
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    def _listen(self):
        # Waits for a single viewer to connect, handles the whole
        # connection, and always closes it afterwards.
        logging.info('Listening: port={}'.format(self._server_port))

        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    def _write_port_to_file(self, port_filename):
        # Writes the decimal server port to the file `port_filename`.
        #
        # The port is first written to a temporary file which is then
        # renamed to `port_filename`, so that a reader never observes a
        # partially written port file.
        #
        # The temporary file is created in the directory of
        # `port_filename`: os.replace() may fail when the source and the
        # destination are on different file systems, which can be the
        # case with the default temporary directory.
        dst_dir = os.path.dirname(os.path.abspath(port_filename))

        with tempfile.NamedTemporaryFile(
            mode='w', delete=False, dir=dst_dir
        ) as tmp_port_file:
            print(self._server_port, end='', file=tmp_port_file)

        # Rename temporary file to real file
        try:
            os.replace(tmp_port_file.name, port_filename)
        except OSError:
            # Don't leave the temporary file behind (best effort), and
            # let the caller see the original error.
            try:
                os.remove(tmp_port_file.name)
            except OSError:
                pass

            raise

        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1403
1404
1405# A tracing session descriptor.
1406#
1407# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
1408# objects).
class LttngTracingSessionDescriptor:
    def __init__(
        self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
    ):
        # Validates that each trace path contains the tracing session
        # name (raises `ValueError` for the first one which doesn't),
        # then keeps the traces and builds the tracing session info.
        for lttng_trace in traces:
            if name in lttng_trace.path:
                continue

            raise ValueError(
                'Tracing session name must be part of every trace path (`{}` not found in `{}`)'.format(
                    name, lttng_trace.path
                )
            )

        self._traces = traces

        # Each trace contributes its own length plus one stream
        # (presumably its metadata stream; confirm with `LttngTrace`).
        total_streams = sum(len(lttng_trace) for lttng_trace in traces) + len(traces)
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            total_streams,
            hostname,
            name,
        )

    # Traces of this tracing session, as given to the constructor.
    @property
    def traces(self):
        return self._traces

    # Tracing session info built by the constructor.
    @property
    def info(self):
        return self._info
1436
1437
2b763e29
JG
def _session_descriptors_from_path(sessions_filename, trace_path_prefix):
    # Loads the JSON file `sessions_filename` and returns the
    # corresponding list of `LttngTracingSessionDescriptor` objects.
    #
    # A relative trace path is resolved against `trace_path_prefix`
    # when the latter is not `None`; otherwise it's used as is.
    # Absolute trace paths are never modified.
    #
    # File format is:
    #
    #     [
    #         {
    #             "name": "my-session",
    #             "id": 17,
    #             "hostname": "myhost",
    #             "live-timer-freq": 1000000,
    #             "client-count": 23,
    #             "traces": [
    #                 {
    #                     "path": "lol"
    #                 },
    #                 {
    #                     "path": "meow/mix",
    #                     "beacons": {
    #                         "my_stream": [ 5235787, 728375283 ]
    #                     }
    #                 }
    #             ]
    #         }
    #     ]
    with open(sessions_filename, 'r') as sessions_file:
        params = json.load(sessions_file)

    sessions = []

    for session in params:
        name = session['name']
        tracing_session_id = session['id']
        hostname = session['hostname']
        live_timer_freq = session['live-timer-freq']
        client_count = session['client-count']
        traces = []

        for trace in session['traces']:
            # The `beacons` property is optional
            beacons = trace.get('beacons')
            path = trace['path']

            # The prefix is an optional command-line argument: joining
            # with `None` would raise `TypeError`, so only join when
            # there's an actual prefix.
            if trace_path_prefix is not None and not os.path.isabs(path):
                path = os.path.join(trace_path_prefix, path)

            traces.append(LttngTrace(path, beacons))

        sessions.append(
            LttngTracingSessionDescriptor(
                name,
                tracing_session_id,
                hostname,
                live_timer_freq,
                client_count,
                traces,
            )
        )

    return sessions
584af91e
PP
1495
1496
def _loglevel_parser(string):
    # Converts the log level name `string` (`info` or `warning`) to the
    # corresponding `logging` level.
    #
    # Raises `argparse.ArgumentTypeError` for any other name.
    level = {'info': logging.INFO, 'warning': logging.WARNING}.get(string)

    if level is None:
        raise argparse.ArgumentTypeError(
            "{} is not a valid loglevel".format(string)
        )

    return level
1503
1504
if __name__ == '__main__':
    logging.basicConfig(format='# %(asctime)-25s%(message)s')
    parser = argparse.ArgumentParser(
        description='LTTng-live protocol mocker', add_help=False
    )
    parser.add_argument(
        '--log-level',
        default='warning',
        choices=['info', 'warning'],
        help='The loglevel to be used.',
    )

    # First pass: only `--log-level` is known at this point, so that
    # the log level is applied before anything else happens. Whatever
    # isn't recognized is kept for the second pass.
    early_opts, leftover_argv = parser.parse_known_args()
    logging.getLogger().setLevel(_loglevel_parser(early_opts.log_level))

    # Remaining options, registered in the order in which the help
    # message lists them (help is disabled on the parser itself so that
    # the first pass doesn't consume `-h`/`--help`).
    remaining_options = (
        (
            ('--port-filename',),
            {
                'help': 'The final port file. This file is present when the server is ready to receive connection.',
                'required': True,
            },
        ),
        (
            ('--max-query-data-response-size',),
            {
                'type': int,
                'help': 'The maximum size of control data response in bytes',
            },
        ),
        (
            ('--trace-path-prefix',),
            {
                'type': str,
                'help': 'Prefix to prepend to the trace paths of session configurations',
            },
        ),
        (
            ('--sessions-filename',),
            {
                'type': str,
                'help': 'Path to a session configuration file',
            },
        ),
        (
            ('-h', '--help'),
            {
                'action': 'help',
                'default': argparse.SUPPRESS,
                'help': 'Show this help message and exit.',
            },
        ),
    )

    for option_flags, option_spec in remaining_options:
        parser.add_argument(*option_flags, **option_spec)

    # Second pass: parse what the first pass left over
    opts = parser.parse_args(args=leftover_argv)

    try:
        descriptors = _session_descriptors_from_path(
            opts.sessions_filename, opts.trace_path_prefix
        )

        # The constructor serves the viewer and only returns once the
        # connection is over.
        LttngLiveServer(
            opts.port_filename, descriptors, opts.max_query_data_response_size
        )
    except UnexpectedInput as exc:
        # Protocol violation by the viewer: report it and exit with a
        # failure status.
        logging.error(str(exc))
        print(exc, file=sys.stderr)
        sys.exit(1)
This page took 0.131094 seconds and 4 git commands to generate.