tests: convert paths passed to lttng_live_server.py
[babeltrace.git] / tests / data / plugins / src.ctf.lttng-live / lttng_live_server.py
CommitLineData
584af91e
PP
# The MIT License (MIT)
#
# Copyright (c) 2019 Philippe Proulx <pproulx@efficios.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
22
23import argparse
24import collections.abc
25import logging
26import os
27import os.path
28import re
29import socket
30import struct
31import sys
32import tempfile
33
34
class UnexpectedInput(RuntimeError):
    """Error raised when the server receives input it cannot handle.

    Within this file it reports unknown command types, unknown tracing
    session IDs and commands which target the wrong kind of stream.
    """
37
38
class _LttngLiveViewerCommand:
    """Base class of the decoded LTTng live viewer commands.

    It only keeps the `version` value which accompanies each command.
    """

    def __init__(self, version):
        self._version = version

    @property
    def version(self):
        """Version value given at construction time."""
        return self._version
46
47
class _LttngLiveViewerConnectCommand(_LttngLiveViewerCommand):
    """Decoded "connect" command.

    Carries the viewer session ID and the major/minor numbers found in
    the command's payload.
    """

    def __init__(self, version, viewer_session_id, major, minor):
        super().__init__(version)
        self._viewer_session_id = viewer_session_id
        self._major = major
        self._minor = minor

    @property
    def viewer_session_id(self):
        """Viewer session ID found in the payload."""
        return self._viewer_session_id

    @property
    def major(self):
        """Major number found in the payload."""
        return self._major

    @property
    def minor(self):
        """Minor number found in the payload."""
        return self._minor
66
67
class _LttngLiveViewerConnectReply:
    """Reply to a "connect" command.

    Holds the viewer session ID and the major/minor numbers to encode.
    """

    def __init__(self, viewer_session_id, major, minor):
        self._viewer_session_id = viewer_session_id
        self._major = major
        self._minor = minor

    @property
    def viewer_session_id(self):
        """Viewer session ID to send back."""
        return self._viewer_session_id

    @property
    def major(self):
        """Major number to send back."""
        return self._major

    @property
    def minor(self):
        """Minor number to send back."""
        return self._minor
85
86
class _LttngLiveViewerGetTracingSessionInfosCommand(_LttngLiveViewerCommand):
    """Decoded "get tracing session infos" command (no payload fields)."""
89
90
class _LttngLiveViewerTracingSessionInfo:
    """Description of one tracing session, as listed to the viewer.

    All the values are stored as given and exposed through read-only
    properties.
    """

    def __init__(
        self,
        tracing_session_id,
        live_timer_freq,
        client_count,
        stream_count,
        hostname,
        name,
    ):
        self._tracing_session_id = tracing_session_id
        self._live_timer_freq = live_timer_freq
        self._client_count = client_count
        self._stream_count = stream_count
        self._hostname = hostname
        self._name = name

    @property
    def tracing_session_id(self):
        """ID of the tracing session."""
        return self._tracing_session_id

    @property
    def live_timer_freq(self):
        """Live timer frequency value of the tracing session."""
        return self._live_timer_freq

    @property
    def client_count(self):
        """Client count value of the tracing session."""
        return self._client_count

    @property
    def stream_count(self):
        """Stream count value of the tracing session."""
        return self._stream_count

    @property
    def hostname(self):
        """Hostname (string) of the tracing session."""
        return self._hostname

    @property
    def name(self):
        """Name (string) of the tracing session."""
        return self._name
131
132
class _LttngLiveViewerGetTracingSessionInfosReply:
    """Reply to a "get tracing session infos" command."""

    def __init__(self, tracing_session_infos):
        self._tracing_session_infos = tracing_session_infos

    @property
    def tracing_session_infos(self):
        """Tracing session info objects to send to the viewer."""
        return self._tracing_session_infos
140
141
class _LttngLiveViewerAttachToTracingSessionCommand(_LttngLiveViewerCommand):
    """Decoded "attach to tracing session" command."""

    class SeekType:
        # Named values for the command's `seek_type` field
        BEGINNING = 1
        LAST = 2

    def __init__(self, version, tracing_session_id, offset, seek_type):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id
        self._offset = offset
        self._seek_type = seek_type

    @property
    def tracing_session_id(self):
        """ID of the tracing session to attach to."""
        return self._tracing_session_id

    @property
    def offset(self):
        """Offset value found in the payload."""
        return self._offset

    @property
    def seek_type(self):
        """Seek type value found in the payload."""
        return self._seek_type
164
165
class _LttngLiveViewerStreamInfo:
    """Description of one stream (data or metadata) sent to the viewer."""

    def __init__(self, id, trace_id, is_metadata, path, channel_name):
        self._id = id
        self._trace_id = trace_id
        self._is_metadata = is_metadata
        self._path = path
        self._channel_name = channel_name

    @property
    def id(self):
        """ID of the stream."""
        return self._id

    @property
    def trace_id(self):
        """ID of the trace which contains the stream."""
        return self._trace_id

    @property
    def is_metadata(self):
        """Whether or not this describes a metadata stream."""
        return self._is_metadata

    @property
    def path(self):
        """Path (string) of the stream."""
        return self._path

    @property
    def channel_name(self):
        """Channel name (string) of the stream."""
        return self._channel_name
193
194
class _LttngLiveViewerAttachToTracingSessionReply:
    """Reply to an "attach to tracing session" command."""

    class Status:
        # Named values for the reply's `status` field
        OK = 1
        ALREADY = 2
        UNKNOWN = 3
        NOT_LIVE = 4
        SEEK_ERROR = 5
        NO_SESSION = 6

    def __init__(self, status, stream_infos):
        self._status = status
        self._stream_infos = stream_infos

    @property
    def status(self):
        """Status value of the reply."""
        return self._status

    @property
    def stream_infos(self):
        """Stream info objects to send to the viewer."""
        return self._stream_infos
215
216
class _LttngLiveViewerGetNextDataStreamIndexEntryCommand(_LttngLiveViewerCommand):
    """Decoded "get next data stream index entry" command."""

    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self):
        """ID of the targeted stream."""
        return self._stream_id
