Skip to content
Snippets Groups Projects
Commit ceaf63a2 authored by Brian Hou's avatar Brian Hou
Browse files

Update Project 4: Planning.

Vectorize Dubins path computation. This is orders of magnitudes faster than the
previous implementation, so graph caching is now less necessary. As a result,
only graphs for ROS maps are cached; the other test maps are too small to merit
the hassle.

Fix a serious issue with PlanarProblems. Distances were previously measured in
pixels rather than meters. As a result, the connection radius and curvature
parameters didn't have a clear physical interpretation, and would differ based
on the resolution of the map (the conversion factor between meters and pixels).

Add a more efficient edge visualization to RViz, which incrementally sends edges
to be visualized.

Fix minor bugs with zero edges and zero valid samples.

Set a minimum controller speed to avoid issues with reference velocities of
zero. This would occur at the end of a path.
parent 1025d23a
No related branches found
No related tags found
No related merge requests found
Pipeline #425031 failed with stage
in 14 minutes and 34 seconds
Showing
with 322 additions and 269 deletions
...@@ -188,7 +188,9 @@ class ControlROS: ...@@ -188,7 +188,9 @@ class ControlROS:
""" """
if not self.controller.is_alive(): if not self.controller.is_alive():
raise RuntimeError("Path command set before controller was started") raise RuntimeError("Path command set before controller was started")
path_xytv = time_parameterize_ramp_up_ramp_down(path_xyt, speed) path_xytv = time_parameterize_ramp_up_ramp_down(
path_xyt, speed, self.controller.min_speed
)
self.follow_path(path_xytv) self.follow_path(path_xytv)
def follow_path(self, path_xytv): def follow_path(self, path_xytv):
...@@ -294,6 +296,7 @@ def get_ros_params(): ...@@ -294,6 +296,7 @@ def get_ros_params():
"finish_threshold": float(rospy.get_param("~finish_threshold", 0.3)), "finish_threshold": float(rospy.get_param("~finish_threshold", 0.3)),
"exceed_threshold": float(rospy.get_param("~exceed_threshold", 4.00)), "exceed_threshold": float(rospy.get_param("~exceed_threshold", 4.00)),
"distance_lookahead": float(rospy.get_param("~distance_lookahead", 0.6)), "distance_lookahead": float(rospy.get_param("~distance_lookahead", 0.6)),
"min_speed": float(rospy.get_param("~min_speed", 0.5)),
} }
controller_type = rospy.get_param("~type", "").lower() controller_type = rospy.get_param("~type", "").lower()
...@@ -307,12 +310,14 @@ def get_ros_params(): ...@@ -307,12 +310,14 @@ def get_ros_params():
override_param(params, "~pid/finish_threshold", float) override_param(params, "~pid/finish_threshold", float)
override_param(params, "~pid/exceed_threshold", float) override_param(params, "~pid/exceed_threshold", float)
override_param(params, "~pid/distance_lookahead", float) override_param(params, "~pid/distance_lookahead", float)
override_param(params, "~pid/min_speed", float)
elif controller_type == "pp": elif controller_type == "pp":
params = {} params = {}
override_param(params, "~pp/frequency", float) override_param(params, "~pp/frequency", float)
override_param(params, "~pp/finish_threshold", float) override_param(params, "~pp/finish_threshold", float)
override_param(params, "~pp/exceed_threshold", float) override_param(params, "~pp/exceed_threshold", float)
override_param(params, "~pp/distance_lookahead", float) override_param(params, "~pp/distance_lookahead", float)
override_param(params, "~pp/min_speed", float)
override_param(params, "~pp/car_length", float) override_param(params, "~pp/car_length", float)
elif controller_type == "mpc": elif controller_type == "mpc":
params = { params = {
...@@ -338,6 +343,7 @@ def get_ros_params(): ...@@ -338,6 +343,7 @@ def get_ros_params():
override_param(params, "~mpc/finish_threshold", float) override_param(params, "~mpc/finish_threshold", float)
override_param(params, "~mpc/exceed_threshold", float) override_param(params, "~mpc/exceed_threshold", float)
override_param(params, "~mpc/distance_lookahead", float) override_param(params, "~mpc/distance_lookahead", float)
override_param(params, "~mpc/min_speed", float)
override_param(params, "~mpc/car_length", float) override_param(params, "~mpc/car_length", float)
else: else:
raise RuntimeError( raise RuntimeError(
......
...@@ -27,6 +27,7 @@ class BaseController(object): ...@@ -27,6 +27,7 @@ class BaseController(object):
"finish_threshold", "finish_threshold",
"exceed_threshold", "exceed_threshold",
"distance_lookahead", "distance_lookahead",
"min_speed",
} }
if not self.__properties == set(kwargs): if not self.__properties == set(kwargs):
raise ValueError( raise ValueError(
...@@ -305,7 +306,7 @@ class BaseController(object): ...@@ -305,7 +306,7 @@ class BaseController(object):
return True return True
def time_parameterize_ramp_up_ramp_down(path_xyt, speed): def time_parameterize_ramp_up_ramp_down(path_xyt, speed, min_speed):
"""Parameterize a geometric path of states with a desired speed. """Parameterize a geometric path of states with a desired speed.
Vehicles can't instantly reach the desired speed, so we need to ramp up to Vehicles can't instantly reach the desired speed, so we need to ramp up to
...@@ -323,6 +324,7 @@ def time_parameterize_ramp_up_ramp_down(path_xyt, speed): ...@@ -323,6 +324,7 @@ def time_parameterize_ramp_up_ramp_down(path_xyt, speed):
# guarantee forward movement). # guarantee forward movement).
if path_xyt.shape[0] < 4: if path_xyt.shape[0] < 4:
speeds = speed * np.ones(path_xyt.shape[0]) speeds = speed * np.ones(path_xyt.shape[0])
speeds = np.maximum(speeds, min_speed)
return np.hstack([path_xyt, speeds[:, np.newaxis]]) return np.hstack([path_xyt, speeds[:, np.newaxis]])
ramp_distance = 0.5 ramp_distance = 0.5
...@@ -340,4 +342,5 @@ def time_parameterize_ramp_up_ramp_down(path_xyt, speed): ...@@ -340,4 +342,5 @@ def time_parameterize_ramp_up_ramp_down(path_xyt, speed):
[0.0, change_points[0] / len(path_xyt), change_points[1] / len(path_xyt), 1.0], [0.0, change_points[0] / len(path_xyt), change_points[1] / len(path_xyt), 1.0],
[0, speed, speed, 0], [0, speed, speed, 0],
) )
speeds = np.maximum(speeds, min_speed)
return np.hstack([path_xyt, speeds[:, np.newaxis]]) return np.hstack([path_xyt, speeds[:, np.newaxis]])
...@@ -14,6 +14,7 @@ class TestController(unittest.TestCase): ...@@ -14,6 +14,7 @@ class TestController(unittest.TestCase):
"finish_threshold": 0.2, "finish_threshold": 0.2,
"exceed_threshold": 2, "exceed_threshold": 2,
"distance_lookahead": 0.8, "distance_lookahead": 0.8,
"min_speed": 0.5,
} }
self.controller = BaseController(**self.defaults) self.controller = BaseController(**self.defaults)
......
...@@ -69,6 +69,7 @@ if __name__ == "__main__": ...@@ -69,6 +69,7 @@ if __name__ == "__main__":
# test (defined above) in different configurations and present the correct # test (defined above) in different configurations and present the correct
# facade to the test runner. # facade to the test runner.
controller_type, params = get_ros_params() controller_type, params = get_ros_params()
params["min_speed"] = 0.0 # remain consistent with old behavior for tests
performance_expectations = [ performance_expectations = [
("line", line(), 0.12), ("line", line(), 0.12),
......
...@@ -19,6 +19,7 @@ class TestMPCController(unittest.TestCase): ...@@ -19,6 +19,7 @@ class TestMPCController(unittest.TestCase):
"finish_threshold": 0.2, "finish_threshold": 0.2,
"exceed_threshold": 4.0, "exceed_threshold": 4.0,
"distance_lookahead": 1.0, "distance_lookahead": 1.0,
"min_speed": 0.5,
"car_length": 0.33, "car_length": 0.33,
"car_width": 0.15, "car_width": 0.15,
"min_delta": -0.34, "min_delta": -0.34,
......
...@@ -14,6 +14,7 @@ class TestPIDController(unittest.TestCase): ...@@ -14,6 +14,7 @@ class TestPIDController(unittest.TestCase):
"finish_threshold": 0.2, "finish_threshold": 0.2,
"exceed_threshold": 4.0, "exceed_threshold": 4.0,
"distance_lookahead": 1.0, "distance_lookahead": 1.0,
"min_speed": 0.5,
"kp": 1.0, "kp": 1.0,
"kd": 1.0, "kd": 1.0,
} }
......
...@@ -14,6 +14,7 @@ class TestPPController(unittest.TestCase): ...@@ -14,6 +14,7 @@ class TestPPController(unittest.TestCase):
"distance_lookahead": 0.1, "distance_lookahead": 0.1,
"finish_threshold": 1.0, "finish_threshold": 1.0,
"exceed_threshold": 4.0, "exceed_threshold": 4.0,
"min_speed": 0.5,
"car_length": 1, "car_length": 1,
} }
......
...@@ -267,36 +267,32 @@ def get_map(map_topic): ...@@ -267,36 +267,32 @@ def get_map(map_topic):
return map_img, map_msg.info return map_img, map_msg.info
def map_to_world(poses, map_info): def map_to_world(poses, map_info, out=None):
"""Convert an array of pixel locations in the map to poses in the world. """Convert an array of pixel locations in the map to poses in the world.
Args: Args:
poses: Pixel poses in the map, converted in place. Should be nx3 np.array. poses: Pixel poses in the map, converted in place. Should be nx3 np.array.
map_info: Info about the map (returned by get_map) map_info: Info about the map (returned by get_map)
out: Optional output buffer to store results in. Defaults to the poses array
Returns:
Poses in the world as a np.array
""" """
poses = np.atleast_2d(poses)
if out is None:
out = poses
scale = map_info.resolution scale = map_info.resolution
angle = quaternion_to_angle(map_info.origin.orientation) angle = quaternion_to_angle(map_info.origin.orientation)
# Rotation # Rotation
c, s = np.cos(angle), np.sin(angle) out[:, [0, 1]] = np.matmul(poses[:, [0, 1]], rotation_matrix(angle))
out[:, 2] = poses[:, 2] + angle
# Store the x coordinates since they will be overwritten
temp = np.copy(poses[:, 0])
poses[:, 0] = c * poses[:, 0] - s * poses[:, 1]
poses[:, 1] = s * temp + c * poses[:, 1]
# Scale # Scale
poses[:, :2] *= float(scale) out[:, :2] *= float(scale)
# Translate # Translate
poses[:, 0] += map_info.origin.position.x out[:, 0] += map_info.origin.position.x
poses[:, 1] += map_info.origin.position.y out[:, 1] += map_info.origin.position.y
poses[:, 2] += angle
return poses return out
def world_to_map(poses, map_info, out=None): def world_to_map(poses, map_info, out=None):
...@@ -307,6 +303,7 @@ def world_to_map(poses, map_info, out=None): ...@@ -307,6 +303,7 @@ def world_to_map(poses, map_info, out=None):
map_info: Info about the map (returned by get_map) map_info: Info about the map (returned by get_map)
out: Optional output buffer to store results in. Defaults to the poses array out: Optional output buffer to store results in. Defaults to the poses array
""" """
poses = np.atleast_2d(poses)
if out is None: if out is None:
out = poses out = poses
scale = map_info.resolution scale = map_info.resolution
...@@ -323,6 +320,8 @@ def world_to_map(poses, map_info, out=None): ...@@ -323,6 +320,8 @@ def world_to_map(poses, map_info, out=None):
out[:, [0, 1]] = np.matmul(out[:, [0, 1]], rotation_matrix(angle)) out[:, [0, 1]] = np.matmul(out[:, [0, 1]], rotation_matrix(angle))
out[:, 2] = poses[:, 2] + angle out[:, 2] = poses[:, 2] + angle
return out
def estimation_error(estimates, references): def estimation_error(estimates, references):
"""Compute the error between the estimated and ground truth states. """Compute the error between the estimated and ground truth states.
......
...@@ -10,8 +10,8 @@ ...@@ -10,8 +10,8 @@
<arg name="sampler" default="halton"/> <arg name="sampler" default="halton"/>
<arg name="problem" default="se2"/> <arg name="problem" default="se2"/>
<arg name="num_vertices" default="200"/> <arg name="num_vertices" default="200"/>
<arg name="connection_radius" default="100.0"/> <arg name="connection_radius" default="10.0"/>
<arg name="curvature" default="0.05"/> <arg name="curvature" default="1.0"/>
<include file="$(find cse478)/launch/teleop.launch"> <include file="$(find cse478)/launch/teleop.launch">
<arg name="fake_localization" value="$(arg fake_localization)" /> <arg name="fake_localization" value="$(arg fake_localization)" />
......
...@@ -10,8 +10,8 @@ ...@@ -10,8 +10,8 @@
<arg name="rviz" default="true" /> <arg name="rviz" default="true" />
<arg name="num_vertices" default="200"/> <arg name="num_vertices" default="200"/>
<arg name="connection_radius" default="100.0"/> <arg name="connection_radius" default="10.0"/>
<arg name="curvature" default="0.05"/> <arg name="curvature" default="1.0"/>
<include file="$(find cse478)/launch/teleop.launch"> <include file="$(find cse478)/launch/teleop.launch">
<arg name="fake_localization" value="$(arg fake_localization)" /> <arg name="fake_localization" value="$(arg fake_localization)" />
......
...@@ -17,29 +17,28 @@ from planning.samplers import samplers ...@@ -17,29 +17,28 @@ from planning.samplers import samplers
def main(args): def main(args):
map_info = None
saveto = None
if args.use_ros_map: if args.use_ros_map:
permissible_region, map_info = utils.get_map("/static_map") permissible_region, map_info = utils.get_map("/static_map")
map_name = None saveto = graph_location(
args.problem,
args.sampler,
args.num_vertices,
args.connection_radius,
args.curvature,
)
elif args.text_map: elif args.text_map:
permissible_region = np.loadtxt(args.text_map, dtype=bool) permissible_region = np.loadtxt(args.text_map, dtype=bool)
_, fname = os.path.split(args.text_map)
map_name, _ = os.path.splitext(fname)
else: else:
permissible_region = np.ones((10, 10), dtype=bool) permissible_region = np.ones((10, 10), dtype=bool)
map_name = "empty"
saveto = graph_location(
args.problem,
args.sampler,
args.num_vertices,
args.connection_radius,
args.curvature,
map_name,
)
if args.problem == "r2": if args.problem == "r2":
problem = R2Problem(permissible_region) problem = R2Problem(permissible_region, map_info=map_info)
else: else:
problem = SE2Problem(permissible_region, curvature=args.curvature) problem = SE2Problem(
permissible_region, map_info=map_info, curvature=args.curvature
)
sampler = samplers[args.sampler](problem.extents) sampler = samplers[args.sampler](problem.extents)
rm = Roadmap( rm = Roadmap(
problem, problem,
...@@ -56,7 +55,7 @@ if __name__ == "__main__": ...@@ -56,7 +55,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="generate and visualize roadmaps") parser = argparse.ArgumentParser(description="generate and visualize roadmaps")
parser.add_argument("-n", "--num-vertices", type=int, default=100) parser.add_argument("-n", "--num-vertices", type=int, default=100)
parser.add_argument("-r", "--connection-radius", type=float, default=2.0) parser.add_argument("-r", "--connection-radius", type=float, default=2.0)
parser.add_argument("-c", "--curvature", type=float, default=0.05) parser.add_argument("-c", "--curvature", type=float, default=1.0)
parser.add_argument("--sampler", choices=samplers, default="halton") parser.add_argument("--sampler", choices=samplers, default="halton")
parser.add_argument("--lazy", action="store_true") parser.add_argument("--lazy", action="store_true")
parser.add_argument("--show-edges", action="store_true") parser.add_argument("--show-edges", action="store_true")
......
...@@ -63,7 +63,7 @@ if __name__ == "__main__": ...@@ -63,7 +63,7 @@ if __name__ == "__main__":
) )
parser.add_argument("-m", "--text-map", type=str, help="the environment to plan on") parser.add_argument("-m", "--text-map", type=str, help="the environment to plan on")
parser.add_argument("-n", "--num-vertices", type=int, required=True) parser.add_argument("-n", "--num-vertices", type=int, required=True)
parser.add_argument("-r", "--connection-radius", type=float, default=2.0) parser.add_argument("-r", "--connection-radius", type=float, default=1.0)
parser.add_argument("--sampler", type=str, default="halton", choices=samplers) parser.add_argument("--sampler", type=str, default="halton", choices=samplers)
parser.add_argument("--lazy", action="store_true") parser.add_argument("--lazy", action="store_true")
parser.add_argument("--shortcut", action="store_true") parser.add_argument("--shortcut", action="store_true")
...@@ -76,7 +76,7 @@ if __name__ == "__main__": ...@@ -76,7 +76,7 @@ if __name__ == "__main__":
se2_parser = subparsers.add_parser("se2", help="SE2 options") se2_parser = subparsers.add_parser("se2", help="SE2 options")
se2_parser.add_argument("-s", "--start", nargs=3, type=int, required=True) se2_parser.add_argument("-s", "--start", nargs=3, type=int, required=True)
se2_parser.add_argument("-g", "--goal", nargs=3, type=int, required=True) se2_parser.add_argument("-g", "--goal", nargs=3, type=int, required=True)
se2_parser.add_argument("-c", "--curvature", type=float, default=0.05) se2_parser.add_argument("-c", "--curvature", type=float, default=1.0)
args = parser.parse_args() args = parser.parse_args()
main(args) main(args)
...@@ -10,9 +10,10 @@ import math ...@@ -10,9 +10,10 @@ import math
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from cse478 import utils from cse478 import utils
import warnings
def path_planning(start, end, curvature, resolution=0.1): def path_planning(start, end, curvature, resolution=0.1, interpolate_line=True):
"""Return the Dubins path between two states. """Return the Dubins path between two states.
Args: Args:
...@@ -41,7 +42,7 @@ def path_planning(start, end, curvature, resolution=0.1): ...@@ -41,7 +42,7 @@ def path_planning(start, end, curvature, resolution=0.1):
end[2] -= start[2] end[2] -= start[2]
path, mode, length = path_planning_from_origin( path, mode, length = path_planning_from_origin(
end, curvature, resolution=resolution end, curvature, resolution=resolution, interpolate_line=interpolate_line
) )
# Transform back into original reference frame # Transform back into original reference frame
inv_rot = utils.rotation_matrix(-start_orientation) inv_rot = utils.rotation_matrix(-start_orientation)
...@@ -58,28 +59,35 @@ def path_planning(start, end, curvature, resolution=0.1): ...@@ -58,28 +59,35 @@ def path_planning(start, end, curvature, resolution=0.1):
return path, real_path_length return path, real_path_length
def path_length(s, e, c): def path_length(start, end, curvature):
"""Return the length of the Dubins path between two states. """Return the length of the Dubins path between pairs of states.
Args: Args:
s: see documentation for start above start: start configurations shape (N, 3)
e: see documentation for end above end: end configurations shape (N, 3)
c: see documentation for curvature above curvature: shape N
Returns: Returns:
length: length of Dubins path length: lengths of Dubins paths shape N
""" """
# Transform to make start of path the origin # Transform to make start of path the origin
start = np.array(s, dtype=float) start = np.atleast_2d(start)
end = np.array(e, dtype=float) end = np.atleast_2d(end)
end[:2] -= start[:2] # We'll gracefully handle mismatched argument dimensions
start_orientation = start[2] # in some situations where broadcasting semantics apply;
# if either start or end have a singleton dimension, they'll
# be broadcast to match the larger argument.
end_broad = np.empty(np.broadcast(start, end).shape)
end_broad[:, :2] = end[:, :2] - start[:, :2]
end_broad[:, 2] = end[:, 2]
start_orientation = start[:, 2]
rotation = utils.rotation_matrix(start_orientation) rotation = utils.rotation_matrix(start_orientation)
end[:2] = np.matmul(end[:2], rotation) rotation = np.moveaxis(rotation, -1, 0)
end[2] -= start[2] end_broad[:, :2] = np.matmul(end_broad[:, :2][:, np.newaxis], rotation).squeeze()
end_broad[:, 2] -= start[:, 2]
lengths, mode, cost = get_best_plan_from_origin(end, c) _, _, cost = get_best_plan_from_origin(end_broad, curvature)
return cost * (1 / c) return cost * (1 / curvature)
########################## ##########################
...@@ -87,165 +95,138 @@ def path_length(s, e, c): ...@@ -87,165 +95,138 @@ def path_length(s, e, c):
########################## ##########################
def mod2pi(theta): def mod2pi(theta, out=None):
return theta - 2.0 * math.pi * math.floor(theta / 2.0 / math.pi) if out is None:
return theta - 2.0 * np.pi * np.floor(theta / 2.0 / np.pi)
else:
out[:] = theta - 2.0 * np.pi * np.floor(theta / 2.0 / np.pi)
def pi_2_pi(angles): def pi_2_pi(angles):
"""
Wrap angles to (-pi, pi]
Args:
angles:
"""
angles[:] = (angles[:] - np.pi) % (2 * np.pi) + np.pi angles[:] = (angles[:] - np.pi) % (2 * np.pi) + np.pi
def lsl(alpha, beta, d): planner_modes = ["LSL", "RSR", "LSR", "RSL", "RLR", "LRL"]
sa = math.sin(alpha)
sb = math.sin(beta)
ca = math.cos(alpha)
cb = math.cos(beta)
c_ab = math.cos(alpha - beta)
tmp0 = d + sa - sb
mode = "LSL" def planner(alpha, beta, d):
"""
Args:
alpha: shape N
beta: shape N
d: shape N
Returns: np.array of path parameters with shape (N, 6, 3) where
the second dimension indexes the planner mode and
the third dimension indexes the segment parameter.
Turning segments are in radians and lines are in
distance units.
"""
out = np.empty((len(alpha), 6, 3))
sa = np.sin(alpha)
sb = np.sin(beta)
ca = np.cos(alpha)
cb = np.cos(beta)
c_ab = np.cos(alpha - beta)
# LSL
tmp0 = d + sa - sb
p_squared = 2 + (d * d) - (2 * c_ab) + (2 * d * (sa - sb)) p_squared = 2 + (d * d) - (2 * c_ab) + (2 * d * (sa - sb))
if p_squared < 0: tmp1 = np.arctan2((cb - ca), tmp0)
return None, None, None, mode out[:, 0, 0] = -alpha + tmp1
tmp1 = math.atan2((cb - ca), tmp0) out[:, 0, 1] = np.sqrt(p_squared)
t = mod2pi(-alpha + tmp1) out[:, 0, 2] = beta - tmp1
p = math.sqrt(p_squared)
q = mod2pi(beta - tmp1)
return t, p, q, mode
def rsr(alpha, beta, d):
sa = math.sin(alpha)
sb = math.sin(beta)
ca = math.cos(alpha)
cb = math.cos(beta)
c_ab = math.cos(alpha - beta)
# RSR
tmp0 = d - sa + sb tmp0 = d - sa + sb
mode = "RSR"
p_squared = 2 + (d * d) - (2 * c_ab) + (2 * d * (sb - sa)) p_squared = 2 + (d * d) - (2 * c_ab) + (2 * d * (sb - sa))
if p_squared < 0: tmp1 = np.arctan2((ca - cb), tmp0)
return None, None, None, mode out[:, 1, 0] = alpha - tmp1
tmp1 = math.atan2((ca - cb), tmp0) out[:, 1, 1] = np.sqrt(p_squared)
t = mod2pi(alpha - tmp1) out[:, 1, 2] = -beta + tmp1
p = math.sqrt(p_squared)
q = mod2pi(-beta + tmp1)
return t, p, q, mode
def lsr(alpha, beta, d):
sa = math.sin(alpha)
sb = math.sin(beta)
ca = math.cos(alpha)
cb = math.cos(beta)
c_ab = math.cos(alpha - beta)
# LSR
p_squared = -2 + (d * d) + (2 * c_ab) + (2 * d * (sa + sb)) p_squared = -2 + (d * d) + (2 * c_ab) + (2 * d * (sa + sb))
mode = "LSR" out[:, 2, 1] = np.sqrt(p_squared)
if p_squared < 0: p = out[:, 2, 1]
return None, None, None, mode tmp2 = np.arctan2((-ca - cb), (d + sa + sb)) - np.arctan2(-2.0, p)
p = math.sqrt(p_squared) out[:, 2, 0] = -alpha + tmp2
tmp2 = math.atan2((-ca - cb), (d + sa + sb)) - math.atan2(-2.0, p) out[:, 2, 2] = -mod2pi(beta) + tmp2
t = mod2pi(-alpha + tmp2)
q = mod2pi(-mod2pi(beta) + tmp2)
return t, p, q, mode
def rsl(alpha, beta, d):
sa = math.sin(alpha)
sb = math.sin(beta)
ca = math.cos(alpha)
cb = math.cos(beta)
c_ab = math.cos(alpha - beta)
# RSL
p_squared = (d * d) - 2 + (2 * c_ab) - (2 * d * (sa + sb)) p_squared = (d * d) - 2 + (2 * c_ab) - (2 * d * (sa + sb))
mode = "RSL" out[:, 3, 1] = np.sqrt(p_squared)
if p_squared < 0: p = out[:, 3, 1]
return None, None, None, mode tmp2 = np.arctan2((ca + cb), (d - sa - sb)) - np.arctan2(2.0, p)
p = math.sqrt(p_squared) out[:, 3, 0] = alpha - tmp2
tmp2 = math.atan2((ca + cb), (d - sa - sb)) - math.atan2(2.0, p) out[:, 3, 2] = beta - tmp2
t = mod2pi(alpha - tmp2)
q = mod2pi(beta - tmp2)
return t, p, q, mode
# RLR
def rlr(alpha, beta, d):
sa = math.sin(alpha)
sb = math.sin(beta)
ca = math.cos(alpha)
cb = math.cos(beta)
c_ab = math.cos(alpha - beta)
mode = "RLR"
tmp_rlr = (6.0 - d * d + 2.0 * c_ab + 2.0 * d * (sa - sb)) / 8.0 tmp_rlr = (6.0 - d * d + 2.0 * c_ab + 2.0 * d * (sa - sb)) / 8.0
if abs(tmp_rlr) > 1.0: out[:, 4, 1] = mod2pi(2 * np.pi - np.arccos(tmp_rlr))
return None, None, None, mode p = out[:, 4, 1]
out[:, 4, 0] = alpha - np.arctan2(ca - cb, d - sa + sb) + mod2pi(p / 2.0)
p = mod2pi(2 * math.pi - math.acos(tmp_rlr)) rlr_t = out[:, 4, 0]
t = mod2pi(alpha - math.atan2(ca - cb, d - sa + sb) + mod2pi(p / 2.0)) out[:, 4, 2] = alpha - beta - rlr_t + mod2pi(p)
q = mod2pi(alpha - beta - t + mod2pi(p))
return t, p, q, mode
def lrl(alpha, beta, d): # LRL
sa = math.sin(alpha)
sb = math.sin(beta)
ca = math.cos(alpha)
cb = math.cos(beta)
c_ab = math.cos(alpha - beta)
mode = "LRL"
tmp_lrl = (6.0 - d * d + 2.0 * c_ab + 2.0 * d * (-sa + sb)) / 8.0 tmp_lrl = (6.0 - d * d + 2.0 * c_ab + 2.0 * d * (-sa + sb)) / 8.0
if abs(tmp_lrl) > 1: out[:, 5, 1] = mod2pi(2 * np.pi - np.arccos(tmp_lrl))
return None, None, None, mode p = out[:, 5, 1]
p = mod2pi(2 * math.pi - math.acos(tmp_lrl)) out[:, 5, 0] = -alpha - np.arctan2(ca - cb, d + sa - sb) + p / 2.0
t = mod2pi(-alpha - math.atan2(ca - cb, d + sa - sb) + p / 2.0) lrl_t = out[:, 5, 0]
q = mod2pi(mod2pi(beta) - alpha - t + mod2pi(p)) out[:, 5, 2] = mod2pi(beta) - alpha - lrl_t + mod2pi(p)
return t, p, q, mode mod2pi(out[:, :, 0], out=out[:, :, 0])
mod2pi(out[:, :, 2], out=out[:, :, 2])
return out
def get_best_plan_from_origin(goal, curvature): def get_best_plan_from_origin(goal, curvature):
# nomalize # nomalize
d = math.sqrt(goal[0] ** 2.0 + goal[1] ** 2.0) d = np.sqrt(goal[:, 0] ** 2.0 + goal[:, 1] ** 2.0)
d = d * curvature d = d * curvature
# print(dx, dy, D, d) # print(dx, dy, D, d)
theta = mod2pi(math.atan2(goal[1], goal[0])) theta = mod2pi(np.arctan2(goal[:, 1], goal[:, 0]))
alpha = mod2pi(-theta) alpha = mod2pi(-theta)
beta = mod2pi(goal[2] - theta) beta = mod2pi(goal[:, 2] - theta)
# print(theta, alpha, beta, d) # print(theta, alpha, beta, d)
planners = [lsl, rsr, lsr, rsl, rlr, lrl] # Some configurations can't be connected with some (or all)
# Dubins path configurations. This will manifest as domain errors
bcost = float("inf") # in calls to trig functions. Numpy gracefully propogates NaNs
bt, bp, bq, bmode = None, None, None, None # and we will hide the warning because the code can handle them
# just fine.
for planner in planners: with warnings.catch_warnings():
t, p, q, mode = planner(alpha, beta, d) warnings.simplefilter("ignore")
if t is None: all_plans = planner(alpha, beta, d)
continue all_i = np.arange(len(all_plans))
cost = np.abs(all_plans).sum(-1)
cost = abs(t) + abs(p) + abs(q) best_i = np.nanargmin(cost, axis=1)
if bcost > cost: best_plan = all_plans[all_i, best_i]
bt, bp, bq, bmode = t, p, q, mode best_cost = cost[all_i, best_i]
bcost = cost return best_plan, list(map(lambda x: planner_modes[x], best_i)), best_cost
return [bt, bp, bq], bmode, bcost
def path_planning_from_origin(goal, curvature, resolution=0.1, interpolate_line=False):
def path_planning_from_origin(goal, curvature, resolution=0.1):
if np.all(np.array(goal) == 0.0): if np.all(np.array(goal) == 0.0):
return np.zeros((1, 3)), [], 0.0 return np.zeros((1, 3)), [], 0.0
lengths, modes, cost = get_best_plan_from_origin(goal, curvature) lengths, modes, cost = get_best_plan_from_origin(np.atleast_2d(goal), curvature)
path = generate_course(lengths, modes, curvature, step_size=resolution) path = generate_course(
lengths.squeeze(),
modes,
curvature,
step_size=resolution,
interpolate_line=interpolate_line,
)
return path, modes, cost return path, modes, cost
...@@ -273,14 +254,17 @@ def turn(radius, angle, waypoint_sep=0.1): ...@@ -273,14 +254,17 @@ def turn(radius, angle, waypoint_sep=0.1):
return turn_poses return turn_poses
def generate_course(lengths, mode, curvature, step_size=0.1): def generate_course(lengths, mode, curvature, step_size=0.1, interpolate_line=False):
segments = [] segments = []
for m, length in zip(mode, lengths): for m, length in zip(mode[0], lengths):
# 0 length segments are just a noop # 0 length segments are just a noop
if length == 0.0: if length == 0.0:
continue continue
if m == "S": if m == "S":
segments.append(line(length / curvature, step_size)) if interpolate_line:
segments.append(line(length / curvature, step_size))
else:
segments.append(np.array([[0, 0, 0], [length / curvature, 0, 0]]))
elif m == "L": elif m == "L":
segments.append(turn(1 / curvature, length, step_size)) segments.append(turn(1 / curvature, length, step_size))
elif m == "R": elif m == "R":
......
...@@ -55,7 +55,10 @@ class PlannerROS: ...@@ -55,7 +55,10 @@ class PlannerROS:
self.permissible_region, self.map_info = utils.get_map("/static_map") self.permissible_region, self.map_info = utils.get_map("/static_map")
self.problem = SE2Problem( self.problem = SE2Problem(
self.permissible_region, check_resolution=0.1, curvature=curvature self.permissible_region,
map_info=self.map_info,
check_resolution=0.1,
curvature=curvature,
) )
self.rm = None self.rm = None
...@@ -87,7 +90,7 @@ class PlannerROS: ...@@ -87,7 +90,7 @@ class PlannerROS:
) )
load_time = time.time() - start_stamp load_time = time.time() - start_stamp
rospy.loginfo("Roadmap constructed in {:2.2f}s".format(load_time)) rospy.loginfo("Roadmap constructed in {:2.2f}s".format(load_time))
self.visualize() rospy.Timer(rospy.Duration(1), lambda x: self.visualize(), oneshot=True)
# self.rm.visualize(saveto="graph.png") # self.rm.visualize(saveto="graph.png")
def plan_to_goal(self, start, goal): def plan_to_goal(self, start, goal):
...@@ -105,12 +108,14 @@ class PlannerROS: ...@@ -105,12 +108,14 @@ class PlannerROS:
# Call the Lazy A* planner # Call the Lazy A* planner
try: try:
rospy.loginfo("Planning...") rospy.loginfo("Planning...")
start_edges_evaluated = self.rm.edges_evaluated
start_time = time.time() start_time = time.time()
path, _ = search.astar(self.rm, start_id, goal_id) path, _ = search.astar(self.rm, start_id, goal_id)
end_time = time.time() end_time = time.time()
edges_evaluated = self.rm.edges_evaluated - start_edges_evaluated
rospy.loginfo("Path length: {}".format(self.rm.compute_path_length(path))) rospy.loginfo("Path length: {}".format(self.rm.compute_path_length(path)))
rospy.loginfo("Planning time: {}".format(end_time - start_time)) rospy.loginfo("Planning time: {}".format(end_time - start_time))
rospy.loginfo("Edges evaluated: {}".format(self.rm.edges_evaluated)) rospy.loginfo("Edges evaluated: {}".format(edges_evaluated))
# self.rm.visualize(vpath=path, saveto="planned_path.png") # self.rm.visualize(vpath=path, saveto="planned_path.png")
except nx.NetworkXNoPath: except nx.NetworkXNoPath:
rospy.loginfo("Failed to find a plan") rospy.loginfo("Failed to find a plan")
...@@ -128,15 +133,8 @@ class PlannerROS: ...@@ -128,15 +133,8 @@ class PlannerROS:
rospy.loginfo("Shortcut time: {}".format(end_time - start_time)) rospy.loginfo("Shortcut time: {}".format(end_time - start_time))
# self.rm.visualize(vpath=path, saveto="shortcut_path.png") # self.rm.visualize(vpath=path, saveto="shortcut_path.png")
states = [] # Return interpolated path
for i in range(1, len(path)): return self.rm.compute_qpath(path)
u, v = path[i - 1 : i + 1]
q1 = self.rm.vertices[u, :]
q2 = self.rm.vertices[v, :]
edge, _ = self.rm.problem.steer(q1, q2)
states.append(edge)
states = np.vstack(states)
return utils.map_to_world(states, self.map_info)
def get_goal(self, msg): def get_goal(self, msg):
...@@ -144,13 +142,8 @@ class PlannerROS: ...@@ -144,13 +142,8 @@ class PlannerROS:
if self.rm is None: if self.rm is None:
return False return False
self.goal = np.array([utils.pose_to_particle(msg.pose)]) self.goal = np.array(utils.pose_to_particle(msg.pose))
utils.world_to_map(self.goal, self.map_info) start = self._get_car_pose()
self.goal = self.goal.squeeze()
start = self._get_car_pose()[np.newaxis, :]
utils.world_to_map(start, self.map_info)
start = start.squeeze()
path_states = self.plan_to_goal(start, self.goal) path_states = self.plan_to_goal(start, self.goal)
if path_states is None: if path_states is None:
...@@ -198,36 +191,56 @@ class PlannerROS: ...@@ -198,36 +191,56 @@ class PlannerROS:
def visualize(self): def visualize(self):
"""Visualize the nodes and edges of the roadmap.""" """Visualize the nodes and edges of the roadmap."""
vertices = self.rm.vertices.copy() vertices = self.rm.vertices.copy()
utils.map_to_world(vertices, self.map_info)
poses = list(map(utils.particle_to_pose, vertices)) poses = list(map(utils.particle_to_pose, vertices))
msg = PoseArray(header=Header(frame_id="map"), poses=poses) msg = PoseArray(header=Header(frame_id="map"), poses=poses)
self.nodes_viz.publish(msg) self.nodes_viz.publish(msg)
return
all_edges = [] # There are lots of edges, so we'll shuffle and incrementally visualize
edges = list(self.rm.graph.edges()) # them. This is more work overall, but gives more immediate feedback
for u, v in edges: # about the graph.
if not self.rm.check_edge_validity(u, v): all_edges = np.empty((0, 2), dtype=int)
continue edges = np.array(self.rm.graph.edges(), dtype=int)
q1 = self.rm.vertices[u, :] np.random.shuffle(edges)
q2 = self.rm.vertices[v, :] split_indices = np.array(
edge, _ = self.problem.steer(q1, q2) [500, 1000, 2000, 5000]
utils.map_to_world(edge, self.map_info) + [10000, 20000, 50000]
with_repeats = np.repeat(edge[:, :2], 2, 0).reshape(-1, 2)[1:-1] + list(range(100000, edges.shape[0], 100000)),
all_edges.append(with_repeats) dtype=int,
all_edges = np.vstack(all_edges)
points = list(map(lambda x: Point(x=x[0], y=x[1]), all_edges))
msg = Marker(
header=Header(frame_id="map"), type=Marker.LINE_LIST, points=points
) )
msg.scale.x = 0.01 for batch in np.split(edges, split_indices, axis=0):
msg.pose.orientation.w = 1.0 batch_edges = []
msg.color.a = 0.1 for u, v in batch:
self.edges_viz.publish(msg) q1 = self.rm.vertices[u, :]
q2 = self.rm.vertices[v, :]
# Check edge validity on the problem rather than roadmap to
# circumvent edge collision-checking count
if not self.rm.problem.check_edge_validity(q1, q2):
continue
edge, _ = self.problem.steer(
q1, q2, resolution=0.25, interpolate_line=False
)
with_repeats = np.repeat(edge[:, :2], 2, 0).reshape(-1, 2)[1:-1]
batch_edges.append(with_repeats)
if not batch_edges:
continue
batch_edges = np.vstack(batch_edges)
all_edges = np.vstack((all_edges, batch_edges))
points = list(map(lambda x: Point(x=x[0], y=x[1], z=-1), all_edges))
msg = Marker(
header=Header(frame_id="map"), type=Marker.LINE_LIST, points=points
)
msg.scale.x = 0.01
msg.pose.orientation.w = 1.0
msg.color.a = 0.1
self.edges_viz.publish(msg)
def mkdir_p(path): def mkdir_p(path):
"""Equivalent to mkdir -p path.""" """Equivalent to mkdir -p path.
The exist_ok flag for os.makedirs was introduced in Python 3.2.
This function provides Python 2 support.
"""
try: try:
os.makedirs(path) os.makedirs(path)
except OSError as exc: # Python ≥ 2.5 except OSError as exc: # Python ≥ 2.5
...@@ -247,13 +260,12 @@ def graph_location( ...@@ -247,13 +260,12 @@ def graph_location(
map_name=None, map_name=None,
): ):
"""Return the name of this graph in the cache.""" """Return the name of this graph in the cache."""
param_string = "_{}_{}_{}_{}".format(
problem_name, sampler_name, num_vertices, connection_radius
)
if problem_name == "se2":
param_string += "_{}".format(curvature)
cache_dir = os.path.expanduser("~/.ros/graph_cache/") cache_dir = os.path.expanduser("~/.ros/graph_cache/")
mkdir_p(cache_dir) mkdir_p(cache_dir)
if map_name is None: if map_name is None:
map_name = os.path.basename(rospy.get_param("/map_file")).split(".")[0] map_name, _ = os.path.splitext(os.path.basename(rospy.get_param("/map_file")))
return os.path.join(cache_dir, map_name + param_string + ".pkl") params = [map_name, problem_name, sampler_name, num_vertices, connection_radius]
if problem_name == "se2":
params.append(curvature)
name = "_".join(str(p) for p in params)
return os.path.join(cache_dir, name + ".pkl")
import numpy as np import numpy as np
from cse478 import utils
from planning import dubins from planning import dubins
class PlanarProblem(object): class PlanarProblem(object):
def __init__(self, permissible_region, check_resolution=0.1): def __init__(self, permissible_region, map_info=None, check_resolution=0.1):
"""Construct a planar planning problem. """Construct a planar planning problem.
Args: Args:
permissible_region: Boolean np.array with shape map height x map width, permissible_region: Boolean np.array with shape map height x map width,
where one indicates that the location is permissible where one indicates that the location is permissible
map_info: map information, returned by get_map
check_resolution: collision-checking resolution check_resolution: collision-checking resolution
""" """
self.permissible_region = permissible_region self.permissible_region = permissible_region
self.map_info = map_info
self.check_resolution = check_resolution self.check_resolution = check_resolution
self.height, self.width = permissible_region.shape height, width = permissible_region.shape
self.extents = np.zeros((2, 2)) self.extents = np.zeros((3, 2))
self.extents[0, 1] = self.width self.extents[0, 1] = width
self.extents[1, 1] = self.height self.extents[1, 1] = height
if map_info is not None:
map_angle = utils.quaternion_to_angle(map_info.origin.orientation)
assert map_angle == 0
utils.map_to_world(self.extents.T, map_info)
self.extents = self.extents[:2, :]
def check_state_validity(self, states): def check_state_validity(self, states):
"""Return whether states are valid. """Return whether states are valid.
...@@ -40,18 +49,28 @@ class PlanarProblem(object): ...@@ -40,18 +49,28 @@ class PlanarProblem(object):
"*** REPLACE THIS LINE ***" "*** REPLACE THIS LINE ***"
# END QUESTION 1.2 # END QUESTION 1.2
# The units of the state are meters and radians. We need to convert the
# meters to pixels, in order to index into the permissible region. This
# function converts them in place.
if self.map_info is not None:
utils.world_to_map(states, self.map_info)
# For states within the extents of the map, collision check by reading # For states within the extents of the map, collision check by reading
# the corresponding entry of self.permissible_region. For simplicity, # the corresponding entry of self.permissible_region. For simplicity,
# we'll assume the robot is a point robot: just index directly with the # we'll assume the robot is a point robot: just index directly with the
# robot state x and y indices into self.permissible_region. # robot state x and y pixel indices into self.permissible_region.
# #
# Hint: use the `astype` method to cast the x and y positions into # Hint: use the `astype` method to cast the x and y pixel positions into
# integers. Then, index into self.permissible_region, remembering that # integers. Then, index into self.permissible_region, remembering that
# the zeroth dimension is the height. # the zeroth dimension is the height.
# BEGIN QUESTION 1.2 # BEGIN QUESTION 1.2
"*** REPLACE THIS LINE ***" "*** REPLACE THIS LINE ***"
# END QUESTION 1.2 # END QUESTION 1.2
# Convert the units back from pixels to meters for the caller
if self.map_info is not None:
utils.map_to_world(states, self.map_info)
return valid return valid
def check_edge_validity(self, q1, q2): def check_edge_validity(self, q1, q2):
...@@ -91,7 +110,7 @@ class PlanarProblem(object): ...@@ -91,7 +110,7 @@ class PlanarProblem(object):
heuristic_cost[i] = length heuristic_cost[i] = length
return heuristic_cost return heuristic_cost
def steer(self, q1, q2): def steer(self, q1, q2, **kwargs):
"""Return a local path connecting two states. """Return a local path connecting two states.
Intermediate states are used for edge collision-checking. Intermediate states are used for edge collision-checking.
...@@ -118,31 +137,38 @@ class R2Problem(PlanarProblem): ...@@ -118,31 +137,38 @@ class R2Problem(PlanarProblem):
""" """
return np.linalg.norm(np.atleast_2d(q2) - np.atleast_2d(q1), axis=1) return np.linalg.norm(np.atleast_2d(q2) - np.atleast_2d(q1), axis=1)
def steer(self, q1, q2): def steer(self, q1, q2, resolution=None, interpolate_line=True):
"""Return a straight-line path connecting two R^2 states. """Return a straight-line path connecting two R^2 states.
Args: Args:
q1, q2: np.arrays with shape 2 q1, q2: np.arrays with shape 2
resolution: the space between waypoints in the resulting path
interpolate_line: whether to provide fine waypoint discretization
for line segments
Returns: Returns:
path: sequence of states between q1 and q2 path: sequence of states between q1 and q2
length: length of local path length: length of local path
""" """
if resolution is None:
resolution = self.check_resolution
q1 = q1.reshape((1, -1)) q1 = q1.reshape((1, -1))
q2 = q2.reshape((1, -1)) q2 = q2.reshape((1, -1))
dist = np.linalg.norm(q2 - q1) dist = np.linalg.norm(q2 - q1)
if dist < self.check_resolution: if not interpolate_line or dist < resolution:
return np.vstack((q1, q2)), dist return np.vstack((q1, q2)), dist
q1_toward_q2 = (q2 - q1) / dist q1_toward_q2 = (q2 - q1) / dist
steps = np.hstack( steps = np.hstack((np.arange(0, dist, resolution), np.array([dist]))).reshape(
(np.arange(0, dist, self.check_resolution), np.array([dist])) (-1, 1)
).reshape((-1, 1)) )
return q1 + q1_toward_q2 * steps, dist return q1 + q1_toward_q2 * steps, dist
class SE2Problem(PlanarProblem): class SE2Problem(PlanarProblem):
def __init__(self, permissible_region, check_resolution=0.01, curvature=3): def __init__(
super(SE2Problem, self).__init__(permissible_region, check_resolution) self, permissible_region, map_info=None, check_resolution=0.01, curvature=1.0
):
super(SE2Problem, self).__init__(permissible_region, map_info, check_resolution)
self.curvature = curvature self.curvature = curvature
self.extents = np.vstack((self.extents, np.array([[-np.pi, np.pi]]))) self.extents = np.vstack((self.extents, np.array([[-np.pi, np.pi]])))
...@@ -150,34 +176,37 @@ class SE2Problem(PlanarProblem): ...@@ -150,34 +176,37 @@ class SE2Problem(PlanarProblem):
"""Compute the length of the Dubins path between two SE(2) states. """Compute the length of the Dubins path between two SE(2) states.
Args: Args:
q1, q2: np.arrays with shape 3 q1, q2: np.arrays with shape (N, 3)
Returns: Returns:
heuristic: cost estimate between two states heuristic: cost estimate between two states
""" """
start, end = np.atleast_2d(q1), np.atleast_2d(q2) start, end = np.atleast_2d(q1), np.atleast_2d(q2)
start_ind = np.arange(start.shape[0]) # This function will handle broadcasting start and end,
end_ind = np.arange(end.shape[0]) # if they're compatible
broad = np.broadcast(start_ind, end_ind) heuristic_cost = dubins.path_length(start, end, self.curvature)
num_pairs = broad.size
heuristic_cost = np.empty((num_pairs))
for i, (start_i, end_i) in enumerate(zip(*broad.iters)):
heuristic_cost[i] = dubins.path_length(
start[start_i], end[end_i], self.curvature
)
return heuristic_cost return heuristic_cost
def steer(self, q1, q2): def steer(self, q1, q2, resolution=None, interpolate_line=True):
"""Return a Dubins path connecting two SE(2) states. """Return a Dubins path connecting two SE(2) states.
Args: Args:
q1, q2: np.arrays with shape 3 q1, q2: np.arrays with shape 3
resolution: the space between waypoints in the resulting path
interpolate_line: whether to provide fine waypoint discretization
for line segments
Returns: Returns:
path: sequence of states on Dubins path between q1 and q2 path: sequence of states on Dubins path between q1 and q2
length: length of local path length: length of local path
""" """
if resolution is None:
resolution = self.check_resolution
path, length = dubins.path_planning( path, length = dubins.path_planning(
q1, q2, self.curvature, resolution=self.check_resolution q1,
q2,
self.curvature,
resolution=resolution,
interpolate_line=interpolate_line,
) )
return path, length return path, length
from __future__ import division from __future__ import division
import os import os
import matplotlib.collections
import networkx as nx import networkx as nx
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
...@@ -60,7 +62,7 @@ class Roadmap(object): ...@@ -60,7 +62,7 @@ class Roadmap(object):
return self.problem.compute_heuristic( return self.problem.compute_heuristic(
self.vertices[n1, :], self.vertices[n1, :],
self.vertices[n2, :], self.vertices[n2, :],
) ).item()
def check_edge_validity(self, n1, n2): def check_edge_validity(self, n1, n2):
"""Collision check the edge between two nodes in the roadmap. """Collision check the edge between two nodes in the roadmap.
...@@ -89,6 +91,9 @@ class Roadmap(object): ...@@ -89,6 +91,9 @@ class Roadmap(object):
weighted_edges: a subset of the original weighted_edges, where only weighted_edges: a subset of the original weighted_edges, where only
rows that correspond to collision-free edges are preserved rows that correspond to collision-free edges are preserved
""" """
if weighted_edges.shape[0] == 0:
return weighted_edges
# uv contains only the node labels for each edge (source, target). # uv contains only the node labels for each edge (source, target).
uv = weighted_edges[:, :2].astype(int) uv = weighted_edges[:, :2].astype(int)
...@@ -191,7 +196,8 @@ class Roadmap(object): ...@@ -191,7 +196,8 @@ class Roadmap(object):
# This adaptively changes the sample batch size based on how hard it # This adaptively changes the sample batch size based on how hard it
# is to get collision-free vertices. # is to get collision-free vertices.
est_validity = valid_samples / total_samples est_validity = valid_samples / total_samples
batch_size = int((self.num_vertices - valid_samples) / est_validity) + 1 if valid_samples > 0:
batch_size = int((self.num_vertices - valid_samples) / est_validity) + 1
configs = np.vstack(configs) configs = np.vstack(configs)
assert configs.shape[0] >= self.num_vertices assert configs.shape[0] >= self.num_vertices
return configs[: self.num_vertices, :] return configs[: self.num_vertices, :]
...@@ -233,7 +239,8 @@ class Roadmap(object): ...@@ -233,7 +239,8 @@ class Roadmap(object):
Returns: Returns:
new_index: the integer label for the added state new_index: the integer label for the added state
""" """
assert len(state.shape) == 1 state = state.astype(float)
assert state.ndim == 1
valid = self.problem.check_state_validity(state.reshape((1, -1))).item() valid = self.problem.check_state_validity(state.reshape((1, -1))).item()
if not valid: if not valid:
raise ValueError("state {} is invalid".format(state)) raise ValueError("state {} is invalid".format(state))
...@@ -246,7 +253,6 @@ class Roadmap(object): ...@@ -246,7 +253,6 @@ class Roadmap(object):
if is_start: if is_start:
self.start = index self.start = index
h = self.problem.compute_heuristic(state, self.vertices) h = self.problem.compute_heuristic(state, self.vertices)
else: else:
self.goal = index self.goal = index
h = self.problem.compute_heuristic(self.vertices, state) h = self.problem.compute_heuristic(self.vertices, state)
...@@ -279,11 +285,8 @@ class Roadmap(object): ...@@ -279,11 +285,8 @@ class Roadmap(object):
Returns: Returns:
length: path length length: path length
""" """
total = 0 q = self.vertices[vpath, :]
for i in range(1, len(vpath)): return self.problem.compute_heuristic(q[:-1, :], q[1:, :]).sum()
u, v = vpath[i - 1 : i + 1]
total += self.heuristic(u, v)
return total
def compute_qpath(self, vpath): def compute_qpath(self, vpath):
"""Compute a sequence of states from a sequence of vertices. """Compute a sequence of states from a sequence of vertices.
...@@ -323,11 +326,18 @@ class Roadmap(object): ...@@ -323,11 +326,18 @@ class Roadmap(object):
) )
if show_edges: if show_edges:
edges = []
for u, v in self.graph.edges(): for u, v in self.graph.edges():
q1 = self.vertices[u, :] q1 = self.vertices[u, :]
q2 = self.vertices[v, :] q2 = self.vertices[v, :]
edge, _ = self.problem.steer(q1, q2) edge, _ = self.problem.steer(
plt.plot(edge[:, 0], edge[:, 1], c="#dddddd", zorder=1) q1, q2, resolution=0.1, interpolate_line=False
)
edges.append(edge[:, :2])
edges = matplotlib.collections.LineCollection(
edges, colors="#dddddd", zorder=1
)
plt.gca().add_collection(edges)
if vpath is not None: if vpath is not None:
qpath = self.compute_qpath(vpath) qpath = self.compute_qpath(vpath)
...@@ -348,8 +358,8 @@ class Roadmap(object): ...@@ -348,8 +358,8 @@ class Roadmap(object):
c="r", c="r",
zorder=3, zorder=3,
) )
plt.xlim((0, self.problem.extents[0, 1])) plt.xlim(self.problem.extents[0, :])
plt.ylim((0, self.problem.extents[1, 1])) plt.ylim(self.problem.extents[1, :])
if saveto is not None: if saveto is not None:
plt.savefig(saveto, bbox_inches="tight") plt.savefig(saveto, bbox_inches="tight")
......
...@@ -25,6 +25,12 @@ class TestRoadmap(unittest.TestCase): ...@@ -25,6 +25,12 @@ class TestRoadmap(unittest.TestCase):
counter[v] += 1 counter[v] += 1
return counter return counter
def test_r2problem_roadmap_disconnected(self):
num_vertices = 9
connection_radius = 0.1
lazy = False
Roadmap(self.problem, self.sampler, num_vertices, connection_radius, lazy)
def test_r2problem_roadmap_edge_collisions(self): def test_r2problem_roadmap_edge_collisions(self):
# This lattice roadmap has vertices for x and y = (1.67, 5.0, 8.33). # This lattice roadmap has vertices for x and y = (1.67, 5.0, 8.33).
# Setting the connection radius to 15 creates a fully-connected graph. # Setting the connection radius to 15 creates a fully-connected graph.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment