def _create_input_port_message_iterator(self, input_port):
utils._check_type(input_port, bt2_port._UserComponentInputPort)
- status, msg_iter_ptr = native_bt.bt2_self_component_port_input_message_iterator_create_from_sink_component(
+ (
+ status,
+ msg_iter_ptr,
+ ) = native_bt.bt2_self_component_port_input_message_iterator_create_from_sink_component(
self._bt_ptr, input_port._ptr
)
utils._handle_func_status(status, 'cannot create message iterator object')
return bt2_message._create_from_ptr(msg_ptr)
def can_seek_beginning(self):
- status, res = native_bt.self_component_port_input_message_iterator_can_seek_beginning(
+ (
+ status,
+ res,
+ ) = native_bt.self_component_port_input_message_iterator_can_seek_beginning(
self._ptr
)
utils._handle_func_status(
def can_seek_ns_from_origin(self, ns_from_origin):
utils._check_int64(ns_from_origin)
- status, res = native_bt.self_component_port_input_message_iterator_can_seek_ns_from_origin(
+ (
+ status,
+ res,
+ ) = native_bt.self_component_port_input_message_iterator_can_seek_ns_from_origin(
self._ptr, ns_from_origin
)
utils._handle_func_status(
def _create_input_port_message_iterator(self, input_port):
utils._check_type(input_port, bt2_port._UserComponentInputPort)
- status, msg_iter_ptr = native_bt.bt2_self_component_port_input_message_iterator_create_from_message_iterator(
+ (
+ status,
+ msg_iter_ptr,
+ ) = native_bt.bt2_self_component_port_input_message_iterator_create_from_message_iterator(
self._bt_ptr, input_port._ptr
)
utils._handle_func_status(status, 'cannot create message iterator object')
)
def test_can_seek_ns_from_origin_returns_false_can_seek_beginning_forward_seekable(
- self
+ self,
):
# Test the case where:
#
)
def test_can_seek_ns_from_origin_returns_false_can_seek_beginning_not_forward_seekable(
- self
+ self,
):
# Test the case where:
#
)
def test_can_seek_ns_from_origin_returns_false_cant_seek_beginning_forward_seekable(
- self
+ self,
):
# Test the case where:
#
)
def test_can_seek_ns_from_origin_returns_false_cant_seek_beginning_not_forward_seekable(
- self
+ self,
):
# Test the case where:
#
)
def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_can_seek_beginning_forward_seekable(
- self
+ self,
):
# Test the case where:
#
)
def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_can_seek_beginning_not_forward_seekable(
- self
+ self,
):
# Test the case where:
#
)
def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_cant_seek_beginning_forward_seekable(
- self
+ self,
):
# Test the case where:
#
)
def test_no_can_seek_ns_from_origin_no_seek_ns_from_origin_cant_seek_beginning_not_forward_seekable(
- self
+ self,
):
# Test the case where:
#
src._add_output_port('out2', (test_name, cc2))
def create_msgs(msg_iter, params):
- cc, = params
+ (cc,) = params
sb_msg = msg_iter._create_message_iterator_inactivity_message(cc, 0)
msg_iter._msgs = [sb_msg]
version, tracing_session_id, offset, seek_type
)
elif cmd_type == 4:
- stream_id, = self._unpack_payload('Q', data)
+ (stream_id,) = self._unpack_payload('Q', data)
return _LttngLiveViewerGetNextDataStreamIndexEntryCommand(
version, stream_id
)
version, stream_id, offset, req_length
)
elif cmd_type == 6:
- stream_id, = self._unpack_payload('Q', data)
+ (stream_id,) = self._unpack_payload('Q', data)
return _LttngLiveViewerGetMetadataStreamDataCommand(version, stream_id)
elif cmd_type == 7:
- tracing_session_id, = self._unpack_payload('Q', data)
+ (tracing_session_id,) = self._unpack_payload('Q', data)
return _LttngLiveViewerGetNewStreamInfosCommand(version, tracing_session_id)
elif cmd_type == 8:
return _LttngLiveViewerCreateViewerSessionCommand(version)
elif cmd_type == 9:
- tracing_session_id, = self._unpack_payload('Q', data)
+ (tracing_session_id,) = self._unpack_payload('Q', data)
return _LttngLiveViewerDetachFromTracingSessionCommand(
version, tracing_session_id
)
break
assert len(data) == size
- offset_bytes, total_size_bits, content_size_bits, timestamp_begin, timestamp_end, events_discarded, stream_class_id = struct.unpack(
- fmt, data
- )
+ (
+ offset_bytes,
+ total_size_bits,
+ content_size_bits,
+ timestamp_begin,
+ timestamp_end,
+ events_discarded,
+ stream_class_id,
+ ) = struct.unpack(fmt, data)
self._entries.append(
_LttngDataStreamIndexEntry(