225
226
class _LttngLiveViewerGetNextDataStreamIndexEntryReply:
    """Reply to a "get next data stream index entry" command."""

    class Status:
        # Named values for the reply's `status` field
        OK = 1
        RETRY = 2
        HUP = 3
        ERROR = 4
        INACTIVE = 5
        EOF = 6

    def __init__(self, status, index_entry, has_new_metadata, has_new_data_stream):
        self._status = status
        self._index_entry = index_entry
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self):
        """Status value of the reply."""
        return self._status

    @property
    def index_entry(self):
        """Data stream index entry to send to the viewer."""
        return self._index_entry

    @property
    def has_new_metadata(self):
        """Whether or not to signal new metadata to the viewer."""
        return self._has_new_metadata

    @property
    def has_new_data_stream(self):
        """Whether or not to signal a new data stream to the viewer."""
        return self._has_new_data_stream
257
258
class _LttngLiveViewerGetDataStreamPacketDataCommand(_LttngLiveViewerCommand):
    """Decoded "get data stream packet data" command."""

    def __init__(self, version, stream_id, offset, req_length):
        super().__init__(version)
        self._stream_id = stream_id
        self._offset = offset
        self._req_length = req_length

    @property
    def stream_id(self):
        """ID of the targeted stream."""
        return self._stream_id

    @property
    def offset(self):
        """Offset at which the requested data starts."""
        return self._offset

    @property
    def req_length(self):
        """Requested data length."""
        return self._req_length
277
278
class _LttngLiveViewerGetDataStreamPacketDataReply:
    """Reply to a "get data stream packet data" command."""

    class Status:
        # Named values for the reply's `status` field
        OK = 1
        RETRY = 2
        ERROR = 3
        EOF = 4

    def __init__(self, status, data, has_new_metadata, has_new_data_stream):
        self._status = status
        self._data = data
        self._has_new_metadata = has_new_metadata
        self._has_new_data_stream = has_new_data_stream

    @property
    def status(self):
        """Status value of the reply."""
        return self._status

    @property
    def data(self):
        """Packet data (bytes) to send to the viewer."""
        return self._data

    @property
    def has_new_metadata(self):
        """Whether or not to signal new metadata to the viewer."""
        return self._has_new_metadata

    @property
    def has_new_data_stream(self):
        """Whether or not to signal a new data stream to the viewer."""
        return self._has_new_data_stream
307
308
class _LttngLiveViewerGetMetadataStreamDataCommand(_LttngLiveViewerCommand):
    """Decoded "get metadata stream data" command."""

    def __init__(self, version, stream_id):
        super().__init__(version)
        self._stream_id = stream_id

    @property
    def stream_id(self):
        """ID of the targeted stream."""
        return self._stream_id
317
318
class _LttngLiveViewerGetMetadataStreamDataContentReply:
    """Reply to a "get metadata stream data" command."""

    class Status:
        # Named values for the reply's `status` field
        OK = 1
        NO_NEW = 2
        ERROR = 3

    def __init__(self, status, data):
        self._status = status
        self._data = data

    @property
    def status(self):
        """Status value of the reply."""
        return self._status

    @property
    def data(self):
        """Metadata stream data (bytes) to send to the viewer."""
        return self._data
336
337
class _LttngLiveViewerGetNewStreamInfosCommand(_LttngLiveViewerCommand):
    """Decoded "get new stream infos" command."""

    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        """ID of the targeted tracing session."""
        return self._tracing_session_id
346
347
class _LttngLiveViewerGetNewStreamInfosReply:
    """Reply to a "get new stream infos" command."""

    class Status:
        # Named values for the reply's `status` field
        OK = 1
        NO_NEW = 2
        ERROR = 3
        HUP = 4

    def __init__(self, status, stream_infos):
        self._status = status
        self._stream_infos = stream_infos

    @property
    def status(self):
        """Status value of the reply."""
        return self._status

    @property
    def stream_infos(self):
        """Stream info objects to send to the viewer."""
        return self._stream_infos
366
367
class _LttngLiveViewerCreateViewerSessionCommand(_LttngLiveViewerCommand):
    """Decoded "create viewer session" command (no payload fields)."""
370
371
class _LttngLiveViewerCreateViewerSessionReply:
    """Reply to a "create viewer session" command."""

    class Status:
        # Named values for the reply's `status` field
        OK = 1
        ERROR = 2

    def __init__(self, status):
        self._status = status

    @property
    def status(self):
        """Status value of the reply."""
        return self._status
383
384
class _LttngLiveViewerDetachFromTracingSessionCommand(_LttngLiveViewerCommand):
    """Decoded "detach from tracing session" command."""

    def __init__(self, version, tracing_session_id):
        super().__init__(version)
        self._tracing_session_id = tracing_session_id

    @property
    def tracing_session_id(self):
        """ID of the tracing session to detach from."""
        return self._tracing_session_id
393
394
class _LttngLiveViewerDetachFromTracingSessionReply:
    """Reply to a "detach from tracing session" command."""

    class Status:
        # Named values for the reply's `status` field
        OK = 1
        UNKNOWN = 2
        ERROR = 3

    def __init__(self, status):
        self._status = status

    @property
    def status(self):
        """Status value of the reply."""
        return self._status
407
408
# An LTTng live protocol codec can convert bytes to command objects and
# reply objects to bytes.
class _LttngLiveViewerProtocolCodec:
    # Command header fields: payload size, command type, and version
    _COMMAND_HEADER_STRUCT_FMT = 'QII'

    # Compute the header size with the same network byte order prefix
    # which _unpack() uses, so that it can never include native padding.
    _COMMAND_HEADER_SIZE_BYTES = struct.calcsize('!' + _COMMAND_HEADER_STRUCT_FMT)

    def __init__(self):
        pass

    def _unpack(self, fmt, data, offset=0):
        """Unpack `data` at `offset` as `fmt`, in network byte order."""
        return struct.unpack_from('!' + fmt, data, offset)

    def _unpack_payload(self, fmt, data):
        """Unpack what immediately follows the command header in `data`."""
        return self._unpack(fmt, data, self._COMMAND_HEADER_SIZE_BYTES)

    def decode(self, data):
        """Decode the command found at the beginning of `data`.

        Returns the decoded command object, or `None` if `data` does not
        contain the complete command yet.

        Raises `UnexpectedInput` if the command type is unknown.
        """
        if len(data) < self._COMMAND_HEADER_SIZE_BYTES:
            # Not enough data to read the command header
            return

        payload_size, cmd_type, version = self._unpack(
            self._COMMAND_HEADER_STRUCT_FMT, data
        )
        logging.info(
            'Decoded command header: payload-size={}, cmd-type={}, version={}'.format(
                payload_size, cmd_type, version
            )
        )

        if len(data) < self._COMMAND_HEADER_SIZE_BYTES + payload_size:
            # Not enough data to read the whole command
            return

        if cmd_type == 1:
            # The last payload field (connection type) is not needed to
            # build the command object.
            viewer_session_id, major, minor, _ = self._unpack_payload('QIII', data)
            return _LttngLiveViewerConnectCommand(
                version, viewer_session_id, major, minor
            )
        elif cmd_type == 2:
            return _LttngLiveViewerGetTracingSessionInfosCommand(version)
        elif cmd_type == 3:
            tracing_session_id, offset, seek_type = self._unpack_payload('QQI', data)
            return _LttngLiveViewerAttachToTracingSessionCommand(
                version, tracing_session_id, offset, seek_type
            )
        elif cmd_type == 4:
            (stream_id,) = self._unpack_payload('Q', data)
            return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
                version, stream_id
            )
        elif cmd_type == 5:
            stream_id, offset, req_length = self._unpack_payload('QQI', data)
            return _LttngLiveViewerGetDataStreamPacketDataCommand(
                version, stream_id, offset, req_length
            )
        elif cmd_type == 6:
            (stream_id,) = self._unpack_payload('Q', data)
            return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
        elif cmd_type == 7:
            (tracing_session_id,) = self._unpack_payload('Q', data)
            return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
        elif cmd_type == 8:
            return _LttngLiveViewerCreateViewerSessionCommand(version)
        elif cmd_type == 9:
            (tracing_session_id,) = self._unpack_payload('Q', data)
            return _LttngLiveViewerDetachFromTracingSessionCommand(
                version, tracing_session_id
            )
        else:
            raise UnexpectedInput('Unknown command type {}'.format(cmd_type))

    def _pack(self, fmt, *args):
        """Pack `args` as `fmt`, in network byte order."""
        # Force network byte order
        return struct.pack('!' + fmt, *args)

    def _encode_zero_padded_str(self, string, length):
        """Encode `string` as a field of exactly `length` bytes.

        The encoded string is padded with null bytes on the right.

        Raises `ValueError` if the encoded string is longer than
        `length` bytes: emitting it anyway would shift all the following
        fields of the reply.
        """
        data = string.encode()

        if len(data) > length:
            raise ValueError(
                'String `{}` does not fit in a field of {} bytes'.format(
                    string, length
                )
            )

        return data.ljust(length, b'\x00')

    def _encode_stream_info(self, info):
        """Encode the stream info object `info` to bytes."""
        data = self._pack('QQI', info.id, info.trace_id, int(info.is_metadata))
        data += self._encode_zero_padded_str(info.path, 4096)
        data += self._encode_zero_padded_str(info.channel_name, 255)
        return data

    def _get_has_new_stuff_flags(self, has_new_metadata, has_new_data_streams):
        """Combine both indications into a flags value.

        Bit 0 means "has new metadata" and bit 1 means "has new data
        streams".
        """
        flags = 0

        if has_new_metadata:
            flags |= 1

        if has_new_data_streams:
            flags |= 2

        return flags

    def encode(self, reply):
        """Encode the reply object `reply` to bytes.

        Raises `ValueError` if the exact type of `reply` is not one of
        the known reply classes, or if one of its strings does not fit
        in its fixed-size field.
        """
        if type(reply) is _LttngLiveViewerConnectReply:
            data = self._pack(
                'QIII', reply.viewer_session_id, reply.major, reply.minor, 2
            )
        elif type(reply) is _LttngLiveViewerGetTracingSessionInfosReply:
            data = self._pack('I', len(reply.tracing_session_infos))

            for info in reply.tracing_session_infos:
                data += self._pack(
                    'QIII',
                    info.tracing_session_id,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
                data += self._encode_zero_padded_str(info.hostname, 64)
                data += self._encode_zero_padded_str(info.name, 255)
        elif type(reply) is _LttngLiveViewerAttachToTracingSessionReply:
            data = self._pack('II', reply.status, len(reply.stream_infos))

            for info in reply.stream_infos:
                data += self._encode_stream_info(info)
        elif type(reply) is _LttngLiveViewerGetNextDataStreamIndexEntryReply:
            entry = reply.index_entry
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )

            data = self._pack(
                'QQQQQQQII',
                entry.offset_bytes,
                entry.total_size_bits,
                entry.content_size_bits,
                entry.timestamp_begin,
                entry.timestamp_end,
                entry.events_discarded,
                entry.stream_class_id,
                reply.status,
                flags,
            )
        elif type(reply) is _LttngLiveViewerGetDataStreamPacketDataReply:
            flags = self._get_has_new_stuff_flags(
                reply.has_new_metadata, reply.has_new_data_stream
            )
            data = self._pack('III', reply.status, len(reply.data), flags)
            data += reply.data
        elif type(reply) is _LttngLiveViewerGetMetadataStreamDataContentReply:
            data = self._pack('QI', len(reply.data), reply.status)
            data += reply.data
        elif type(reply) is _LttngLiveViewerGetNewStreamInfosReply:
            data = self._pack('II', reply.status, len(reply.stream_infos))

            for info in reply.stream_infos:
                data += self._encode_stream_info(info)
        elif type(reply) is _LttngLiveViewerCreateViewerSessionReply:
            data = self._pack('I', reply.status)
        elif type(reply) is _LttngLiveViewerDetachFromTracingSessionReply:
            data = self._pack('I', reply.status)
        else:
            raise ValueError(
                'Unknown reply object with class `{}`'.format(reply.__class__.__name__)
            )

        return data
