from __future__ import print_function
import logging
from compas_fab.backends.interfaces.client import ClientInterface
from compas_fab.backends.vrep import VrepError
from compas_fab.backends.vrep.helpers import DEFAULT_OP_MODE
from compas_fab.backends.vrep.helpers import assert_robot
from compas_fab.backends.vrep.helpers import config_from_vrep
from compas_fab.backends.vrep.helpers import config_to_vrep
from compas_fab.backends.vrep.helpers import floats_from_vrep
from compas_fab.backends.vrep.helpers import floats_to_vrep
from compas_fab.backends.vrep.helpers import resolve_host
from compas_fab.backends.vrep.planner import VrepPlanner
from compas_fab.utilities import LazyLoader
DEFAULT_SCALE = 1.
CHILD_SCRIPT_TYPE = 1 # defined in vrepConst.sim_scripttype_childscript, but redefined here to prevent loading the remoteApi library
LOG = logging.getLogger('compas_fab.backends.vrep.client')
vrep = LazyLoader('vrep', globals(), 'compas_fab.backends.vrep.remote_api.vrep')
__all__ = [
'VrepClient',
]
[docs]class VrepClient(ClientInterface):
"""Interface to run simulations using VREP as
the engine for kinematics and path planning.
:class:`.VrepClient` is a context manager type, so it's best used in combination
with the ``with`` statement to ensure resource deallocation.
Args:
host (:obj:`str`): IP address or DNS name of the V-REP simulator.
port (:obj:`int`): Port of the V-REP simulator.
scale (:obj:`int`): Scaling of the model. Defaults to meters (``1``).
lua_script (:obj:`str`): Name of the LUA script on the V-REP scene.
debug (:obj:`bool`): True to enable debug messages, False otherwise.
Examples:
>>> from compas_fab.backends import VrepClient
>>> with VrepClient() as client:
... print ('Connected: %s' % client.is_connected)
Connected: True
Note:
For more examples, check out the :ref:`V-REP examples page <examples_vrep>`.
"""
[docs] def __init__(self, host='127.0.0.1', port=19997, scale=DEFAULT_SCALE, lua_script='RFL', debug=False):
super(VrepClient, self).__init__()
self.client_id = None
self.host = resolve_host(host)
self.port = port
self.default_timeout_in_ms = -50000000
self.thread_cycle_in_ms = 5
self.debug = debug
self.scale = float(scale)
self.lua_script = lua_script
self._added_handles = []
self.planner = VrepPlanner(self)
def __enter__(self):
# Stop existing simulation, if any
vrep.simxFinish(-1)
if self.debug:
LOG.debug('Connecting to V-REP on %s:%d...', self.host, self.port)
# Connect to V-REP, set a very large timeout for blocking commands
self.client_id = vrep.simxStart(self.host, self.port, True, True,
self.default_timeout_in_ms,
self.thread_cycle_in_ms)
# Start simulation
vrep.simxStartSimulation(self.client_id, DEFAULT_OP_MODE)
if self.client_id == -1:
raise VrepError('Unable to connect to V-REP on %s:%d' % (self.host, self.port), -1)
return self
def __exit__(self, *args):
# Stop simulation
vrep.simxStopSimulation(self.client_id, DEFAULT_OP_MODE)
# Close the connection to V-REP
vrep.simxFinish(self.client_id)
self.client_id = None
# Objects are removed by V-REP itself when simulation stops
self._added_handles = []
if self.debug:
LOG.debug('Disconnected from V-REP')
@property
def is_connected(self):
"""Indicates whether the client has an active connection.
Returns:
bool: True if connected, False otherwise.
"""
return self.client_id is not None and self.client_id != -1
[docs] def get_object_handle(self, object_name):
"""Gets the object handle (identifier) for a given object name.
Args:
object_name (:obj:`str`): Name of the object.
Returns:
int: Object handle.
"""
_res, handle = vrep.simxGetObjectHandle(self.client_id,
object_name,
DEFAULT_OP_MODE)
return handle
[docs] def get_object_matrices(self, object_handles):
"""Gets a dictionary of matrices keyed by object handle.
Args:
object_handles (:obj:`list` of :obj:`float`): List of object handles (identifiers)
to retrieve matrices from.
Returns:
dict: Dictionary of matrices represented by a :obj:`list` of 12 :obj:`float` values.
Examples:
>>> from compas_fab.backends import VrepClient
>>> with VrepClient() as client:
... matrices = client.get_object_matrices([0])
... print([int(i) for i in matrices[0]]) # doctest: +SKIP
[0, 0, 0, 19, 0, 0, 0, 10, 0, 0, 0, 6] # doctest: +SKIP
.. note::
The resulting dictionary is keyed by object handle.
"""
_res, _, matrices, _, _ = self.run_child_script('getShapeMatrices', object_handles, [], [])
return dict([(object_handles[i // 12], floats_from_vrep(matrices[i:i + 12], self.scale)) for i in range(0, len(matrices), 12)])
[docs] def get_all_visible_handles(self):
"""Gets a list of object handles (identifiers) for all visible
shapes of the 3D model.
Returns:
list: List of object handles (identifiers) of the 3D model.
"""
return self.run_child_script('getRobotVisibleShapeHandles', [], [], [])[1]
[docs] def set_robot_metric(self, group, metric_values):
"""Assigns a metric defining relations between axis values of a robot.
It takes a list containing one value per configurable joint. Each value
ranges from 0 to 1, where 1 indicates the axis is blocked and cannot
move during inverse kinematic solving. A value of 1 on any of these
effectively removes one degree of freedom (DOF).
Args:
robot (:class:`compas_fab.robots.Robot`): Robot instance.
metric_values (:obj:`list` of :obj:`float`): List containing one value
per configurable joint. Each value ranges from 0 to 1.
"""
vrep.simxCallScriptFunction(self.client_id,
self.lua_script,
CHILD_SCRIPT_TYPE, 'setTheMetric',
[group], metric_values, [],
bytearray(), DEFAULT_OP_MODE)
[docs] def set_robot_pose(self, robot, frame):
"""Moves the robot to a given pose, specified as a frame.
Args:
robot (:class:`compas_fab.robots.Robot`): Robot instance to move.
frame (:class:`Frame`): Target or goal frame.
Returns:
An instance of :class:`Configuration` found for the given pose.
"""
assert_robot(robot)
# First check if the start state is reachable
joints = len(robot.get_configurable_joints())
options = {
'num_joints': joints,
'metric_values': [0.] * joints,
}
joint_values, joint_names = list(self.inverse_kinematics(robot, frame, group=robot.model.attr['index'], options=options))[-1]
config = config_from_vrep(joint_values, self.scale)
if not config:
raise ValueError('Cannot find a valid config for the given pose')
self.set_robot_config(robot, config)
return config
[docs] def set_robot_config(self, robot, config):
"""Moves the robot to the specified configuration.
Args:
robot (:class:`compas_fab.robots.Robot`): Robot instance to move.
config (:class:`Configuration` instance): Describes the position of the
robot as an instance of :class:`Configuration`.
Examples:
>>> from compas.robots import Configuration
>>> from compas_fab.robots import *
>>> with VrepClient() as client:
... config = Configuration.from_prismatic_and_revolute_values([7.600, -4.500, -5.500],
... to_radians([90, 0, 0, 0, 0, -90]))
... client.set_robot_config(rfl.Robot('A'), config)
...
"""
assert_robot(robot)
if not config:
raise ValueError('Unsupported config value')
values = config_to_vrep(config, self.scale)
self.set_robot_metric(robot.model.attr['index'], [0.0] * len(config.joint_values))
self.run_child_script('moveRobotFK',
[], values, ['robot' + robot.name])
[docs] def get_robot_config(self, robot):
"""Gets the current configuration of the specified robot.
Args:
robot (:class:`compas_fab.robots.Robot`): Robot instance.
Examples:
>>> from compas_fab.robots import *
>>> with VrepClient() as client:
... config = client.get_robot_config(rfl.Robot('A'))
Returns:
An instance of :class:`.Configuration`.
"""
assert_robot(robot)
_res, _, config, _, _ = self.run_child_script('getRobotState',
[robot.model.attr['index']],
[], [])
return config_from_vrep(config, self.scale)
[docs] def find_raw_robot_states(self, group, goal_vrep_pose, gantry_joint_limits, arm_joint_limits, max_trials=None, max_results=1):
i = 0
final_states = []
retry_until_success = True if not max_trials else False
while True:
string_param_list = []
if gantry_joint_limits or arm_joint_limits:
joint_limits = []
joint_limits.extend(floats_to_vrep(gantry_joint_limits or [], self.scale))
joint_limits.extend(arm_joint_limits or [])
string_param_list.append(','.join(map(str, joint_limits)))
res, _, states, _, _ = self.run_child_script('searchRobotStates',
[group,
max_trials or 1,
max_results],
goal_vrep_pose, string_param_list)
# Even if the retry_until_success is set to True, we short circuit
# at some point to prevent infinite loops caused by misconfiguration
i += 1
if i > 20 or (res != 0 and not retry_until_success):
raise VrepError('Failed to search robot states', res)
final_states.extend(states)
if len(final_states):
LOG.info('Found %d valid robot states', len(final_states) // 9)
break
else:
LOG.info('No valid robot states found, will retry.')
return final_states
[docs] def run_child_script(self, function_name, in_ints, in_floats, in_strings):
return vrep.simxCallScriptFunction(self.client_id,
self.lua_script,
CHILD_SCRIPT_TYPE, function_name,
in_ints, in_floats, in_strings,
bytearray(), DEFAULT_OP_MODE)