import os.path
import argparse
import tempfile
+from abc import ABC, abstractmethod
from typing import Dict, Union, Iterable, Optional, Sequence, overload
import tjson
return self._path
+# Any LTTng stream (metadata or data).
+class _LttngStream(ABC):
+ @abstractmethod
+ def __init__(self, creation_timestamp: int):
+ self._creation_timestamp: int = creation_timestamp
+
+ @property
+ def creation_timestamp(self):
+ return self._creation_timestamp
+
+
# An LTTng data stream.
-class _LttngDataStream:
- def __init__(self, path: str, beacons_json: Optional[tjson.ArrayVal]):
+class _LttngDataStream(_LttngStream):
+ def __init__(
+ self, path: str, beacons_json: Optional[tjson.ArrayVal], creation_timestamp: int
+ ):
+ super().__init__(creation_timestamp)
self._path = path
filename = os.path.basename(path)
match = re.match(r"(.*)_\d+", filename)
# An LTTng metadata stream.
-class _LttngMetadataStream:
+class _LttngMetadataStream(_LttngStream):
def __init__(
self,
metadata_file_path: str,
config_sections: Sequence[_LttngMetadataStreamSection],
+ creation_timestamp: int,
):
+ super().__init__(creation_timestamp)
self._path = metadata_file_path
self._sections = config_sections
logging.info(
for section in metadata_sections_json:
if isinstance(section, tjson.StrVal):
if section.val == "empty":
- # Found a empty section marker. Actually append the section at the
- # timestamp of the next concrete section.
+ # Found an empty section marker. Actually append the
+ # section at the timestamp of the next concrete section.
append_empty_section = True
else:
raise ValueError("Invalid string value at {}.".format(section.path))
trace_dir: str,
metadata_sections_json: Optional[tjson.ArrayVal],
beacons_json: Optional[tjson.ObjVal],
+ creation_timestamp: int,
):
self._path = trace_dir
+ self._creation_timestamp = creation_timestamp
self._create_metadata_stream(trace_dir, metadata_sections_json)
self._create_data_streams(trace_dir, beacons_json)
logging.info('Built trace: path="{}"'.format(trace_dir))
this_beacons_json = beacons_json.at(stream_name, tjson.ArrayVal)
self._data_streams.append(
- _LttngDataStream(data_stream_path, this_beacons_json)
+ _LttngDataStream(
+ data_stream_path, this_beacons_json, self._creation_timestamp
+ )
)
def _create_metadata_stream(
metadata_path, metadata_sections_json
)
- self._metadata_stream = _LttngMetadataStream(metadata_path, metadata_sections)
+ self._metadata_stream = _LttngMetadataStream(
+ metadata_path, metadata_sections, self.creation_timestamp
+ )
@property
def path(self):
def metadata_stream(self):
return self._metadata_stream
+ @property
+ def creation_timestamp(self):
+ return self._creation_timestamp
+
@overload
def __getitem__(self, index: int) -> _LttngDataStream:
...
return len(self._data_streams)
+# Stream (metadata or data) state specific to the LTTng live protocol.
+class _LttngLiveViewerSessionStreamState:
+ @abstractmethod
+ def __init__(self):
+ # A stream is considered "announced" when it has been returned
+ # to the LTTng live client in response to a "get new stream
+ # infos" (`_LttngLiveViewerGetNewStreamInfosCommand`) command.
+ self._announced = False # type: bool
+ pass
+
+ @property
+ def is_announced(self):
+ return self._announced
+
+ def mark_as_announced(self):
+ self._announced = True
+
+ @property
+ @abstractmethod
+ def stream(self) -> _LttngStream:
+ pass
+
+
# The state of a single data stream.
-class _LttngLiveViewerSessionDataStreamState:
+class _LttngLiveViewerSessionDataStreamState(_LttngLiveViewerSessionStreamState):
def __init__(
self,
ts_state: "_LttngLiveViewerSessionTracingSessionState",
data_stream: _LttngDataStream,
metadata_stream_id: int,
):
+ super().__init__()
self._ts_state = ts_state
self._info = info
self._data_stream = data_stream
return self._info
@property
- def data_stream(self):
+ def stream(self):
return self._data_stream
@property
# The state of a single metadata stream.
-class _LttngLiveViewerSessionMetadataStreamState:
+class _LttngLiveViewerSessionMetadataStreamState(_LttngLiveViewerSessionStreamState):
def __init__(
self,
ts_state: "_LttngLiveViewerSessionTracingSessionState",
info: _LttngLiveViewerStreamInfo,
metadata_stream: _LttngMetadataStream,
):
+ super().__init__()
self._ts_state = ts_state
self._info = info
self._metadata_stream = metadata_stream
else:
self._next_metadata_stream_section_timestamp = None
- self._is_sent = False
+ self._all_data_is_sent = False
fmt = 'Built metadata stream state: id={}, ts-id={}, ts-name="{}", path="{}"'
logging.info(
fmt.format(
return self._info
@property
- def metadata_stream(self):
+ def stream(self):
return self._metadata_stream
@property
- def is_sent(self):
- return self._is_sent
+ def all_data_is_sent(self):
+ return self._all_data_is_sent
- @is_sent.setter
- def is_sent(self, value: bool):
- self._is_sent = value
+ @all_data_is_sent.setter
+ def all_data_is_sent(self, value: bool):
+ self._all_data_is_sent = value
@property
def cur_section(self):
class _LttngLiveViewerSessionTracingSessionState:
def __init__(self, tc_descr: LttngTracingSessionDescriptor, base_stream_id: int):
self._tc_descr = tc_descr
- self._stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
+ self._client_visible_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
self._ds_states = {} # type: dict[int, _LttngLiveViewerSessionDataStreamState]
self._ms_states = (
{}
) # type: dict[int, _LttngLiveViewerSessionMetadataStreamState]
- stream_id = base_stream_id
+ self._last_delivered_index_timestamp = 0
+ self._last_allocated_stream_id = base_stream_id
for trace in tc_descr.traces:
- trace_id = stream_id * 1000
+ trace_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
+ trace_id = self._last_allocated_stream_id * 1000
# Metadata stream -> stream info and metadata stream state
info = _LttngLiveViewerStreamInfo(
- stream_id, trace_id, True, trace.metadata_stream.path, "metadata"
+ self._last_allocated_stream_id,
+ trace_id,
+ True,
+ trace.metadata_stream.path,
+ "metadata",
)
- self._stream_infos.append(info)
- self._ms_states[stream_id] = _LttngLiveViewerSessionMetadataStreamState(
+
+ trace_stream_infos.append(info)
+ self._ms_states[
+ self._last_allocated_stream_id
+ ] = _LttngLiveViewerSessionMetadataStreamState(
self, info, trace.metadata_stream
)
- metadata_stream_id = stream_id
- stream_id += 1
+ metadata_stream_id = self._last_allocated_stream_id
+ self._last_allocated_stream_id += 1
# Data streams -> stream infos and data stream states
for data_stream in trace:
info = _LttngLiveViewerStreamInfo(
- stream_id,
+ self._last_allocated_stream_id,
trace_id,
False,
data_stream.path,
data_stream.channel_name,
)
- self._stream_infos.append(info)
- self._ds_states[stream_id] = _LttngLiveViewerSessionDataStreamState(
+ trace_stream_infos.append(info)
+ self._ds_states[
+ self._last_allocated_stream_id
+ ] = _LttngLiveViewerSessionDataStreamState(
self, info, data_stream, metadata_stream_id
)
- stream_id += 1
+ self._last_allocated_stream_id += 1
+
+ if trace.creation_timestamp == 0:
+ # Only announce streams for traces that are created at
+ # the origin.
+ #
+ # The rest of the streams will be discovered by the
+ # client as indexes are received with a "has new data
+ # streams" flag set in the reply.
+ self._client_visible_stream_infos.extend(trace_stream_infos)
self._is_attached = False
fmt = 'Built tracing session state: id={}, name="{}"'
return self._ms_states
@property
- def stream_infos(self):
- return self._stream_infos
+ def client_visible_stream_infos(self):
+ return self._client_visible_stream_infos
@property
def has_new_metadata(self):
- return any([not ms.is_sent for ms in self._ms_states.values()])
+ return any(
+ [
+ ms.is_announced and not ms.all_data_is_sent
+ for ms in self._ms_states.values()
+ ]
+ )
@property
def is_attached(self):
)
ts_id = ts_state.tracing_session_descriptor.info.tracing_session_id
self._ts_states[ts_id] = ts_state
- total_stream_infos += len(ts_state.stream_infos)
+ total_stream_infos += len(ts_state.client_visible_stream_infos)
# Update session's stream states to have the new states
self._stream_states.update(ts_state.data_stream_states)
ts_state.is_attached = True
status = _LttngLiveViewerAttachToTracingSessionReply.Status.OK
+ stream_infos_to_announce = ts_state.client_visible_stream_infos
+
+ # Mark stream infos transmitted as part of the reply as
+ # announced.
+ for si in stream_infos_to_announce:
+ if si.is_metadata:
+ self._get_metadata_stream_state(si.id).mark_as_announced()
+ else:
+ self._get_data_stream_state(si.id).mark_as_announced()
+
return _LttngLiveViewerAttachToTracingSessionReply(
- status, ts_state.stream_infos
+ status, stream_infos_to_announce
)
def _handle_detach_from_tracing_session_command(
status = _LttngLiveViewerDetachFromTracingSessionReply.Status.OK
return _LttngLiveViewerDetachFromTracingSessionReply(status)
+ @staticmethod
+ def _stream_is_ready(
+ stream_state: _LttngLiveViewerSessionStreamState, creation_timestamp: int
+ ):
+ return (
+ not stream_state.is_announced
+ and stream_state.stream.creation_timestamp <= creation_timestamp
+ )
+
+ def _needs_new_streams(self, current_timestamp: int):
+ return any(
+ self._stream_is_ready(ss, current_timestamp)
+ for ss in self._stream_states.values()
+ )
+
def _handle_get_next_data_stream_index_entry_command(
self, cmd: _LttngLiveViewerGetNextDataStreamIndexEntryCommand
):
timestamp_begin = _get_entry_timestamp_begin(stream_state.cur_index_entry)
if needs_new_metadata_section(metadata_stream_state, timestamp_begin):
- metadata_stream_state.is_sent = False
+ metadata_stream_state.all_data_is_sent = False
metadata_stream_state.goto_next_section()
# The viewer only checks the `has_new_metadata` flag if the
status = _LttngLiveViewerGetNextDataStreamIndexEntryReply.Status.INACTIVE
reply = _LttngLiveViewerGetNextDataStreamIndexEntryReply(
- status, stream_state.cur_index_entry, has_new_metadata, False
+ status,
+ stream_state.cur_index_entry,
+ has_new_metadata,
+ self._needs_new_streams(timestamp_begin),
)
+ self._last_delivered_index_timestamp_begin = timestamp_begin
stream_state.goto_next_index_entry()
return reply
fmt = 'Limiting "get data stream packet data" command: req-length={} actual response size={}'
logging.info(fmt.format(cmd.req_length, data_response_length))
- data = stream_state.data_stream.get_data(cmd.offset, data_response_length)
+ data = stream_state.stream.get_data(cmd.offset, data_response_length)
status = _LttngLiveViewerGetDataStreamPacketDataReply.Status.OK
return _LttngLiveViewerGetDataStreamPacketDataReply(status, data, False, False)
logging.info(fmt.format(cmd.stream_id))
metadata_stream_state = self._get_metadata_stream_state(cmd.stream_id)
- if metadata_stream_state.is_sent:
+ if metadata_stream_state.all_data_is_sent:
status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.NO_NEW
return _LttngLiveViewerGetMetadataStreamDataContentReply(status, bytes())
- metadata_stream_state.is_sent = True
+ metadata_stream_state.all_data_is_sent = True
status = _LttngLiveViewerGetMetadataStreamDataContentReply.Status.OK
metadata_section = metadata_stream_state.cur_section
assert metadata_section is not None
# If we are sending an empty section, ready the next one right away.
if len(metadata_section.data) == 0:
- metadata_stream_state.is_sent = False
+ metadata_stream_state.all_data_is_sent = False
metadata_stream_state.goto_next_section()
fmt = 'Replying to "get metadata stream data" command: metadata-size={}'
status, metadata_section.data
)
+ def _get_stream_infos_ready_for_announcement(self):
+ ready_stream_infos = [] # type: list[_LttngLiveViewerStreamInfo]
+
+ for ss in self._stream_states.values():
+ if self._stream_is_ready(ss, ss.stream.creation_timestamp):
+ ready_stream_infos.append(ss.info)
+
+ return ready_stream_infos
+
+ # A stream is considered finished if it has been announced and all
+ # of its index entries have been provided to the client.
+ def _all_streams_finished(self):
+ return all(
+ isinstance(stream_state, _LttngLiveViewerSessionMetadataStreamState)
+ or (stream_state.cur_index_entry is None and stream_state.is_announced)
+ for stream_state in self._stream_states.values()
+ )
+
def _handle_get_new_stream_infos_command(
self, cmd: _LttngLiveViewerGetNewStreamInfosCommand
):
fmt = 'Handling "get new stream infos" command: ts-id={}'
logging.info(fmt.format(cmd.tracing_session_id))
+ newly_announced_stream_infos = self._get_stream_infos_ready_for_announcement()
- # As of this version, all the tracing session's stream infos are
- # always given to the viewer when sending the "attach to tracing
- # session" reply, so there's nothing new here. Return the `HUP`
- # status as, if we're handling this command, the viewer consumed
- # all the existing data streams.
- status = _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
- return _LttngLiveViewerGetNewStreamInfosReply(status, [])
+ # Mark stream infos transmitted as part of the reply as
+ # announced.
+ for si in newly_announced_stream_infos:
+ if si.is_metadata:
+ self._get_metadata_stream_state(si.id).mark_as_announced()
+ else:
+ self._get_data_stream_state(si.id).mark_as_announced()
+
+ status = _LttngLiveViewerGetNewStreamInfosReply.Status.OK
+
+ if len(newly_announced_stream_infos) == 0:
+ # If all streams have been transmitted and no new traces are
+ # scheduled for creation, hang up to signal that the tracing
+ # session is "done".
+ status = (
+ _LttngLiveViewerGetNewStreamInfosReply.Status.HUP
+ if self._all_streams_finished()
+ else _LttngLiveViewerGetNewStreamInfosReply.Status.NO_NEW
+ )
+
+ return _LttngLiveViewerGetNewStreamInfosReply(
+ status, newly_announced_stream_infos
+ )
def _handle_get_tracing_session_infos_command(
self, cmd: _LttngLiveViewerGetTracingSessionInfosCommand
# "client-count": 23,
# "traces": [
# {
- # "path": "lol"
+ # "path": "lol",
+ # creation-timestamp: 12948
# },
# {
# "path": "meow/mix",
else None
)
path = trace_json.at("path", tjson.StrVal).val
+ creation_timestamp = (
+ trace_json.at("creation-timestamp", tjson.IntVal).val
+ if "creation-timestamp" in trace_json
+ else 0
+ ) # type: int
if not os.path.isabs(path) and trace_path_prefix:
path = os.path.join(trace_path_prefix, path)
traces.append(
- LttngTrace(
- path,
- metadata_sections,
- beacons,
- )
+ LttngTrace(path, metadata_sections, beacons, creation_timestamp)
)
sessions.append(