575
576
# An entry within the index of an LTTng data stream.
class _LttngDataStreamIndexEntry:
    def __init__(
        self,
        offset_bytes,
        total_size_bits,
        content_size_bits,
        timestamp_begin,
        timestamp_end,
        events_discarded,
        stream_class_id,
    ):
        self._offset_bytes = offset_bytes
        self._total_size_bits = total_size_bits
        self._content_size_bits = content_size_bits
        self._timestamp_begin = timestamp_begin
        self._timestamp_end = timestamp_end
        self._events_discarded = events_discarded
        self._stream_class_id = stream_class_id

    @property
    def offset_bytes(self):
        """Offset value of the entry, in bytes."""
        return self._offset_bytes

    @property
    def total_size_bits(self):
        """Total size value of the entry, in bits."""
        return self._total_size_bits

    @property
    def total_size_bytes(self):
        """Total size in bytes (bit size divided by 8, rounded down)."""
        return self._total_size_bits // 8

    @property
    def content_size_bits(self):
        """Content size value of the entry, in bits."""
        return self._content_size_bits

    @property
    def content_size_bytes(self):
        """Content size in bytes (bit size divided by 8, rounded down)."""
        return self._content_size_bits // 8

    @property
    def timestamp_begin(self):
        """Beginning timestamp value of the entry."""
        return self._timestamp_begin

    @property
    def timestamp_end(self):
        """End timestamp value of the entry."""
        return self._timestamp_end

    @property
    def events_discarded(self):
        """Discarded events value of the entry."""
        return self._events_discarded

    @property
    def stream_class_id(self):
        """Stream class ID value of the entry."""
        return self._stream_class_id
632
633
# The index of an LTTng data stream, a sequence of index entries.
class _LttngDataStreamIndex(collections.abc.Sequence):
    def __init__(self, path):
        self._path = path
        self._build()
        logging.info(
            'Built data stream index entries: path="{}", count={}'.format(
                path, len(self._entries)
            )
        )

    def _build(self):
        """Read the index file and fill the list of index entries."""
        self._entries = []
        assert os.path.isfile(self._path)

        # File header: magic number, major, minor, and the length of a
        # single index entry.
        header_fmt = '>IIII'
        header_size = struct.calcsize(header_fmt)

        # The seven fields of an entry which this class decodes
        entry_fmt = '>QQQQQQQ'
        entry_size = struct.calcsize(entry_fmt)

        with open(self._path, 'rb') as index_file:
            # Read header first
            header = index_file.read(header_size)
            assert len(header) == header_size
            magic, _, _, index_entry_length = struct.unpack(header_fmt, header)
            assert magic == 0xC1F1DCC1

            # Read index entries
            while True:
                logging.debug(
                    'Decoding data stream index entry: path="{}", offset={}'.format(
                        self._path, index_file.tell()
                    )
                )
                raw_entry = index_file.read(entry_size)

                if not raw_entry:
                    # Done
                    break

                assert len(raw_entry) == entry_size

                # The unpacked fields are in the order of the
                # parameters of the entry's constructor.
                fields = struct.unpack(entry_fmt, raw_entry)
                self._entries.append(_LttngDataStreamIndexEntry(*fields))

                # Skip anything else before the next entry
                index_file.seek(index_entry_length - entry_size, os.SEEK_CUR)

    def __getitem__(self, index):
        return self._entries[index]

    def __len__(self):
        return len(self._entries)

    @property
    def path(self):
        """Path of the index file."""
        return self._path
705
706
# An LTTng data stream.
class _LttngDataStream:
    def __init__(self, path):
        """Build a data stream from the data stream file `path`.

        The channel name is what precedes the last `_<digits>` group of
        the file name. The index is read from
        `index/<file name>.idx`, relative to the directory of `path`.

        Raises `ValueError` if the file name has no `_<digits>` group.
        """
        self._path = path
        filename = os.path.basename(path)
        match = re.match(r'(.*)_\d+', filename)

        if match is None:
            # Fail with an explicit message instead of an
            # `AttributeError` on `None` below.
            raise ValueError(
                'Unexpected data stream file name `{}`: expecting `<channel-name>_<number>`'.format(
                    filename
                )
            )

        self._channel_name = match.group(1)
        trace_dir = os.path.dirname(path)
        index_path = os.path.join(trace_dir, 'index', filename + '.idx')
        self._index = _LttngDataStreamIndex(index_path)
        assert os.path.isfile(path)
        logging.info(
            'Built data stream: path="{}", channel-name="{}"'.format(
                path, self._channel_name
            )
        )

    @property
    def path(self):
        """Path of the data stream file."""
        return self._path

    @property
    def channel_name(self):
        """Channel name extracted from the file name."""
        return self._channel_name

    @property
    def index(self):
        """Index of this data stream."""
        return self._index

    def get_data(self, offset_bytes, len_bytes):
        """Return at most `len_bytes` bytes found at `offset_bytes`.

        The file is opened for the duration of the read only, so that
        no file handle stays open for the lifetime of the object.
        """
        with open(self._path, 'rb') as data_file:
            data_file.seek(offset_bytes)
            return data_file.read(len_bytes)
740
741
# An LTTng metadata stream.
class _LttngMetadataStream:
    def __init__(self, path):
        self._path = path
        logging.info('Built metadata stream: path="{}"'.format(path))

    @property
    def path(self):
        """Path of the metadata stream file."""
        return self._path

    @property
    def data(self):
        """Whole content (bytes) of the file, read again on each access."""
        assert os.path.isfile(self._path)

        with open(self._path, 'rb') as metadata_file:
            content = metadata_file.read()

        return content
758
759
# An LTTng trace, a sequence of LTTng data streams.
class LttngTrace(collections.abc.Sequence):
    def __init__(self, trace_dir):
        assert os.path.isdir(trace_dir)
        self._path = trace_dir
        metadata_path = os.path.join(trace_dir, 'metadata')
        self._metadata_stream = _LttngMetadataStream(metadata_path)
        self._create_data_streams(trace_dir)
        logging.info('Built trace: path="{}"'.format(trace_dir))

    def _create_data_streams(self, trace_dir):
        """Create one data stream per data stream file of `trace_dir`.

        Any regular file directly within `trace_dir` is a data stream
        file, except the hidden ones and the `metadata` file. The data
        streams are sorted by path.
        """
        candidates = [
            (filename, os.path.join(trace_dir, filename))
            for filename in os.listdir(trace_dir)
        ]
        data_stream_paths = sorted(
            path
            for filename, path in candidates
            if os.path.isfile(path)
            and not filename.startswith('.')
            and filename != 'metadata'
        )
        self._data_streams = []

        for data_stream_path in data_stream_paths:
            self._data_streams.append(_LttngDataStream(data_stream_path))

    @property
    def path(self):
        """Path of the trace directory."""
        return self._path

    @property
    def metadata_stream(self):
        """Metadata stream of this trace."""
        return self._metadata_stream

    def __getitem__(self, index):
        return self._data_streams[index]

    def __len__(self):
        return len(self._data_streams)
