Skip to content
Snippets Groups Projects
Commit 1025d23a authored by Brian Hou's avatar Brian Hou
Browse files

Release Project 4: Planning.

parent 70829590
No related branches found
No related tags found
No related merge requests found
Pipeline #424155 failed with stage
in 14 minutes and 38 seconds
Showing
with 2138 additions and 7 deletions
......@@ -28,6 +28,14 @@ catkin_package(
std_msgs
)
install(
DIRECTORY
config
launch
DESTINATION
${CATKIN_PACKAGE_SHARE_DESTINATION}
)
catkin_install_python(
PROGRAMS
scripts/control_node
......
......@@ -318,6 +318,13 @@ def time_parameterize_ramp_up_ramp_down(path_xyt, speed):
Returns:
path_xytv: np.array of states and speed with shape L x 4
"""
# For paths with fewer waypoints than necessary to ramp up and ramp down,
# just set the desired speed directly (with a non-zero final speed, to
# guarantee forward movement).
if path_xyt.shape[0] < 4:
speeds = speed * np.ones(path_xyt.shape[0])
return np.hstack([path_xyt, speeds[:, np.newaxis]])
ramp_distance = 0.5
displacement_vectors = np.diff(path_xyt[:, [0, 1]], axis=0)
displacements = np.linalg.norm(displacement_vectors, axis=1)
......
......@@ -223,11 +223,6 @@ class ModelPredictiveController(BaseController):
self.bbox_map = np.zeros((self.K * self.T, 2, 4))
self.collisions = np.zeros(self.K * self.T, dtype=bool)
self.obstacle_map = ~self.permissible_region
self.map_x = self.map_info.origin.position.x
self.map_y = self.map_info.origin.position.y
self.map_angle = utils.quaternion_to_angle(self.map_info.origin.orientation)
self.map_c = np.cos(self.map_angle)
self.map_s = np.sin(self.map_angle)
car_l = self.car_length
car_w = self.car_width
self.car_bbox = (
......
......@@ -2,9 +2,10 @@
"""ROS geometry and map utility functions."""
from __future__ import division
import heapq
import numpy as np
import rospy
from geometry_msgs.msg import (
Pose,
PoseStamped,
......@@ -150,7 +151,7 @@ def rotation_matrix(theta):
The equivalent 2x2 rotation matrix as a np.array
"""
c, s = np.cos(theta), np.sin(theta)
return np.matrix([[c, -s], [s, c]])
return np.array([[c, -s], [s, c]])
def particle_to_pose(particle):
......@@ -342,3 +343,27 @@ def estimation_error(estimates, references):
np.cos(reference_angles - estimate_angles),
)
return position_errors, np.abs(angular_errors)
class PriorityQueue(object):
def __init__(self):
self.min_heap = []
self.pop_counter = 0
def __len__(self):
return len(self.min_heap)
def push(self, elem):
heapq.heappush(self.min_heap, elem)
def peek(self):
if not self.min_heap:
raise IndexError("no elements to peek")
return self.min_heap[0][1]
def pop(self):
if not self.min_heap:
raise IndexError("no elements to pop")
elem = heapq.heappop(self.min_heap)
self.pop_counter += 1
return elem
cmake_minimum_required(VERSION 3.0.2)
project(planning)
find_package(catkin REQUIRED COMPONENTS
rospy
)
catkin_python_setup()
catkin_package()
install(
DIRECTORY
launch
DESTINATION
${CATKIN_PACKAGE_SHARE_DESTINATION}
)
catkin_install_python(
PROGRAMS
scripts/roadmap
scripts/run_search
scripts/planner
DESTINATION
${CATKIN_PACKAGE_BIN_DESTINATION}
)
if(CATKIN_ENABLE_TESTING)
find_package(roslaunch REQUIRED)
# find_package(rostest REQUIRED)
roslaunch_add_file_check(launch)
# catkin_add_nosetests tests Python code directly
catkin_add_nosetests(test/samplers.py)
catkin_add_nosetests(test/roadmap.py)
catkin_add_nosetests(test/astar.py)
catkin_add_nosetests(test/shortcut.py)
# add_rostest tests the ROS integrations
endif()
Panels:
- Class: rviz/Displays
Help Height: 78
Name: Displays
Property Tree Widget:
Expanded:
- /Map1/Orientation1
Splitter Ratio: 0.5
Tree Height: 648
- Class: rviz/Selection
Name: Selection
- Class: rviz/Tool Properties
Expanded:
- /2D Pose Estimate1
- /2D Nav Goal1
- /Publish Point1
Name: Tool Properties
Splitter Ratio: 0.5886790156364441
- Class: rviz/Views
Expanded:
- /Current View1
Name: Views
Splitter Ratio: 0.5
- Class: rviz/Time
Experimental: false
Name: Time
SyncMode: 0
SyncSource: LaserScan
Preferences:
PromptSaveOnExit: true
Toolbars:
toolButtonStyle: 2
Visualization Manager:
Class: ""
Displays:
- Alpha: 0.5
Cell Size: 1
Class: rviz/Grid
Color: 0; 0; 0
Enabled: true
Line Style:
Line Width: 0.029999999329447746
Value: Lines
Name: Grid
Normal Cell Count: 0
Offset:
X: 0
Y: 0
Z: 0
Plane: XY
Plane Cell Count: 400
Reference Frame: <Fixed Frame>
Value: true
- Alpha: 0.30000001192092896
Class: rviz/Map
Color Scheme: map
Draw Behind: true
Enabled: true
Name: Map
Topic: /map
Unreliable: false
Use Timestamp: false
Value: true
- Alpha: 1
Class: rviz/RobotModel
Collision Enabled: false
Enabled: true
Links:
All Links Enabled: true
Expand Joint Details: false
Expand Link Details: false
Expand Tree: false
Link Tree Style: Links in Alphabetic Order
back_left/wheel_link:
Alpha: 1
Show Axes: false
Show Trail: false
Value: true
back_right/wheel_link:
Alpha: 1
Show Axes: false
Show Trail: false
Value: true
base_footprint:
Alpha: 1
Show Axes: false
Show Trail: false
base_link:
Alpha: 1
Show Axes: false
Show Trail: false
Value: true
camera_bottom_screw_frame:
Alpha: 1
Show Axes: false
Show Trail: false
camera_link:
Alpha: 1
Show Axes: false
Show Trail: false
Value: true
front_left/wheel_link:
Alpha: 1
Show Axes: false
Show Trail: false
Value: true
front_left/wheel_steer_link:
Alpha: 1
Show Axes: false
Show Trail: false
front_right/wheel_link:
Alpha: 1
Show Axes: false
Show Trail: false
Value: true
front_right/wheel_steer_link:
Alpha: 1
Show Axes: false
Show Trail: false
insets_link:
Alpha: 1
Show Axes: false
Show Trail: false
laser_link:
Alpha: 1
Show Axes: false
Show Trail: false
Value: true
platform_link:
Alpha: 1
Show Axes: false
Show Trail: false
Name: RobotModel
Robot Description: car/robot_description
TF Prefix: ""
Update Interval: 0
Value: true
Visual Enabled: true
- Alpha: 1
Autocompute Intensity Bounds: true
Autocompute Value Bounds:
Max Value: 10
Min Value: -10
Value: true
Axis: Z
Channel Name: intensity
Class: rviz/LaserScan
Color: 255; 22; 144
Color Transformer: FlatColor
Decay Time: 0
Enabled: false
Invert Rainbow: false
Max Color: 255; 22; 144
Min Color: 255; 22; 144
Name: LaserScan
Position Transformer: XYZ
Queue Size: 10
Selectable: true
Size (Pixels): 3
Size (m): 0.029999999329447746
Style: Points
Topic: /car/scan
Unreliable: false
Use Fixed Frame: true
Use rainbow: true
Value: false
- Alpha: 1
Arrow Length: 0.30000001192092896
Axes Length: 0.30000001192092896
Axes Radius: 0.009999999776482582
Class: rviz/PoseArray
Color: 255; 25; 0
Enabled: true
Head Length: 0.07000000029802322
Head Radius: 0.029999999329447746
Name: Real Path
Queue Size: 10
Shaft Length: 0.23000000417232513
Shaft Radius: 0.009999999776482582
Shape: Arrow (Flat)
Topic: /car/controller/real_path/poses
Unreliable: false
Value: true
- Alpha: 1
Arrow Length: 0.30000001192092896
Axes Length: 0.30000001192092896
Axes Radius: 0.009999999776482582
Class: rviz/PoseArray
Color: 138; 226; 52
Enabled: true
Head Length: 0.07000000029802322
Head Radius: 0.029999999329447746
Name: Reference Path
Queue Size: 10
Shaft Length: 0.23000000417232513
Shaft Radius: 0.009999999776482582
Shape: Arrow (Flat)
Topic: /car/controller/path/poses
Unreliable: false
Value: true
- Alpha: 1
Axes Length: 1
Axes Radius: 0.10000000149011612
Class: rviz/Pose
Color: 0; 0; 255
Enabled: true
Head Length: 0.20000000298023224
Head Radius: 0.10000000149011612
Name: Reference State
Queue Size: 10
Shaft Length: 0.20000000298023224
Shaft Radius: 0.02500000037252903
Shape: Arrow
Topic: /car/controller/path/reference_state
Unreliable: false
Value: true
- Class: rviz/MarkerArray
Enabled: true
Marker Topic: /car/controller/rollouts
Name: Rollouts
Namespaces:
{}
Queue Size: 100
Value: true
- Alpha: 1
Axes Length: 1
Axes Radius: 0.10000000149011612
Class: rviz/Pose
Color: 255; 25; 0
Enabled: false
Head Length: 0.30000001192092896
Head Radius: 0.10000000149011612
Name: Pose
Queue Size: 10
Shaft Length: 1
Shaft Radius: 0.05000000074505806
Shape: Arrow
Topic: /move_base_simple/goal
Unreliable: false
Value: false
- Alpha: 1
Arrow Length: 0.30000001192092896
Axes Length: 0.30000001192092896
Axes Radius: 0.009999999776482582
Class: rviz/PoseArray
Color: 0; 0; 0
Enabled: true
Head Length: 0.07000000029802322
Head Radius: 0.10000000149011612
Name: Planning Vertices
Queue Size: 10
Shaft Length: 0.10000000149011612
Shaft Radius: 0.019999999552965164
Shape: Arrow (3D)
Topic: /car/planner/vertices
Unreliable: false
Value: true
- Class: rviz/Marker
Enabled: true
Marker Topic: /car/planner/edges
Name: Planning Edges
Namespaces:
"": true
Queue Size: 100
Value: true
Enabled: true
Global Options:
Background Color: 255; 255; 255
Default Light: true
Fixed Frame: map
Frame Rate: 30
Name: root
Tools:
- Class: rviz/Interact
Hide Inactive Objects: true
- Class: rviz/MoveCamera
- Class: rviz/Select
- Class: rviz/FocusCamera
- Class: rviz/Measure
- Class: rviz/SetInitialPose
Theta std deviation: 0.2617993950843811
Topic: /initialpose
X std deviation: 0.5
Y std deviation: 0.5
- Class: rviz/SetGoal
Topic: /move_base_simple/goal
- Class: rviz/PublishPoint
Single click: true
Topic: /clicked_point
Value: true
Views:
Current:
Angle: 0
Class: rviz/TopDownOrtho
Enable Stereo Rendering:
Stereo Eye Separation: 0.05999999865889549
Stereo Focal Distance: 1
Swap Stereo Eyes: false
Value: false
Invert Z Axis: false
Name: Current View
Near Clip Distance: 0.009999999776482582
Scale: 20
Target Frame: <Fixed Frame>
X: 20
Y: 40
Saved:
- Angle: 0
Class: rviz/TopDownOrtho
Enable Stereo Rendering:
Stereo Eye Separation: 0.05999999865889549
Stereo Focal Distance: 1
Swap Stereo Eyes: false
Value: false
Invert Z Axis: false
Name: TopDownOrtho
Near Clip Distance: 0.009999999776482582
Scale: 206.59109497070312
Target Frame: base_link
X: 0
Y: 0
- Angle: 0
Class: rviz/TopDownOrtho
Enable Stereo Rendering:
Stereo Eye Separation: 0.05999999865889549
Stereo Focal Distance: 1
Swap Stereo Eyes: false
Value: false
Invert Z Axis: false
Name: TopDownOrtho
Near Clip Distance: 0.009999999776482582
Scale: 20
Target Frame: <Fixed Frame>
X: 20
Y: 40
Window Geometry:
Displays:
collapsed: false
Height: 945
Hide Left Dock: false
Hide Right Dock: false
QMainWindow State: 000000ff00000000fd00000004000000000000018a000005d1fc0200000008fb0000001200530065006c0065006300740069006f006e00000001e10000009b0000005c00fffffffb0000001e0054006f006f006c002000500072006f007000650072007400690065007302000001ed000001df00000185000000a3fb000000120056006900650077007300200054006f006f02000001df000002110000018500000122fb000000200054006f006f006c002000500072006f0070006500720074006900650073003203000002880000011d000002210000017afb000000100044006900730070006c006100790073010000003d000005d1000000c900fffffffb0000002000730065006c0065006300740069006f006e00200062007500660066006500720200000138000000aa0000023a00000294fb00000014005700690064006500530074006500720065006f02000000e6000000d2000003ee0000030bfb0000000c004b0069006e0065006300740200000186000001060000030c00000261000000010000010f000005d1fc0200000003fb0000001e0054006f006f006c002000500072006f00700065007200740069006500730100000041000000780000000000000000fb0000000a00560069006500770073010000003d000005d1000000a400fffffffb0000001200530065006c0065006300740069006f006e010000025a000000b200000000000000000000000200000490000000a9fc0100000001fb0000000a00560069006500770073030000004e00000080000002e10000019700000003000005580000003efc0100000002fb0000000800540069006d0065010000000000000558000002eb00fffffffb0000000800540069006d00650100000000000004500000000000000000000002b3000005d100000004000000040000000800000008fc0000000100000002000000010000000a0054006f006f006c00730100000000ffffffff0000000000000000
Selection:
collapsed: false
Time:
collapsed: false
Tool Properties:
collapsed: false
Views:
collapsed: false
Width: 1720
X: 72
Y: 27
<launch>
<arg name="car_name" default="car"/>
<arg name="map" default="$(find cse478)/maps/maze_0.yaml"/>
<arg name="initial_x" default="0"/>
<arg name="initial_y" default="0"/>
<arg name="initial_theta" default="0"/>
<arg name="fake_localization" default="false" />
<arg name="teleop" default="false"/>
<arg name="sampler" default="halton"/>
<arg name="problem" default="se2"/>
<arg name="num_vertices" default="200"/>
<arg name="connection_radius" default="100.0"/>
<arg name="curvature" default="0.05"/>
<include file="$(find cse478)/launch/teleop.launch">
<arg name="fake_localization" value="$(arg fake_localization)" />
<arg name="map" value="$(arg map)"/>
<arg name="initial_x" value="$(arg initial_x)"/>
<arg name="initial_y" value="$(arg initial_y)"/>
<arg name="initial_theta" value="$(arg initial_theta)"/>
<arg name="teleop" value="$(arg teleop)" />
</include>
<group ns="$(arg car_name)">
<node pkg="planning" type="roadmap" name="roadmap"
args="--use-ros-map --lazy --sampler $(arg sampler) --problem $(arg problem) --num-vertices $(arg num_vertices) --connection-radius $(arg connection_radius) --curvature $(arg curvature)" output="screen"/>
</group>
</launch>
<launch>
<arg name="car_name" default="car"/>
<arg name="map" default="$(find cse478)/maps/maze_0.yaml"/>
<arg name="initial_x" default="0"/>
<arg name="initial_y" default="0"/>
<arg name="initial_theta" default="0"/>
<arg name="fake_localization" default="false" />
<arg name="controller" default="mpc"/>
<arg name="teleop" default="true"/>
<arg name="rviz" default="true" />
<arg name="num_vertices" default="200"/>
<arg name="connection_radius" default="100.0"/>
<arg name="curvature" default="0.05"/>
<include file="$(find cse478)/launch/teleop.launch">
<arg name="fake_localization" value="$(arg fake_localization)" />
<arg name="map" value="$(arg map)"/>
<arg name="initial_x" value="$(arg initial_x)"/>
<arg name="initial_y" value="$(arg initial_y)"/>
<arg name="initial_theta" value="$(arg initial_theta)"/>
<arg name="teleop" value="$(arg teleop)" />
</include>
<group ns="$(arg car_name)">
<include file="$(find localization)/launch/include/particle_filter.xml">
<arg name="publish_tf" value="$(eval not fake_localization)" />
<arg name="initial_x" value="$(arg initial_x)"/>
<arg name="initial_y" value="$(arg initial_y)"/>
<arg name="initial_theta" value="$(arg initial_theta)"/>
</include>
<node pkg="planning" type="planner" name="planner" output="screen">
<param name="num_vertices" value="$(arg num_vertices)"/>
<param name="connection_radius" value="$(arg connection_radius)"/>
<param name="curvature" value="$(arg curvature)"/>
</node>
</group>
<include file="$(find control)/launch/controller.launch">
<arg name="type" default="$(arg controller)"/>
</include>
<node if="$(arg rviz)" pkg="rviz" type="rviz" name="rviz" args="-d $(find planning)/config/planning.rviz"/>
</launch>
<?xml version="1.0"?>
<package format="3">
<name>planning</name>
<version>1.0.0</version>
<description>CSE 478 MuSHR Planning Project</description>
<author>Brian Hou</author>
<author>Gilwoo Lee</author>
<author>Mohit Shridhar</author>
<author>Nick Walker</author>
<maintainer email="mushr@cs.washington.edu">UW CSE Robotics Teaching Staff</maintainer>
<url>https://courses.cs.washington.edu/courses/cse478/</url>
<url>https://mushr.io/</url>
<license>Educational Use Only</license>
<test_depend>roslaunch</test_depend>
<buildtool_depend>catkin</buildtool_depend>
<build_depend>rospy</build_depend>
<build_depend>std_msgs</build_depend>
<build_export_depend>rospy</build_export_depend>
<build_export_depend>std_msgs</build_export_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>cse478</exec_depend>
<exec_depend>localization</exec_depend>
<exec_depend>control</exec_depend>
<exec_depend condition="$ROS_PYTHON_VERSION == 2">ipython</exec_depend>
<exec_depend condition="$ROS_PYTHON_VERSION == 3">ipython3</exec_depend>
<exec_depend condition="$ROS_PYTHON_VERSION == 2">python-matplotlib</exec_depend>
<exec_depend condition="$ROS_PYTHON_VERSION == 3">python3-matplotlib</exec_depend>
<exec_depend condition="$ROS_PYTHON_VERSION == 2">python-networkx</exec_depend>
<exec_depend condition="$ROS_PYTHON_VERSION == 3">python3-networkx</exec_depend>
<exec_depend condition="$ROS_PYTHON_VERSION == 2">python-numpy</exec_depend>
<exec_depend condition="$ROS_PYTHON_VERSION == 3">python3-numpy</exec_depend>
<exec_depend>rospy</exec_depend>
<exec_depend>rviz</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>tf</exec_depend>
<exec_depend>visualization_msgs</exec_depend>
<export>
</export>
</package>
#!/usr/bin/env python
import rospy
from planning.planner_ros import PlannerROS
if __name__ == "__main__":
rospy.init_node("planner")
PlannerROS(
num_vertices=rospy.get_param("~num_vertices"),
connection_radius=float(rospy.get_param("~connection_radius")),
curvature=float(rospy.get_param("~curvature")),
tf_prefix=rospy.get_param("~tf_prefix", ""),
)
rospy.spin()
#!/usr/bin/env python
"""Script for generating and visualizing roadmaps."""
import argparse
import matplotlib.pyplot as plt
import numpy as np
import os
import rospy
from cse478 import utils
from planning.planner_ros import graph_location
from planning.problems import R2Problem, SE2Problem
from planning.roadmap import Roadmap
from planning.samplers import samplers
def main(args):
if args.use_ros_map:
permissible_region, map_info = utils.get_map("/static_map")
map_name = None
elif args.text_map:
permissible_region = np.loadtxt(args.text_map, dtype=bool)
_, fname = os.path.split(args.text_map)
map_name, _ = os.path.splitext(fname)
else:
permissible_region = np.ones((10, 10), dtype=bool)
map_name = "empty"
saveto = graph_location(
args.problem,
args.sampler,
args.num_vertices,
args.connection_radius,
args.curvature,
map_name,
)
if args.problem == "r2":
problem = R2Problem(permissible_region)
else:
problem = SE2Problem(permissible_region, curvature=args.curvature)
sampler = samplers[args.sampler](problem.extents)
rm = Roadmap(
problem,
sampler,
args.num_vertices,
connection_radius=args.connection_radius,
lazy=args.lazy,
saveto=saveto,
)
rm.visualize(show_edges=args.show_edges)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="generate and visualize roadmaps")
parser.add_argument("-n", "--num-vertices", type=int, default=100)
parser.add_argument("-r", "--connection-radius", type=float, default=2.0)
parser.add_argument("-c", "--curvature", type=float, default=0.05)
parser.add_argument("--sampler", choices=samplers, default="halton")
parser.add_argument("--lazy", action="store_true")
parser.add_argument("--show-edges", action="store_true")
parser.add_argument("--problem", choices=("r2", "se2"), default="r2")
map_group = parser.add_mutually_exclusive_group()
map_group.add_argument("--use-ros-map", action="store_true")
map_group.add_argument("-m", "--text-map")
args = parser.parse_args(rospy.myargv()[1:])
main(args)
#!/usr/bin/env python
"""Script for running A* on R2 and SE2 problems without ROS."""
from __future__ import print_function
import argparse
import networkx as nx
import numpy as np
import time
from planning import search
from planning.problems import R2Problem, SE2Problem
from planning.roadmap import Roadmap
from planning.samplers import samplers
def main(args):
assert len(args.start) == len(args.goal)
start = np.array(args.start)
goal = np.array(args.goal)
if args.problem == "se2":
start[2] = np.radians(start[2])
goal[2] = np.radians(goal[2])
permissible_region = np.loadtxt(args.text_map, dtype=bool)
if args.problem == "r2":
problem = R2Problem(permissible_region)
if args.problem == "se2":
problem = SE2Problem(permissible_region, curvature=args.curvature)
sampler = samplers[args.sampler](problem.extents)
start_time = time.time()
rm = Roadmap(problem, sampler, args.num_vertices, args.connection_radius, args.lazy)
start_id = rm.add_node(start, is_start=True)
goal_id = rm.add_node(goal, is_start=False)
try:
print("Running A*")
vpath, _ = search.astar(rm, start_id, goal_id)
end_time = time.time()
print("Path length:", rm.compute_path_length(vpath))
print("Planning time:", end_time - start_time)
print("Edges evaluated:", rm.edges_evaluated)
except nx.NetworkXNoPath as e:
print(e)
return
rm.visualize(show_edges=args.show_edges, vpath=vpath)
if args.shortcut:
print("Shortcutting A* path")
start_time = time.time()
vpath = search.shortcut(rm, vpath)
end_time = time.time()
print("Shortcut length:", rm.compute_path_length(vpath))
print("Shortcut time:", end_time - start_time)
rm.visualize(show_edges=args.show_edges, vpath=vpath)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="test (lazy) A* on R2 and SE2 problems"
)
parser.add_argument("-m", "--text-map", type=str, help="the environment to plan on")
parser.add_argument("-n", "--num-vertices", type=int, required=True)
parser.add_argument("-r", "--connection-radius", type=float, default=2.0)
parser.add_argument("--sampler", type=str, default="halton", choices=samplers)
parser.add_argument("--lazy", action="store_true")
parser.add_argument("--shortcut", action="store_true")
parser.add_argument("--show-edges", action="store_true")
subparsers = parser.add_subparsers(dest="problem")
r2_parser = subparsers.add_parser("r2", help="R2 options")
r2_parser.add_argument("-s", "--start", nargs=2, type=int, required=True)
r2_parser.add_argument("-g", "--goal", nargs=2, type=int, required=True)
se2_parser = subparsers.add_parser("se2", help="SE2 options")
se2_parser.add_argument("-s", "--start", nargs=3, type=int, required=True)
se2_parser.add_argument("-g", "--goal", nargs=3, type=int, required=True)
se2_parser.add_argument("-c", "--curvature", type=float, default=0.05)
args = parser.parse_args()
main(args)
# ! DO NOT MANUALLY INVOKE THIS setup.py, USE CATKIN INSTEAD
from setuptools import setup
from catkin_pkg.python_setup import generate_distutils_setup
# fetch values from package.xml
setup_args = generate_distutils_setup(packages=["planning"], package_dir={"": "src"})
setup(**setup_args)
"""Dubins path code (https://en.wikipedia.org/wiki/Dubins_path), originally
authored by Atsushi Sakai (@Atsushi_twi).
Modified by Aditya Vamsikrishna (adityavk), Gilwoo Lee (gilwoo), and Nick Walker
(nswalker) for CSE 478.
"""
import math
import matplotlib.pyplot as plt
import numpy as np
from cse478 import utils
def path_planning(start, end, curvature, resolution=0.1):
"""Return the Dubins path between two states.
Args:
start: list of (sx, sy, stheta)
sx: x position of start state [m]
sy: y position of start state [m]
stheta: yaw angle of start state [rad]
end: list of (ex, ey, etheta)
ex: x position of end state [m]
ey: y position of end state [m]
etheta: yaw angle of end state [rad]
curvature: Dubins path curvature [1/m]
resolution: interpolation resolution
Returns:
path: interpolated Dubins path between start and end
length: length of Dubins path
"""
# Transform to make start of path the origin
start = np.array(start, dtype=float)
end = np.array(end, dtype=float)
end[:2] -= start[:2]
start_orientation = start[2]
rotation = utils.rotation_matrix(start_orientation)
end[:2] = np.matmul(end[:2], rotation)
end[2] -= start[2]
path, mode, length = path_planning_from_origin(
end, curvature, resolution=resolution
)
# Transform back into original reference frame
inv_rot = utils.rotation_matrix(-start_orientation)
path[:, :2] = np.matmul(path[:, :2], inv_rot) + start[:2]
path[:, 2] += start[2]
pi_2_pi(path[:, 2])
# Turn this back on if you see unnecessary rotations
# path, length = process_dubins(start, path, length)
# FIXME(nickswalker5-17-21): The dubins stuff is littered with this
# "scale by curvature." It can probably be refactored out
real_path_length = length * 1 / curvature
return path, real_path_length
def path_length(s, e, c):
"""Return the length of the Dubins path between two states.
Args:
s: see documentation for start above
e: see documentation for end above
c: see documentation for curvature above
Returns:
length: length of Dubins path
"""
# Transform to make start of path the origin
start = np.array(s, dtype=float)
end = np.array(e, dtype=float)
end[:2] -= start[:2]
start_orientation = start[2]
rotation = utils.rotation_matrix(start_orientation)
end[:2] = np.matmul(end[:2], rotation)
end[2] -= start[2]
lengths, mode, cost = get_best_plan_from_origin(end, c)
return cost * (1 / c)
##########################
# Implementation Details #
##########################
def mod2pi(theta):
return theta - 2.0 * math.pi * math.floor(theta / 2.0 / math.pi)
def pi_2_pi(angles):
angles[:] = (angles[:] - np.pi) % (2 * np.pi) + np.pi
def lsl(alpha, beta, d):
sa = math.sin(alpha)
sb = math.sin(beta)
ca = math.cos(alpha)
cb = math.cos(beta)
c_ab = math.cos(alpha - beta)
tmp0 = d + sa - sb
mode = "LSL"
p_squared = 2 + (d * d) - (2 * c_ab) + (2 * d * (sa - sb))
if p_squared < 0:
return None, None, None, mode
tmp1 = math.atan2((cb - ca), tmp0)
t = mod2pi(-alpha + tmp1)
p = math.sqrt(p_squared)
q = mod2pi(beta - tmp1)
return t, p, q, mode
def rsr(alpha, beta, d):
sa = math.sin(alpha)
sb = math.sin(beta)
ca = math.cos(alpha)
cb = math.cos(beta)
c_ab = math.cos(alpha - beta)
tmp0 = d - sa + sb
mode = "RSR"
p_squared = 2 + (d * d) - (2 * c_ab) + (2 * d * (sb - sa))
if p_squared < 0:
return None, None, None, mode
tmp1 = math.atan2((ca - cb), tmp0)
t = mod2pi(alpha - tmp1)
p = math.sqrt(p_squared)
q = mod2pi(-beta + tmp1)
return t, p, q, mode
def lsr(alpha, beta, d):
sa = math.sin(alpha)
sb = math.sin(beta)
ca = math.cos(alpha)
cb = math.cos(beta)
c_ab = math.cos(alpha - beta)
p_squared = -2 + (d * d) + (2 * c_ab) + (2 * d * (sa + sb))
mode = "LSR"
if p_squared < 0:
return None, None, None, mode
p = math.sqrt(p_squared)
tmp2 = math.atan2((-ca - cb), (d + sa + sb)) - math.atan2(-2.0, p)
t = mod2pi(-alpha + tmp2)
q = mod2pi(-mod2pi(beta) + tmp2)
return t, p, q, mode
def rsl(alpha, beta, d):
sa = math.sin(alpha)
sb = math.sin(beta)
ca = math.cos(alpha)
cb = math.cos(beta)
c_ab = math.cos(alpha - beta)
p_squared = (d * d) - 2 + (2 * c_ab) - (2 * d * (sa + sb))
mode = "RSL"
if p_squared < 0:
return None, None, None, mode
p = math.sqrt(p_squared)
tmp2 = math.atan2((ca + cb), (d - sa - sb)) - math.atan2(2.0, p)
t = mod2pi(alpha - tmp2)
q = mod2pi(beta - tmp2)
return t, p, q, mode
def rlr(alpha, beta, d):
sa = math.sin(alpha)
sb = math.sin(beta)
ca = math.cos(alpha)
cb = math.cos(beta)
c_ab = math.cos(alpha - beta)
mode = "RLR"
tmp_rlr = (6.0 - d * d + 2.0 * c_ab + 2.0 * d * (sa - sb)) / 8.0
if abs(tmp_rlr) > 1.0:
return None, None, None, mode
p = mod2pi(2 * math.pi - math.acos(tmp_rlr))
t = mod2pi(alpha - math.atan2(ca - cb, d - sa + sb) + mod2pi(p / 2.0))
q = mod2pi(alpha - beta - t + mod2pi(p))
return t, p, q, mode
def lrl(alpha, beta, d):
sa = math.sin(alpha)
sb = math.sin(beta)
ca = math.cos(alpha)
cb = math.cos(beta)
c_ab = math.cos(alpha - beta)
mode = "LRL"
tmp_lrl = (6.0 - d * d + 2.0 * c_ab + 2.0 * d * (-sa + sb)) / 8.0
if abs(tmp_lrl) > 1:
return None, None, None, mode
p = mod2pi(2 * math.pi - math.acos(tmp_lrl))
t = mod2pi(-alpha - math.atan2(ca - cb, d + sa - sb) + p / 2.0)
q = mod2pi(mod2pi(beta) - alpha - t + mod2pi(p))
return t, p, q, mode
def get_best_plan_from_origin(goal, curvature):
# nomalize
d = math.sqrt(goal[0] ** 2.0 + goal[1] ** 2.0)
d = d * curvature
# print(dx, dy, D, d)
theta = mod2pi(math.atan2(goal[1], goal[0]))
alpha = mod2pi(-theta)
beta = mod2pi(goal[2] - theta)
# print(theta, alpha, beta, d)
planners = [lsl, rsr, lsr, rsl, rlr, lrl]
bcost = float("inf")
bt, bp, bq, bmode = None, None, None, None
for planner in planners:
t, p, q, mode = planner(alpha, beta, d)
if t is None:
continue
cost = abs(t) + abs(p) + abs(q)
if bcost > cost:
bt, bp, bq, bmode = t, p, q, mode
bcost = cost
return [bt, bp, bq], bmode, bcost
def path_planning_from_origin(goal, curvature, resolution=0.1):
if np.all(np.array(goal) == 0.0):
return np.zeros((1, 3)), [], 0.0
lengths, modes, cost = get_best_plan_from_origin(goal, curvature)
path = generate_course(lengths, modes, curvature, step_size=resolution)
return path, modes, cost
def line(length=10.0, waypoint_sep=0.1):
# Desired waypoint_sep may not divide the length evenly.
# In that case, we'll round up (add at most one extra waypoint),
# but still hit the length exactly.
# Need at least a start and end (2 points)
num_waypoints = max(math.ceil(length / waypoint_sep), 2)
line_coords = np.mgrid[0 : length : complex(num_waypoints)]
return np.vstack([line_coords, np.zeros((2, len(line_coords)))]).T
def turn(radius, angle, waypoint_sep=0.1):
turn_center = [0, radius]
arclength = angle * radius
# Need at least a start and end (2 points)
num_turn_points = max(math.ceil(arclength / waypoint_sep), 2)
thetas = np.mgrid[0 : angle : complex(num_turn_points)]
thetas -= np.pi / 2
turn_poses = np.empty((len(thetas), 3))
turn_poses[:, 0] = radius * np.cos(thetas) + turn_center[0]
turn_poses[:, 1] = radius * np.sin(thetas) + turn_center[1]
turn_poses[:, 2] = thetas + (np.pi / 2)
return turn_poses
def generate_course(lengths, mode, curvature, step_size=0.1):
segments = []
for m, length in zip(mode, lengths):
# 0 length segments are just a noop
if length == 0.0:
continue
if m == "S":
segments.append(line(length / curvature, step_size))
elif m == "L":
segments.append(turn(1 / curvature, length, step_size))
elif m == "R":
segments.append(turn(1 / curvature, length, step_size) * [1, -1, -1])
else:
raise RuntimeError("Unexpected mode type '{}'".format(m))
# Each of the segments starts at the origin. String
# them together by transforming each to start
# at the previous' endpoint
for i in range(1, len(segments)):
seg = segments[i]
start = segments[i - 1][-1]
rot = utils.rotation_matrix(-start[2])
seg[:, :2] = np.matmul(seg[:, :2], rot) + start[:2]
seg[:, 2] += start[2]
return np.vstack(segments)
def process_dubins(start, path, cost):
"""Ensure no 2pi rotations in the output due to numerical issues."""
eps = 1e-6
close_to_start = np.sum(abs(path - start) < eps, axis=1) == 3
ind = np.where(close_to_start[1 : len(path) - 1])[0]
if len(ind) > 0:
return path[ind[0] + 1 :, :], cost - math.radians(360.0)
return path, cost
from __future__ import print_function
import errno
import networkx as nx
import numpy as np
import os
import rospy
import tf2_ros
import time
from geometry_msgs.msg import PoseStamped, PoseArray, Point
from visualization_msgs.msg import Marker
from nav_msgs.msg import Path
from std_msgs.msg import Header
from cse478 import utils
from control.srv import FollowPath
from planning import search
from planning.problems import SE2Problem
from planning.roadmap import Roadmap
from planning.samplers import samplers
class PlannerROS:
def __init__(
self,
num_vertices,
connection_radius,
curvature,
do_shortcut=True,
sampler_type="halton",
tf_prefix="",
tf_listener=None,
):
"""Motion planning ROS wrapper.
Args:
num_vertices: number of vertices in the graph
connection_radius: radius for connecting vertices
do_shortcut: whether to shortcut the planned path
sampler_type: name of the sampler to construct
"""
self.tf_prefix = tf_prefix
if tf_listener:
self.tl = tf_listener
else:
self.tl = tf2_ros.TransformListener(tf2_ros.Buffer())
self.num_vertices = num_vertices
self.connection_radius = connection_radius
self.do_shortcut = do_shortcut
self.permissible_region, self.map_info = utils.get_map("/static_map")
self.problem = SE2Problem(
self.permissible_region, check_resolution=0.1, curvature=curvature
)
self.rm = None
self.goal_sub = rospy.Subscriber(
"/move_base_simple/goal", PoseStamped, self.get_goal
)
self.nodes_viz = rospy.Publisher(
"~vertices", PoseArray, queue_size=1, latch=True
)
self.edges_viz = rospy.Publisher("~edges", Marker, queue_size=1, latch=True)
self.controller = rospy.ServiceProxy("controller/follow_path", FollowPath)
# Load or construct a roadmap
saveto = graph_location(
"se2", sampler_type, num_vertices, connection_radius, curvature
)
sampler = samplers[sampler_type](self.problem.extents)
rospy.loginfo("Constructing roadmap...")
rospy.loginfo(saveto)
start_stamp = time.time()
self.rm = Roadmap(
self.problem,
sampler,
num_vertices,
connection_radius,
lazy=True,
saveto=saveto,
)
load_time = time.time() - start_stamp
rospy.loginfo("Roadmap constructed in {:2.2f}s".format(load_time))
self.visualize()
# self.rm.visualize(saveto="graph.png")
def plan_to_goal(self, start, goal):
"""Return a planned path from start to goal."""
# Add current pose and goal to the planning env
rospy.loginfo("Adding start and goal node")
try:
start_id = self.rm.add_node(start, is_start=True)
goal_id = self.rm.add_node(goal, is_start=False)
except ValueError:
rospy.loginfo("Either start or goal was in collision")
return None
# self.rm.visualize(saveto="graph.png")
# Call the Lazy A* planner
try:
rospy.loginfo("Planning...")
start_time = time.time()
path, _ = search.astar(self.rm, start_id, goal_id)
end_time = time.time()
rospy.loginfo("Path length: {}".format(self.rm.compute_path_length(path)))
rospy.loginfo("Planning time: {}".format(end_time - start_time))
rospy.loginfo("Edges evaluated: {}".format(self.rm.edges_evaluated))
# self.rm.visualize(vpath=path, saveto="planned_path.png")
except nx.NetworkXNoPath:
rospy.loginfo("Failed to find a plan")
return None
# Shortcut if necessary
if self.do_shortcut:
rospy.loginfo("Shortcutting path...")
start_time = time.time()
path = search.shortcut(self.rm, path)
end_time = time.time()
rospy.loginfo(
"Shortcut length: {}".format(self.rm.compute_path_length(path))
)
rospy.loginfo("Shortcut time: {}".format(end_time - start_time))
# self.rm.visualize(vpath=path, saveto="shortcut_path.png")
states = []
for i in range(1, len(path)):
u, v = path[i - 1 : i + 1]
q1 = self.rm.vertices[u, :]
q2 = self.rm.vertices[v, :]
edge, _ = self.rm.problem.steer(q1, q2)
states.append(edge)
states = np.vstack(states)
return utils.map_to_world(states, self.map_info)
def get_goal(self, msg):
"""Goal callback function."""
if self.rm is None:
return False
self.goal = np.array([utils.pose_to_particle(msg.pose)])
utils.world_to_map(self.goal, self.map_info)
self.goal = self.goal.squeeze()
start = self._get_car_pose()[np.newaxis, :]
utils.world_to_map(start, self.map_info)
start = start.squeeze()
path_states = self.plan_to_goal(start, self.goal)
if path_states is None:
return False
return self.send_path(path_states)
def _get_car_pose(self):
"""Return the current vehicle state."""
try:
transform = self.tl.buffer.lookup_transform(
"map", self.tf_prefix + "base_footprint", rospy.Time(0)
)
# Drop stamp header
transform = transform.transform
return np.array(
[
transform.translation.x,
transform.translation.y,
utils.quaternion_to_angle(transform.rotation),
]
)
except (
tf2_ros.LookupException,
tf2_ros.ConnectivityException,
tf2_ros.ExtrapolationException,
) as e:
rospy.logwarn_throttle(5, e)
return None
def send_path(self, path_states):
"""Send a planned path to the controller."""
h = Header()
h.stamp = rospy.Time.now()
h.frame_id = "map"
desired_speed = 1.0
path = Path()
path.header = h
path.poses = [
PoseStamped(h, utils.particle_to_pose(state)) for state in path_states
]
return self.controller(path, desired_speed)
def visualize(self):
"""Visualize the nodes and edges of the roadmap."""
vertices = self.rm.vertices.copy()
utils.map_to_world(vertices, self.map_info)
poses = list(map(utils.particle_to_pose, vertices))
msg = PoseArray(header=Header(frame_id="map"), poses=poses)
self.nodes_viz.publish(msg)
return
all_edges = []
edges = list(self.rm.graph.edges())
for u, v in edges:
if not self.rm.check_edge_validity(u, v):
continue
q1 = self.rm.vertices[u, :]
q2 = self.rm.vertices[v, :]
edge, _ = self.problem.steer(q1, q2)
utils.map_to_world(edge, self.map_info)
with_repeats = np.repeat(edge[:, :2], 2, 0).reshape(-1, 2)[1:-1]
all_edges.append(with_repeats)
all_edges = np.vstack(all_edges)
points = list(map(lambda x: Point(x=x[0], y=x[1]), all_edges))
msg = Marker(
header=Header(frame_id="map"), type=Marker.LINE_LIST, points=points
)
msg.scale.x = 0.01
msg.pose.orientation.w = 1.0
msg.color.a = 0.1
self.edges_viz.publish(msg)
def mkdir_p(path):
"""Equivalent to mkdir -p path."""
try:
os.makedirs(path)
except OSError as exc: # Python ≥ 2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
# possibly handle other errno cases here, otherwise finally:
else:
raise
def graph_location(
problem_name,
sampler_name,
num_vertices,
connection_radius,
curvature=None,
map_name=None,
):
"""Return the name of this graph in the cache."""
param_string = "_{}_{}_{}_{}".format(
problem_name, sampler_name, num_vertices, connection_radius
)
if problem_name == "se2":
param_string += "_{}".format(curvature)
cache_dir = os.path.expanduser("~/.ros/graph_cache/")
mkdir_p(cache_dir)
if map_name is None:
map_name = os.path.basename(rospy.get_param("/map_file")).split(".")[0]
return os.path.join(cache_dir, map_name + param_string + ".pkl")
import numpy as np
from planning import dubins
class PlanarProblem(object):
def __init__(self, permissible_region, check_resolution=0.1):
"""Construct a planar planning problem.
Args:
permissible_region: Boolean np.array with shape map height x map width,
where one indicates that the location is permissible
check_resolution: collision-checking resolution
"""
self.permissible_region = permissible_region
self.check_resolution = check_resolution
self.height, self.width = permissible_region.shape
self.extents = np.zeros((2, 2))
self.extents[0, 1] = self.width
self.extents[1, 1] = self.height
def check_state_validity(self, states):
"""Return whether states are valid.
Valid states are within the extents of the map and collision-free.
Args:
states: np.array with shape N x D (where D may be 2 or 3)
Returns:
valid: np.array of Booleans with shape N
"""
x = states[:, 0]
y = states[:, 1]
valid = np.ones_like(x, dtype=bool) # feel free to delete this line
# Check that x and y are within the extents of the map.
# BEGIN QUESTION 1.2
"*** REPLACE THIS LINE ***"
# END QUESTION 1.2
# For states within the extents of the map, collision check by reading
# the corresponding entry of self.permissible_region. For simplicity,
# we'll assume the robot is a point robot: just index directly with the
# robot state x and y indices into self.permissible_region.
#
# Hint: use the `astype` method to cast the x and y positions into
# integers. Then, index into self.permissible_region, remembering that
# the zeroth dimension is the height.
# BEGIN QUESTION 1.2
"*** REPLACE THIS LINE ***"
# END QUESTION 1.2
return valid
def check_edge_validity(self, q1, q2):
"""Return whether an edge is valid.
Args:
q1, q2: np.arrays with shape D (where D may be 2 or 3)
Returns:
valid: True or False
"""
path, length = self.steer(q1, q2)
if length == 0:
return False
return self.check_state_validity(path).all()
def compute_heuristic(self, q1, q2):
"""Compute an admissible heuristic between two states.
Args:
q1, q2: np.arrays with shape (N, D) (where D may be 2 or 3)
Returns:
heuristic: np.array with shape N of cost estimates between pairs of states
"""
# Subclasses can override this with more efficient implementations.
start, end = np.atleast_2d(q1), np.atleast_2d(q2)
start_ind = np.arange(start.shape[0])
end_ind = np.arange(end.shape[0])
# We'll use broadcasting semantics to match up
# potentially differently shaped inputs
broad = np.broadcast(start_ind, end_ind)
num_pairs = broad.size
heuristic_cost = np.empty((num_pairs))
for i, (start_i, end_i) in enumerate(zip(*broad.iters)):
_, length = self.steer(start[start_i], end[end_i])
heuristic_cost[i] = length
return heuristic_cost
def steer(self, q1, q2):
"""Return a local path connecting two states.
Intermediate states are used for edge collision-checking.
Args:
q1, q2: np.arrays with shape D (where D may be 2 or 3)
Returns:
path: sequence of states between q1 and q2
length: length of local path
"""
raise NotImplementedError
class R2Problem(PlanarProblem):
def compute_heuristic(self, q1, q2):
"""Compute the Euclidean distance between two states.
Args:
q1, q2: np.arrays with shape (N, 2)
Returns:
heuristic: cost estimate between two states
"""
return np.linalg.norm(np.atleast_2d(q2) - np.atleast_2d(q1), axis=1)
def steer(self, q1, q2):
"""Return a straight-line path connecting two R^2 states.
Args:
q1, q2: np.arrays with shape 2
Returns:
path: sequence of states between q1 and q2
length: length of local path
"""
q1 = q1.reshape((1, -1))
q2 = q2.reshape((1, -1))
dist = np.linalg.norm(q2 - q1)
if dist < self.check_resolution:
return np.vstack((q1, q2)), dist
q1_toward_q2 = (q2 - q1) / dist
steps = np.hstack(
(np.arange(0, dist, self.check_resolution), np.array([dist]))
).reshape((-1, 1))
return q1 + q1_toward_q2 * steps, dist
class SE2Problem(PlanarProblem):
def __init__(self, permissible_region, check_resolution=0.01, curvature=3):
super(SE2Problem, self).__init__(permissible_region, check_resolution)
self.curvature = curvature
self.extents = np.vstack((self.extents, np.array([[-np.pi, np.pi]])))
def compute_heuristic(self, q1, q2):
"""Compute the length of the Dubins path between two SE(2) states.
Args:
q1, q2: np.arrays with shape 3
Returns:
heuristic: cost estimate between two states
"""
start, end = np.atleast_2d(q1), np.atleast_2d(q2)
start_ind = np.arange(start.shape[0])
end_ind = np.arange(end.shape[0])
broad = np.broadcast(start_ind, end_ind)
num_pairs = broad.size
heuristic_cost = np.empty((num_pairs))
for i, (start_i, end_i) in enumerate(zip(*broad.iters)):
heuristic_cost[i] = dubins.path_length(
start[start_i], end[end_i], self.curvature
)
return heuristic_cost
def steer(self, q1, q2):
"""Return a Dubins path connecting two SE(2) states.
Args:
q1, q2: np.arrays with shape 3
Returns:
path: sequence of states on Dubins path between q1 and q2
length: length of local path
"""
path, length = dubins.path_planning(
q1, q2, self.curvature, resolution=self.check_resolution
)
return path, length
from __future__ import division
import os
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
import pickle
from planning import problems
from planning import samplers
class Roadmap(object):
def __init__(
self, problem, sampler, num_vertices, connection_radius, lazy=False, saveto=None
):
"""Construct a motion planning roadmap.
Nodes in the roadmap are labeled with unique integers. These labels
are indices into the graph's vertices.
Args:
problem: a problem description (either an R2Problem or SE2Problem)
sampler: a sampler (either a HaltonSampler, LatticeSampler, RandomSampler)
num_vertices: desired number of vertices in the roadmap
connection_radius: connection radius between vertices
lazy: whether the roadmap edges should be lazily collision-checked
saveto: path to cached roadmap data
"""
self.problem = problem
self.sampler = sampler
self.num_vertices = num_vertices
self.connection_radius = connection_radius
self.lazy = lazy
self.saveto = saveto
# R2 graph is undirected, SE3 graph is directed
self.directed = isinstance(self.problem, problems.SE2Problem)
self.start = None
self.goal = None
self.edges_evaluated = 0
self.graph, self.vertices, self.weighted_edges = self.construct()
# Print some graph summary statistics
print("Lazy:", self.lazy)
print("Vertices:", self.num_vertices)
print("Edges:", self.weighted_edges.shape[0])
def heuristic(self, n1, n2):
"""Compute the heuristic between two nodes in the roadmap.
Args:
n1, n2 (int): node labels
Returns:
heuristic: cost estimate between two nodes
"""
return self.problem.compute_heuristic(
self.vertices[n1, :],
self.vertices[n2, :],
)
def check_edge_validity(self, n1, n2):
"""Collision check the edge between two nodes in the roadmap.
Args:
n1, n2 (int): node labels
Returns:
valid: whether the edge is collision-free
"""
self.edges_evaluated += 1
return self.problem.check_edge_validity(
self.vertices[n1, :],
self.vertices[n2, :],
)
def check_weighted_edges_validity(self, weighted_edges):
"""Collision check the edges in weighted_edges.
Args:
weighted_edges: np.array of edges and edge lengths with shape
num_edges x 3, where each row is (u, v, length) and u, v are
node labels
Returns:
weighted_edges: a subset of the original weighted_edges, where only
rows that correspond to collision-free edges are preserved
"""
# uv contains only the node labels for each edge (source, target).
uv = weighted_edges[:, :2].astype(int)
# If edges aren't lazily evaluated, collision-check all the edges and
# only keep collision-free edges in weighted_edges.
#
# Your implementation should call the check_edge_validity method above.
# BEGIN QUESTION 1.3
"*** REPLACE THIS LINE ***"
raise NotImplementedError
# END QUESTION 1.3
return weighted_edges
def construct(self):
"""Construct the roadmap.
Initialize the graph, vertices, and weighted_edges fields.
Returns:
graph: a NetworkX graph
vertices: np.array of states with shape num_vertices x D,
indexed by node labels
weighted_edges: np.array of edges and edge lengths with shape
num_edges x 3, where each row is (u, v, length) and u, v are
node labels
"""
# Load a cached roadmap, if one exists at that path.
if self.saveto is not None and os.path.exists(self.saveto):
try:
with open(self.saveto, "rb") as f:
data = pickle.load(f)
self.graph = data["graph"]
self.vertices = data["vertices"]
self.weighted_edges = data["weighted_edges"]
print("Loaded roadmap from", self.saveto)
return self.graph, self.vertices, self.weighted_edges
except pickle.PickleError:
pass
# Compute the set of vertices and edges.
self.vertices = self.sample_vertices()
self.weighted_edges = self.connect_vertices(self.vertices)
if not self.lazy:
self.weighted_edges = self.check_weighted_edges_validity(
self.weighted_edges
)
# Insert the vertices and edges into a NetworkX graph object
if self.directed:
self.graph = nx.DiGraph()
else:
self.graph = nx.Graph()
vbunch = [
(i, dict(config=config))
for i, config in zip(np.arange(self.num_vertices, dtype=int), self.vertices)
]
ebunch = [(int(u), int(v), float(w)) for u, v, w in self.weighted_edges]
self.graph.add_nodes_from(vbunch)
self.graph.add_weighted_edges_from(ebunch, "weight")
# Cache this roadmap, if path is specified.
if self.saveto is not None:
with open(self.saveto, "wb") as f:
data = {
"graph": self.graph,
"vertices": self.vertices,
"weighted_edges": self.weighted_edges,
}
pickle.dump(data, f)
print("Saved roadmap to", self.saveto)
return self.graph, self.vertices, self.weighted_edges
def sample_vertices(self):
"""Sample self.num_vertices vertices from self.sampler.
Returns:
vertices: np.array of states with shape num_vertices x D
"""
# Lattice sampler only samples a fixed set of samples. Some of them may
# be in collision, so they're unusable; pretend we only asked for that
# many vertices.
if isinstance(self.sampler, samplers.LatticeSampler):
samples = self.sampler.sample(self.num_vertices)
valid = self.problem.check_state_validity(samples)
self.num_vertices = valid.sum()
return samples[valid, :]
# For other samplers, sample until the target number of collision-free
# vertices is reached.
configs = []
batch_size = self.num_vertices
valid_samples = 0
total_samples = 0
while valid_samples < self.num_vertices:
samples = self.sampler.sample(batch_size)
valid = self.problem.check_state_validity(samples)
configs.append(samples[valid, :])
valid_samples += valid.sum()
total_samples += batch_size
# This adaptively changes the sample batch size based on how hard it
# is to get collision-free vertices.
est_validity = valid_samples / total_samples
batch_size = int((self.num_vertices - valid_samples) / est_validity) + 1
configs = np.vstack(configs)
assert configs.shape[0] >= self.num_vertices
return configs[: self.num_vertices, :]
def connect_vertices(self, vertices):
"""Connect vertices within self.connection_radius.
Returns:
weighted_edges: np.array of edges and edge lengths with shape
num_edges x 3, where each row is (u, v, length) and u, v are
node labels (vertex indices)
"""
h = np.zeros((self.num_vertices, self.num_vertices))
for i in range(self.num_vertices):
u = vertices[i, :]
# For undirected graphs, only compute heuristic to all later vertices.
# For directed graphs, compute heuristic to previous vertices as well.
h[i, i + 1 :] = self.problem.compute_heuristic(u, vertices[i + 1 :, :])
if self.directed:
h[i, :i] = self.problem.compute_heuristic(u, vertices[:i, :])
# Edges where the heuristic distance is greater than the connection
# radius are dropped. Find the indices to all nonzero heuristics; i is
# the sources and j is the targets.
h[h > self.connection_radius] = 0
i, j = h.nonzero()
i = i.reshape((-1, 1))
j = j.reshape((-1, 1))
return np.hstack([i, j, h[i, j]])
def add_node(self, state, is_start):
"""Add a node for the state, which is either a start or goal.
Args:
state: new state to add to the roadmap
is_start: whether this new state is the start
Returns:
new_index: the integer label for the added state
"""
assert len(state.shape) == 1
valid = self.problem.check_state_validity(state.reshape((1, -1))).item()
if not valid:
raise ValueError("state {} is invalid".format(state))
index = self.graph.number_of_nodes()
# This logic is very similar to connect_vertices. Either compute the
# heuristic from the start to all other vertices, or from all other
# vertices to the goal.
if is_start:
self.start = index
h = self.problem.compute_heuristic(state, self.vertices)
else:
self.goal = index
h = self.problem.compute_heuristic(self.vertices, state)
h[h > self.connection_radius] = 0
(i,) = h.nonzero()
i = i.reshape((-1, 1))
index_arr = index * np.ones_like(i)
if is_start:
weighted_edges = np.hstack([index_arr, i, h[i]])
else:
weighted_edges = np.hstack([i, index_arr, h[i]])
# Update self.graph, self.vertices, and self.weighted_edges.
self.graph.add_node(index, config=state)
self.vertices = np.vstack([self.vertices, state.reshape((1, -1))])
self.num_vertices += 1
if not self.lazy:
weighted_edges = self.check_weighted_edges_validity(weighted_edges)
ebunch = [(int(u), int(v), float(w)) for u, v, w in weighted_edges]
self.graph.add_weighted_edges_from(ebunch)
self.weighted_edges = np.vstack([self.weighted_edges, weighted_edges])
return index
def compute_path_length(self, vpath):
"""Compute the path length of a sequence of vertices.
Args:
vpath: sequence of vertex labels
Returns:
length: path length
"""
total = 0
for i in range(1, len(vpath)):
u, v = vpath[i - 1 : i + 1]
total += self.heuristic(u, v)
return total
def compute_qpath(self, vpath):
"""Compute a sequence of states from a sequence of vertices.
Args:
vpath: sequence of vertex labels
Returns:
qpath: sequence of configuration states
"""
edges = []
for i in range(1, len(vpath)):
u, v = vpath[i - 1 : i + 1]
q1 = self.vertices[u, :]
q2 = self.vertices[v, :]
edge, _ = self.problem.steer(q1, q2)
edges.append(edge)
return np.vstack(edges)
def visualize(self, show_edges=False, vpath=None, saveto=None):
"""Visualize the roadmap.
Args:
show_edges: whether the roadmap's edges should be shown
vpath: sequence of vertex labels (or None)
saveto: path to save roadmap plot (or None)
"""
plt.imshow(
self.problem.permissible_region,
cmap=plt.cm.gray,
aspect="equal",
interpolation="none",
vmin=0,
vmax=1,
origin="lower",
extent=self.problem.extents.ravel()[:4],
)
if show_edges:
for u, v in self.graph.edges():
q1 = self.vertices[u, :]
q2 = self.vertices[v, :]
edge, _ = self.problem.steer(q1, q2)
plt.plot(edge[:, 0], edge[:, 1], c="#dddddd", zorder=1)
if vpath is not None:
qpath = self.compute_qpath(vpath)
plt.plot(qpath[:, 0], qpath[:, 1], c="#0000ff", zorder=1)
plt.scatter(self.vertices[:, 0], self.vertices[:, 1], c="k", zorder=2)
if self.start is not None:
plt.scatter(
self.vertices[self.start, 0],
self.vertices[self.start, 1],
c="g",
zorder=3,
)
if self.goal is not None:
plt.scatter(
self.vertices[self.goal, 0],
self.vertices[self.goal, 1],
c="r",
zorder=3,
)
plt.xlim((0, self.problem.extents[0, 1]))
plt.ylim((0, self.problem.extents[1, 1]))
if saveto is not None:
plt.savefig(saveto, bbox_inches="tight")
print("Saved graph image to", self.saveto)
plt.show()
from __future__ import division
import numpy as np
try:
# Python 2 compatibility
from itertools import izip as zip
except ImportError:
pass
class Sampler(object):
def __init__(self, extents):
"""Construct a sampler for the half-open interval defined by extents.
Args:
extents: np.array of lower and upper bounds with shape D x 2
"""
self.extents = extents
self.dim = extents.shape[0]
def sample(self, num_samples):
"""Return samples from the sampler.
Args:
num_samples: number of samples to return
Returns:
samples: np.array of N x D sample configurations
"""
raise NotImplementedError
class HaltonSampler(Sampler):
# This is hard-coded to avoid writing code to generate primes. Halton
# sequences with larger primes need to drop more initial terms.
primes = [2, 3, 5, 7, 11, 13]
def __init__(self, extents):
super(HaltonSampler, self).__init__(extents)
self.index = self.primes[self.dim] # drop the first few terms
self.gen = self.make_generator()
def compute_sample(self, index, base):
"""Return an element from the Halton sequence for a desired base.
This reference may be useful: https://observablehq.com/@jrus/halton.
Args:
index: index of the desired element of the Halton sequence
base: base for the Halton sequence
Returns:
the element at index in the base Halton sequence
"""
# This differs by one from the reference implementation. This excludes 0
# from our zero-indexed Halton sequence.
index += 1
# BEGIN QUESTION 1.1
"*** REPLACE THIS LINE ***"
raise NotImplementedError
# END QUESTION 1.1
def make_base_generator(self, base):
"""Generate the Halton sequence for a desired base."""
while True:
yield self.compute_sample(self.index, base)
def make_generator(self):
"""Generate the Halton sequence for a list of coprime bases."""
seqs = [self.make_base_generator(p) for p in self.primes[: self.dim]]
for x in zip(*seqs):
yield np.array(x)
self.index += 1
def sample(self, num_samples):
"""Return samples from the Halton quasirandom sampler.
Args:
num_samples: number of samples to return
Returns:
samples: np.array of N x D sample configurations
"""
# Generate Halton samples. Each entry lies in the range (0, 1).
batch = np.empty((num_samples, self.dim))
for i, x in zip(range(num_samples), self.gen):
batch[i, :] = x
# Scale the batch of samples to fit the extents of the space.
# BEGIN QUESTION 1.1
"*** REPLACE THIS LINE ***"
raise NotImplementedError
# END QUESTION 1.1
class LatticeSampler(Sampler):
def __init__(self, extents):
super(LatticeSampler, self).__init__(extents)
def sample(self, num_samples):
"""Return samples from the lattice sampler.
Note: this method may return fewer samples than desired.
Args:
num_samples: number of samples to return
Returns:
samples: np.array of N x D sample configurations
"""
volume = np.prod(self.extents[:, 1] - self.extents[:, 0])
resolution = (volume / num_samples) ** (1.0 / self.dim)
steps = [
np.arange(
self.extents[i, 0] + resolution / 2, self.extents[i, 1], resolution
)
for i in range(self.dim)
]
meshed = np.array(np.meshgrid(*steps)).reshape((2, -1)).T
return meshed[:num_samples, :]
class RandomSampler(Sampler):
def sample(self, num_samples):
"""Return samples from the random sampler.
Args:
num_samples: number of samples to return
Returns:
samples: np.array of N x D sample configurations
"""
return np.random.uniform(
self.extents[:, 0],
self.extents[:, 1],
size=(num_samples, self.dim),
)
samplers = {
"halton": HaltonSampler,
"lattice": LatticeSampler,
"random": RandomSampler,
}
"""Implementation of (Lazy) A* and shortcutting."""
import collections
import numpy as np
import networkx as nx
from itertools import count
from cse478.utils import PriorityQueue
# Entries in the PriorityQueue are prioritized in lexicographic order, i.e. by
# the first element of QueueEntry (the f-value). Ties are broken by proceeding
# to the next element, a unique counter. This prevents the underlying
# PriorityQueue from breaking ties by comparing nodes. QueueEntry objects
# contain the node, a possible parent, and cost-to-come via that parent. These
# can be accessed via dot notation, e.g. `entry.node`.
QueueEntry = collections.namedtuple(
"QueueEntry",
["f_value", "counter", "node", "parent", "cost_to_come"],
)
NULL = -1
def astar(rm, start, goal):
"""Compute the shortest path from start to goal on a roadmap.
Args:
rm: Roadmap instance to plan on
start, goal: integer labels for the start and goal states
Returns:
vpath: a sequence of node labels, including the start and goal
"""
if start not in rm.graph or goal not in rm.graph:
msg = "Either start {} or goal {} is not in G"
raise nx.NodeNotFound(msg.format(start, goal))
# expanded is a Boolean array that tracks whether each node has previously
# been expanded, in order to avoid expanding nodes multiple times.
expanded = np.zeros(rm.num_vertices, dtype=bool)
# parents is an int array that tracks the best parent for each node. It
# defaults to NULL (-1) for each node.
parents = NULL * np.ones(rm.num_vertices, dtype=int)
# heuristic_cache is a float array that caches the returned heuristic value.
heuristic_cache = NULL * np.ones(rm.num_vertices)
c = count()
queue = PriorityQueue()
queue.push(QueueEntry(rm.heuristic(start, goal), next(c), start, NULL, 0))
while len(queue) > 0:
entry = queue.pop()
if expanded[entry.node]:
continue
if rm.lazy:
# Collision check the edge between the parent and this node (if a
# parent exists). If it's in collision, stop processing this entry;
# we'll wait for another possible parent later in the queue.
# BEGIN QUESTION 2.2
"*** REPLACE THIS LINE ***"
raise NotImplementedError
# END QUESTION 2.2
expanded[entry.node] = True
parents[entry.node] = entry.parent
if entry.node == goal:
path = extract_path(parents, goal)
return path, parents
for neighbor, w in rm.graph[entry.node].items():
# Get the edge weight (length) and cache the heuristic value
weight = w.get("weight", 1)
if heuristic_cache[neighbor] == NULL:
heuristic_cache[neighbor] = rm.heuristic(neighbor, goal)
h = heuristic_cache[neighbor]
# Compute this neighbor's cost-to-come via entry.node, and insert a
# new QueueEntry.
#
# Since this may be Lazy A*, it is misleading to compare the f-value
# against any current QueueEntry objects for this neighbor that are
# already in the queue. Those may turn out to be in collision and
# therefore unusable. Therefore, this entry should be inserted even
# if it seems more costly than other entries already in the queue.
#
# However, if the neighbor has already been expanded, it's no longer
# necessary to insert this QueueEntry.
# BEGIN QUESTION 2.1
"*** REPLACE THIS LINE ***"
raise NotImplementedError
# END QUESTION 2.1
raise nx.NetworkXNoPath("Node {} not reachable from {}".format(goal, start))
def extract_path(parents, goal):
"""Extract the shortest path from start to goal.
Args:
parents: np.array of integer node labels
goal: integer node label for the goal state
Returns:
vpath: a sequence of node labels from the start to the goal
"""
# Follow the parents of the node until a NULL entry is reached
# BEGIN QUESTION 2.1
"*** REPLACE THIS LINE ***"
raise NotImplementedError
# END QUESTION 2.1
def shortcut(rm, vpath, num_trials=100):
"""Shortcut the path between the start and goal.
Args:
rm: Roadmap instance to plan on
vpath: a sequence of node labels from the start to the goal
num_trials: number of random trials to attempt
Returns:
vpath: a subset of the original vpath that connects vertices directly
"""
for _ in range(num_trials):
if len(vpath) == 2:
break
# Randomly select two indices to try and connect. Verify that an edge
# connecting the two vertices would be collision-free, and that
# connecting the two indices would actually be shorter.
#
# You may find these Roadmap methods useful: check_edge_validity,
# heuristic, and compute_path_length.
indices = np.random.choice(len(vpath), size=2, replace=False)
i, j = np.sort(indices)
# BEGIN QUESTION 2.3
"*** REPLACE THIS LINE ***"
raise NotImplementedError
# END QUESTION 2.3
return vpath
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment