-# The MIT License (MIT)
+# SPDX-License-Identifier: MIT
#
# Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
from bt2 import native_bt, object, utils
from bt2 import message_iterator as bt2_message_iterator
)
)
+ if hasattr(iter_cls, '_user_can_seek_ns_from_origin') and not hasattr(
+ iter_cls, '_user_seek_ns_from_origin'
+ ):
+ raise bt2._IncompleteUserClass(
+ "cannot create component class '{}': message iterator class implements _user_can_seek_ns_from_origin but not _user_seek_ns_from_origin".format(
+ cls.__name__
+ )
+ )
+
+ if hasattr(iter_cls, '_user_can_seek_beginning') and not hasattr(
+ iter_cls, '_user_seek_beginning'
+ ):
+ raise bt2._IncompleteUserClass(
+ "cannot create component class '{}': message iterator class implements _user_can_seek_beginning but not _user_seek_beginning".format(
+ cls.__name__
+ )
+ )
+
cls._iter_cls = iter_cls
@property
def _user_get_supported_mip_versions(cls, params, obj, log_level):
return [0]
- def _bt_query_from_native(cls, priv_query_exec_ptr, object, params_ptr, method_obj):
+ def _bt_query_from_native(
+ cls, priv_query_exec_ptr, object_name, params_ptr, method_obj
+ ):
# this can raise, but the native side checks the exception
if params_ptr is not None:
params = bt2_value._create_from_const_ptr_and_get_ref(params_ptr)
try:
# this can raise, but the native side checks the exception
- results = cls._user_query(priv_query_exec, object, params, method_obj)
+ results = cls._user_query(priv_query_exec, object_name, params, method_obj)
finally:
# the private query executor is a private view on the query
# executor; it's not a shared object (the library does not
bt2_value._Value._get_ref(results_ptr)
return int(results_ptr)
- def _user_query(cls, priv_query_executor, object, params, method_obj):
+ def _user_query(cls, priv_query_executor, object_name, params, method_obj):
raise bt2.UnknownObject
def _bt_component_class_ptr(self):
def _add_output_port(self, name, user_data=None):
utils._check_str(name)
+
+ if name in self._output_ports:
+ raise ValueError(
+ 'source component `{}` already contains an output port named `{}`'.format(
+ self.name, name
+ )
+ )
+
fn = native_bt.self_component_source_add_output_port
comp_status, self_port_ptr = fn(self._bt_ptr, name, user_data)
utils._handle_func_status(
comp_status, 'cannot add output port to source component object'
)
assert self_port_ptr is not None
- return bt2_port._UserComponentOutputPort._create_from_ptr(self_port_ptr)
+ return bt2_port._UserComponentOutputPort._create_from_ptr_and_get_ref(
+ self_port_ptr
+ )
class _UserFilterComponent(_UserComponent, _FilterComponentConst):
def _add_output_port(self, name, user_data=None):
utils._check_str(name)
+
+ if name in self._output_ports:
+ raise ValueError(
+ 'filter component `{}` already contains an output port named `{}`'.format(
+ self.name, name
+ )
+ )
+
fn = native_bt.self_component_filter_add_output_port
comp_status, self_port_ptr = fn(self._bt_ptr, name, user_data)
utils._handle_func_status(
comp_status, 'cannot add output port to filter component object'
)
assert self_port_ptr
- return bt2_port._UserComponentOutputPort._create_from_ptr(self_port_ptr)
+ return bt2_port._UserComponentOutputPort._create_from_ptr_and_get_ref(
+ self_port_ptr
+ )
def _add_input_port(self, name, user_data=None):
utils._check_str(name)
+
+ if name in self._input_ports:
+ raise ValueError(
+ 'filter component `{}` already contains an input port named `{}`'.format(
+ self.name, name
+ )
+ )
+
fn = native_bt.self_component_filter_add_input_port
comp_status, self_port_ptr = fn(self._bt_ptr, name, user_data)
utils._handle_func_status(
comp_status, 'cannot add input port to filter component object'
)
assert self_port_ptr
- return bt2_port._UserComponentInputPort._create_from_ptr(self_port_ptr)
+ return bt2_port._UserComponentInputPort._create_from_ptr_and_get_ref(
+ self_port_ptr
+ )
class _UserSinkComponent(_UserComponent, _SinkComponentConst):
def _add_input_port(self, name, user_data=None):
utils._check_str(name)
+
+ if name in self._input_ports:
+ raise ValueError(
+ 'sink component `{}` already contains an input port named `{}`'.format(
+ self.name, name
+ )
+ )
+
fn = native_bt.self_component_sink_add_input_port
comp_status, self_port_ptr = fn(self._bt_ptr, name, user_data)
utils._handle_func_status(
comp_status, 'cannot add input port to sink component object'
)
assert self_port_ptr
- return bt2_port._UserComponentInputPort._create_from_ptr(self_port_ptr)
+ return bt2_port._UserComponentInputPort._create_from_ptr_and_get_ref(
+ self_port_ptr
+ )
- def _create_input_port_message_iterator(self, input_port):
+ def _create_message_iterator(self, input_port):
utils._check_type(input_port, bt2_port._UserComponentInputPort)
- status, msg_iter_ptr = native_bt.bt2_self_component_port_input_message_iterator_create_from_sink_component(
+ if not input_port.is_connected:
+ raise ValueError('input port is not connected')
+
+ (
+ status,
+ msg_iter_ptr,
+ ) = native_bt.bt2_message_iterator_create_from_sink_component(
self._bt_ptr, input_port._ptr
)
utils._handle_func_status(status, 'cannot create message iterator object')