807
808
# The state of a single data stream.
class _LttngLiveViewerSessionDataStreamState:
    def __init__(self, ts_state, info, data_stream):
        self._ts_state = ts_state
        self._info = info
        self._data_stream = data_stream

        # Position of the current entry within the data stream's index
        self._cur_index_entry_index = 0

        ts_info = ts_state.tracing_session_descriptor.info
        logging.info(
            'Built data stream state: id={}, ts-id={}, ts-name="{}", path="{}"'.format(
                info.id, ts_info.tracing_session_id, ts_info.name, data_stream.path
            )
        )

    @property
    def tracing_session_state(self):
        """State of the tracing session which owns this data stream."""
        return self._ts_state

    @property
    def info(self):
        """Stream info object of this data stream."""
        return self._info

    @property
    def data_stream(self):
        """Underlying data stream."""
        return self._data_stream

    @property
    def cur_index_entry(self):
        """Current index entry, or `None` once past the last one."""
        index = self._data_stream.index

        if self._cur_index_entry_index == len(index):
            return None

        return index[self._cur_index_entry_index]

    def goto_next_index_entry(self):
        """Make the next index entry the current one."""
        self._cur_index_entry_index += 1
847
848
# The state of a single metadata stream.
class _LttngLiveViewerSessionMetadataStreamState:
    def __init__(self, ts_state, info, metadata_stream):
        self._ts_state = ts_state
        self._info = info
        self._metadata_stream = metadata_stream

        # Whether or not the metadata was sent to the viewer
        self._is_sent = False

        fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
        logging.info(
            fmt.format(
                info.id,
                ts_state.tracing_session_descriptor.info.tracing_session_id,
                ts_state.tracing_session_descriptor.info.name,
                metadata_stream.path,
            )
        )

    @property
    def trace_session_state(self):
        """State of the tracing session which owns this metadata stream."""
        # The state is stored as `_ts_state`: there is no
        # `_trace_session_state` attribute.
        return self._ts_state

    @property
    def tracing_session_state(self):
        """Same as `trace_session_state`, named like the data stream
        state's equivalent property."""
        return self._ts_state

    @property
    def info(self):
        """Stream info object of this metadata stream."""
        return self._info

    @property
    def metadata_stream(self):
        """Underlying metadata stream."""
        return self._metadata_stream

    @property
    def is_sent(self):
        """Whether or not the metadata was sent to the viewer."""
        return self._is_sent

    @is_sent.setter
    def is_sent(self, value):
        self._is_sent = value
885
886
# The state of a tracing session.
class _LttngLiveViewerSessionTracingSessionState:
    def __init__(self, tc_descr, base_stream_id):
        self._tc_descr = tc_descr
        self._stream_infos = []
        self._ds_states = {}
        self._ms_states = {}

        # Stream IDs are allocated sequentially from `base_stream_id`
        next_stream_id = base_stream_id

        for trace in tc_descr.traces:
            # The trace ID derives from the ID of the trace's first stream
            trace_id = next_stream_id * 1000

            # Data streams -> stream infos and data stream states
            for data_stream in trace:
                ds_info = _LttngLiveViewerStreamInfo(
                    next_stream_id,
                    trace_id,
                    False,
                    data_stream.path,
                    data_stream.channel_name,
                )
                self._stream_infos.append(ds_info)
                ds_state = _LttngLiveViewerSessionDataStreamState(
                    self, ds_info, data_stream
                )
                self._ds_states[next_stream_id] = ds_state
                next_stream_id += 1

            # Metadata stream -> stream info and metadata stream state
            metadata_stream = trace.metadata_stream
            ms_info = _LttngLiveViewerStreamInfo(
                next_stream_id, trace_id, True, metadata_stream.path, 'metadata'
            )
            self._stream_infos.append(ms_info)
            ms_state = _LttngLiveViewerSessionMetadataStreamState(
                self, ms_info, metadata_stream
            )
            self._ms_states[next_stream_id] = ms_state
            next_stream_id += 1

        self._is_attached = False
        logging.info(
            'Built tracing session state: id={}, name="{}"'.format(
                tc_descr.info.tracing_session_id, tc_descr.info.name
            )
        )

    @property
    def tracing_session_descriptor(self):
        """Tracing session descriptor given at construction time."""
        return self._tc_descr

    @property
    def data_stream_states(self):
        """Data stream states, by stream ID."""
        return self._ds_states

    @property
    def metadata_stream_states(self):
        """Metadata stream states, by stream ID."""
        return self._ms_states

    @property
    def stream_infos(self):
        """Stream info objects of all the streams, in creation order."""
        return self._stream_infos

    @property
    def has_new_metadata(self):
        """Whether or not at least one metadata stream is not sent yet."""
        return any(not ms_state.is_sent for ms_state in self._ms_states.values())

    @property
    def is_attached(self):
        """Whether or not a viewer is attached to this tracing session."""
        return self._is_attached

    @is_attached.setter
    def is_attached(self, value):
        self._is_attached = value
955
956
957# An LTTng live viewer session manages a view on tracing sessions
958# and replies to commands accordingly.
959class _LttngLiveViewerSession:
960 def __init__(
961 self,
962 viewer_session_id,
963 tracing_session_descriptors,
964 max_query_data_response_size,
965 ):
966 self._viewer_session_id = viewer_session_id
967 self._ts_states = {}
968 self._stream_states = {}
969 self._max_query_data_response_size = max_query_data_response_size
970 total_stream_infos = 0
971
972 for ts_descr in tracing_session_descriptors:
973 ts_state = _LttngLiveViewerSessionTracingSessionState(
974 ts_descr, total_stream_infos
975 )
976 ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
977 self._ts_states[ts_id] = ts_state
978 total_stream_infos += len(ts_state.stream_infos)
979
980 # Update session's stream states to have the new states
981 self._stream_states.update(ts_state.data_stream_states)
982 self._stream_states.update(ts_state.metadata_stream_states)
983
984 self._command_handlers = {
985 _LttngLiveViewerAttachToTracingSessionCommand: self._handle_attach_to_tracing_session_command,
986 _LttngLiveViewerCreateViewerSessionCommand: self._handle_create_viewer_session_command,
987 _LttngLiveViewerDetachFromTracingSessionCommand: self._handle_detach_from_tracing_session_command,
988 _LttngLiveViewerGetDataStreamPacketDataCommand: self._handle_get_data_stream_packet_data_command,
989 _LttngLiveViewerGetMetadataStreamDataCommand: self._handle_get_metadata_stream_data_command,
990 _LttngLiveViewerGetNewStreamInfosCommand: self._handle_get_new_stream_infos_command,
991 _LttngLiveViewerGetNextDataStreamIndexEntryCommand: self._handle_get_next_data_stream_index_entry_command,
992 _LttngLiveViewerGetTracingSessionInfosCommand: self._handle_get_tracing_session_infos_command,
993 }
994
995 @property
996 def viewer_session_id(self):
997 return self._viewer_session_id
998
999 def _get_tracing_session_state(self, tracing_session_id):
1000 if tracing_session_id not in self._ts_states:
1001 raise UnexpectedInput(
1002 'Unknown tracing session ID {}'.format(tracing_session_id)
1003 )
1004
1005 return self._ts_states[tracing_session_id]
1006
1007 def _get_stream_state(self, stream_id):
1008 if stream_id not in self._stream_states:
1009 UnexpectedInput('Unknown stream ID {}'.format(stream_id))
1010
1011 return self._stream_states[stream_id]
1012
1013 def handle_command(self, cmd):
1014 logging.info(
1015 'Handling command in viewer session: cmd-cls-name={}'.format(
1016 cmd.__class__.__name__
1017 )
1018 )
1019 cmd_type = type(cmd)
1020
1021 if cmd_type not in self._command_handlers:
1022 raise UnexpectedInput(
1023 'Unexpected command: cmd-cls-name={}'.format(cmd.__class__.__name__)
1024 )
1025
1026 return self._command_handlers[cmd_type](cmd)
1027
1028 def _handle_attach_to_tracing_session_command(self, cmd):
1029 fmt = 'Handling "attach to tracing session" command: ts-id={}, offset={}, seek-type={}'
1030 logging.info(fmt.format(cmd.tracing_session_id, cmd.offset, cmd.seek_type))
1031 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1032 info = ts_state.tracing_session_descriptor.info
1033
1034 if ts_state.is_attached:
1035 raise UnexpectedInput(
1036 'Cannot attach to tracing session `{}`: viewer is already attached'.format(
1037 info.name
1038 )
1039 )
1040
1041 ts_state.is_attached = True
1042 status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
1043 return _LttngLiveViewerAttachToTracingSessionReply(
1044 status, ts_state.stream_infos
1045 )
1046
1047 def _handle_detach_from_tracing_session_command(self, cmd):
1048 fmt = 'Handling "detach from tracing session" command: ts-id={}'
1049 logging.info(fmt.format(cmd.tracing_session_id))
1050 ts_state = self._get_tracing_session_state(cmd.tracing_session_id)
1051 info = ts_state.tracing_session_descriptor.info
1052
1053 if not ts_state.is_attached:
1054 raise UnexpectedInput(
1055 'Cannot detach to tracing session `{}`: viewer is not attached'.format(
1056 info.name
1057 )
1058 )
1059
1060 ts_state.is_attached = False
1061 status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
1062 return _LttngLiveViewerDetachFromTracingSessionReply(status)
1063
1064 def _handle_get_next_data_stream_index_entry_command(self, cmd):
1065 fmt = 'Handling "get next data stream index entry" command: stream-id={}'
1066 logging.info(fmt.format(cmd.stream_id))
1067 stream_state = self._get_stream_state(cmd.stream_id)
1068
1069 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1070 raise UnexpectedInput(
1071 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1072 )
1073
1074 if stream_state.cur_index_entry is None:
1075 # The viewer is done reading this stream
1076 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.HUP
1077
1078 # Dummy data stream index entry to use with the `HUP` status
1079 # (the reply needs one, but the viewer ignores it)
1080 index_entry = _LttngDataStreamIndexEntry(0, 0, 0, 0, 0, 0, 0)
1081
1082 return _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1083 status, index_entry, False, False
1084 )
1085
1086 # The viewer only checks the `has_new_metadata` flag if the
1087 # reply's status is `OK`, so we need to provide an index here
1088 has_new_metadata = stream_state.tracing_session_state.has_new_metadata
1089 status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.OK
1090 reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
1091 status, stream_state.cur_index_entry, has_new_metadata, False
1092 )
1093 stream_state.goto_next_index_entry()
1094 return reply
1095
1096 def _handle_get_data_stream_packet_data_command(self, cmd):
1097 fmt = 'Handling "get data stream packet data" command: stream-id={}, offset={}, req-length={}'
1098 logging.info(fmt.format(cmd.stream_id, cmd.offset, cmd.req_length))
1099 stream_state = self._get_stream_state(cmd.stream_id)
1100 data_response_length = cmd.req_length
1101
1102 if type(stream_state) is not _LttngLiveViewerSessionDataStreamState:
1103 raise UnexpectedInput(
1104 'Stream with ID {} is not a data stream'.format(cmd.stream_id)
1105 )
1106
1107 if stream_state.tracing_session_state.has_new_metadata:
1108 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.ERROR
1109 return _LttngLiveViewerGetDataStreamPacketDataReply(
1110 status, bytes(), True, False
1111 )
1112
1113 if self._max_query_data_response_size:
1114 # Enforce a server side limit on the query requested length.
1115 # To ensure that the transaction terminate take the minimum of both
1116 # value.
1117 data_response_length = min(
1118 cmd.req_length, self._max_query_data_response_size
1119 )
1120 fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
1121 logging.info(fmt.format(cmd.req_length, data_response_length))
1122
1123 data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
1124 status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
1125 return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
1126
def _handle_get_metadata_stream_data_command(self, cmd):
    # Handles a "get metadata stream data" command: replies with the
    # whole metadata stream content the first time, and with a `NO_NEW`
    # status (empty payload) on every following request.
    #
    # Raises `UnexpectedInput` if `cmd.stream_id` doesn't designate a
    # metadata stream.
    logging.info(
        'Handling "get metadata stream data" command: stream-id={}'.format(
            cmd.stream_id
        )
    )
    stream_state = self._get_stream_state(cmd.stream_id)

    if type(stream_state) is not _LttngLiveViewerSessionMetadataStreamState:
        raise UnexpectedInput(
            'Stream with ID {} is not a metadata stream'.format(cmd.stream_id)
        )

    reply_cls = _LttngLiveViewerGetMetadataStreamDataContentReply

    if not stream_state.is_sent:
        # First request for this stream: remember that its content was
        # sent so that later requests get `NO_NEW`.
        stream_state.is_sent = True
        return reply_cls(reply_cls.Status.OK, stream_state.metadata_stream.data)

    return reply_cls(reply_cls.Status.NO_NEW, bytes())
1146
def _handle_get_new_stream_infos_command(self, cmd):
    # Handles a "get new stream infos" command: always replies with the
    # `HUP` status and no stream infos.
    logging.info(
        'Handling "get new stream infos" command: ts-id={}'.format(
            cmd.tracing_session_id
        )
    )

    # As of this version, all the stream infos of a tracing session are
    # given to the viewer with the "attach to tracing session" reply, so
    # there's never anything new to announce here. `HUP` is the right
    # status because, if this command is received, the viewer consumed
    # all the existing data streams.
    no_new_stream_infos = []
    return _LttngLiveViewerGetNewStreamInfosReply(
        _LttngLiveViewerGetNewStreamInfosReply.Status.HUP, no_new_stream_infos
    )
1158
def _handle_get_tracing_session_infos_command(self, cmd):
    # Handles a "get tracing session infos" command: replies with the
    # info objects of all the known tracing sessions, sorted by name.
    logging.info('Handling "get tracing session infos" command.')
    sorted_infos = sorted(
        (tss.tracing_session_descriptor.info for tss in self._ts_states.values()),
        key=lambda info: info.name,
    )
    return _LttngLiveViewerGetTracingSessionInfosReply(sorted_infos)
1166
def _handle_create_viewer_session_command(self, cmd):
    # Handles a "create viewer session" command: always replies `OK`.
    #
    # There's no state to set up here. In the LTTng relay daemon, this
    # command allocates the viewer session's state.
    logging.info('Handling "create viewer session" command.')
    return _LttngLiveViewerCreateViewerSessionReply(
        _LttngLiveViewerCreateViewerSessionReply.Status.OK
    )
1174
1175
# An LTTng live TCP server.
#
# On creation, it binds to `localhost` with an OS-assigned TCP port. It
# writes the decimal TCP port number to a temporary port file created in
# the directory of `port_filename`, and then atomically renames the
# temporary port file to `port_filename`.
#
# `tracing_session_descriptors` is a list of tracing session descriptors
# (`LttngTracingSessionDescriptor`) to serve.
#
# `max_query_data_response_size` is forwarded to the viewer session to
# limit the size of its data responses; it may be `None`.
#
# This server accepts a single viewer (client).
#
# When the viewer closes the connection, the server's constructor
# returns.
class LttngLiveServer:
    def __init__(
        self, port_filename, tracing_session_descriptors, max_query_data_response_size
    ):
        logging.info('Server configuration:')

        logging.info(' Port file name: `{}`'.format(port_filename))

        if max_query_data_response_size is not None:
            logging.info(
                ' Maximum response data query size: `{}`'.format(
                    max_query_data_response_size
                )
            )

        for ts_descr in tracing_session_descriptors:
            info = ts_descr.info
            fmt = ' TS descriptor: name="{}", id={}, hostname="{}", live-timer-freq={}, client-count={}, stream-count={}:'
            logging.info(
                fmt.format(
                    info.name,
                    info.tracing_session_id,
                    info.hostname,
                    info.live_timer_freq,
                    info.client_count,
                    info.stream_count,
                )
            )

            for trace in ts_descr.traces:
                logging.info(' Trace: path="{}"'.format(trace.path))

        self._ts_descriptors = tracing_session_descriptors
        self._max_query_data_response_size = max_query_data_response_size
        self._codec = _LttngLiveViewerProtocolCodec()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Everything which happens once the socket exists is guarded so
        # that the socket is closed even if binding or writing the port
        # file fails.
        try:
            # Port 0: OS assigns an unused port
            serv_addr = ('localhost', 0)
            self._sock.bind(serv_addr)
            self._write_port_to_file(port_filename)
            self._listen()
        finally:
            self._sock.close()
            logging.info('Closed connection and socket.')

    # TCP port which the OS assigned to the listening socket.
    @property
    def _server_port(self):
        return self._sock.getsockname()[1]

    # Reads from the viewer connection until the codec can decode a
    # complete command, and returns this command.
    #
    # Returns `None` if the viewer closes the connection before sending
    # any byte of a new command.
    #
    # Raises `UnexpectedInput` if the viewer closes the connection in
    # the middle of a command, or if the received command is malformed.
    def _recv_command(self):
        data = bytes()

        while True:
            logging.info('Waiting for viewer command.')
            buf = self._conn.recv(128)

            if not buf:
                logging.info('Client closed connection.')

                if data:
                    raise UnexpectedInput(
                        'Client closed connection after having sent {} command bytes.'.format(
                            len(data)
                        )
                    )

                return

            logging.info('Received data from viewer: length={}'.format(len(buf)))

            data += buf

            try:
                cmd = self._codec.decode(data)
            except struct.error as exc:
                raise UnexpectedInput('Malformed command: {}'.format(exc)) from exc

            # `None` means that the codec needs more data: keep reading.
            if cmd is not None:
                logging.info(
                    'Received command from viewer: cmd-cls-name={}'.format(
                        cmd.__class__.__name__
                    )
                )
                return cmd

    # Encodes `reply` and sends all its bytes to the viewer.
    def _send_reply(self, reply):
        data = self._codec.encode(reply)
        logging.info(
            'Sending reply to viewer: reply-cls-name={}, length={}'.format(
                reply.__class__.__name__, len(data)
            )
        )
        self._conn.sendall(data)

    # Runs the whole conversation with the connected viewer: expects a
    # "connect" command, and then makes a viewer session handle each
    # following command until the viewer closes the connection.
    #
    # Raises `UnexpectedInput` if the first command isn't "connect".
    def _handle_connection(self):
        # First command must be "connect"
        cmd = self._recv_command()

        if type(cmd) is not _LttngLiveViewerConnectCommand:
            raise UnexpectedInput(
                'First command is not "connect": cmd-cls-name={}'.format(
                    cmd.__class__.__name__
                )
            )

        # Create viewer session (arbitrary ID 23)
        logging.info(
            'LTTng live viewer connected: version={}.{}'.format(cmd.major, cmd.minor)
        )
        viewer_session = _LttngLiveViewerSession(
            23, self._ts_descriptors, self._max_query_data_response_size
        )

        # Send "connect" reply
        self._send_reply(
            _LttngLiveViewerConnectReply(viewer_session.viewer_session_id, 2, 10)
        )

        # Make the viewer session handle the remaining commands
        while True:
            cmd = self._recv_command()

            if cmd is None:
                # Connection closed (at an expected location within the
                # conversation)
                return

            self._send_reply(viewer_session.handle_command(cmd))

    # Waits for a single viewer to connect, handles its connection, and
    # closes it.
    def _listen(self):
        logging.info('Listening: port={}'.format(self._server_port))
        # Backlog must be present for Python version < 3.5.
        # 128 is an arbitrary number since we expect only 1 connection anyway.
        self._sock.listen(128)
        self._conn, viewer_addr = self._sock.accept()
        logging.info(
            'Accepted viewer: addr={}:{}'.format(viewer_addr[0], viewer_addr[1])
        )

        try:
            self._handle_connection()
        finally:
            self._conn.close()

    # Writes the server's TCP port (decimal, no newline) to the file
    # named `port_filename` so that the file only appears once it's
    # complete.
    def _write_port_to_file(self, port_filename):
        # Create the temporary file in the directory of the final file:
        # a rename is only atomic (and only guaranteed to work) within
        # a single file system, and the default temporary directory may
        # be on another one.
        port_dir = os.path.dirname(os.path.abspath(port_filename))

        # Write the port number to a temporary file.
        with tempfile.NamedTemporaryFile(
            mode='w', delete=False, dir=port_dir
        ) as tmp_port_file:
            print(self._server_port, end='', file=tmp_port_file)

        # Rename temporary file to real file. Unlike os.rename(),
        # os.replace() also overwrites an existing destination on
        # Windows.
        os.replace(tmp_port_file.name, port_filename)
        logging.info(
            'Renamed port file: src-path="{}", dst-path="{}"'.format(
                tmp_port_file.name, port_filename
            )
        )
1348
1349
# Describes a tracing session to serve: its traces and the tracing
# session info given to the viewer.
#
# In the constructor, `traces` is a list of LTTng traces (`LttngTrace`
# objects). The path of each trace must contain `name`, otherwise the
# constructor raises `ValueError`.
class LttngTracingSessionDescriptor:
    def __init__(
        self, name, tracing_session_id, hostname, live_timer_freq, client_count, traces
    ):
        # Reject the first trace of which the path doesn't contain the
        # tracing session's name.
        for candidate in traces:
            if name in candidate.path:
                continue

            raise ValueError(
                'Tracing session name must be part of every trace path (`{}` not found in `{}`)'.format(
                    name, candidate.path
                )
            )

        self._traces = traces

        # Each trace counts for `len(trace)` streams plus one
        # (presumably its data streams plus its metadata stream;
        # confirm against `LttngTrace`).
        total_stream_count = sum(len(trace) + 1 for trace in traces)
        self._info = _LttngLiveViewerTracingSessionInfo(
            tracing_session_id,
            live_timer_freq,
            client_count,
            total_stream_count,
            hostname,
            name,
        )

    # Traces of this tracing session, as given to the constructor.
    @property
    def traces(self):
        return self._traces

    # Tracing session info built by the constructor.
    @property
    def info(self):
        return self._info
1381
1382
1383def _tracing_session_descriptors_from_arg(string):
1384 # Format is:
1385 # NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]...
1386 parts = string.split(',')
1387 name = parts[0]
1388 tracing_session_id = int(parts[1])
1389 hostname = parts[2]
1390 live_timer_freq = int(parts[3])
1391 client_count = int(parts[4])
1392 traces = [LttngTrace(path) for path in parts[5:]]
1393 return LttngTracingSessionDescriptor(
1394 name, tracing_session_id, hostname, live_timer_freq, client_count, traces
1395 )
1396
1397
1398def _loglevel_parser(string):
1399 loglevels = {'info': logging.INFO, 'warning': logging.WARNING}
1400 if string not in loglevels:
1401 msg = "{} is not a valid loglevel".format(string)
1402 raise argparse.ArgumentTypeError(msg)
1403 return loglevels[string]
1404
1405
if __name__ == '__main__':
    # Script entry point: parses the command line, and then runs a
    # server until its single viewer disconnects.
    logging.basicConfig(format='# %(asctime)-25s%(message)s')
    parser = argparse.ArgumentParser(
        add_help=False, description='LTTng-live protocol mocker'
    )
    parser.add_argument(
        '--log-level',
        choices=['info', 'warning'],
        default='warning',
        help='The loglevel to be used.',
    )

    # First pass: only `--log-level` is known at this point, so that the
    # log level is effective before the other arguments are parsed.
    # Help is disabled until the second pass so that `-h` shows all the
    # arguments.
    early_namespace, unparsed_args = parser.parse_known_args()
    root_logger = logging.getLogger()
    root_logger.setLevel(_loglevel_parser(early_namespace.log_level))

    parser.add_argument(
        '--port-filename',
        required=True,
        help='The final port file. This file is present when the server is ready to receive connection.',
    )
    parser.add_argument(
        '--max-query-data-response-size',
        type=int,
        help='The maximum size of control data response in bytes',
    )
    parser.add_argument(
        'sessions',
        metavar="SESSION",
        nargs="+",
        type=_tracing_session_descriptors_from_arg,
        help='A session configuration. There is no space after comma. Format is: NAME,ID,HOSTNAME,FREQ,CLIENTS,TRACEPATH[,TRACEPATH]....',
    )
    parser.add_argument(
        '-h',
        '--help',
        action='help',
        default=argparse.SUPPRESS,
        help='Show this help message and exit.',
    )

    # Second pass: parse what the first pass didn't consume.
    args = parser.parse_args(args=unparsed_args)

    try:
        # The constructor only returns once the viewer disconnected.
        LttngLiveServer(
            args.port_filename, args.sessions, args.max_query_data_response_size
        )
    except UnexpectedInput as exc:
        # Report a protocol violation through both the log and the
        # standard error, and exit with a failure status.
        logging.error(str(exc))
        print(exc, file=sys.stderr)
        sys.exit(1)
This page took 0.10116 seconds and 4 git commands to generate.