Commit 0be2f55a authored by Kay Ke's avatar Kay Ke
Browse files

Update code

parent e2a64f6a
......@@ -3,32 +3,91 @@ import threading
class BaseController(object):
def __init__(self):
def __init__(self, error='CrossTrackError'):
self.path_lock = threading.RLock()
self.path = np.array([])
self._ready = False
self._is_ready = False
def ready(self):
self.error = error
self.reset_params()
self.reset_state()
self.waypoint_lookahead = 0.6 # Average distance from the current
# reference pose to lookahed. You may need
# this parameter in implementations later.
def reset_params(self):
# updating parameters from the ros parameter
# overwrite this function in the child class
raise NotImplementedError
def reset_state(self):
# resets the controller's internal state
# overwrite this function in the child class
raise NotImplementedError
def get_control(self, pose, index):
'''
ready returns whether controller is ready to begin tracking trajectory.
get_control - computes the control action given the current pose of
the car and an index into the reference trajectory
input:
pose - the vehicle's current pose [x, y, heading]
index - an integer index into the reference path
output:
ready_val - whether controller is ready to begin tracking trajectory.
control - [target velocity, target steering angle]
'''
return self._ready
# overwrite this function in the child class
raise NotImplementedError
############################################################
# Helper Functions
############################################################
def is_ready(self):
# returns whether controller is ready to begin tracking trajectory.
return self._is_ready
def get_reference_pose(self, index):
# returns the pose from the reference path at the reference index.
with self.path_lock:
assert len(self.path) > index
return self.path[index]
def set_path(self, path):
# sets the reference trajectory, implicitly resets internal state
with self.path_lock:
self.path = np.array(
[np.array([path[i].x, path[i].y, path[i].h, path[i].v])
for i in range(len(path))])
self.reset_state()
self._is_ready = True
# average distance between path waypoints
self.waypoint_diff = np.average(
np.linalg.norm(np.diff(self.path[:, :2], axis=0), axis=1))
def get_reference_index(self, pose):
'''
set_path - sets trajectory of controller, implicitly resets internal state
given the current pose, finds the the "reference index" i of the
controller's path that will be used as the next control target.
input:
path - reference path to track
pose - current pose of the car, represented as [x, y, heading]
output:
none
i - index
'''
with self.path_lock:
self.path = np.array([np.array([path[i].x, path[i].y, path[i].h, path[i].v]) for i in range(len(path))])
self.reset_state()
self._ready = True
self.waypoint_diff = np.average(np.linalg.norm(np.diff(self.path[:, :2], axis=0), axis=1))
# TODO 1.1: INSERT CODE HERE. Don't modify / delete this line
#
# Determine a strategy for selecting a reference point
# in the path. One option is to:
# STEP1. find the nearest reference point to the current_pose, p_close
# STEP2. chose the next point after p_close, some distance away
# along the path.
# Here, "some distance" can be defined as a parameter
# e.g. self.waypoint_lookahead.
#
# Note: this method must be computationally efficient
# as it is running directly in the control loop.
raise NotImplementedError
def path_complete(self, pose, error):
'''
......@@ -44,16 +103,10 @@ class BaseController(object):
reached the end of the path
'''
err_l2 = np.linalg.norm(error)
return (self.get_reference_index(pose) == (len(self.path) - 1) and err_l2 < self.finish_threshold) or (err_l2 > self.exceed_threshold)
return ((self.get_reference_index(pose) == (len(self.path) - 1)
and err_l2 < self.finish_threshold)
or (err_l2 > self.exceed_threshold))
def get_reference_pose(self, index):
'''
Utility function returns the reference pose from the reference path given a
reference index.
'''
with self.path_lock:
assert len(self.path) > index
return self.path[index]
def get_error(self, pose, index):
'''
......@@ -65,9 +118,22 @@ class BaseController(object):
output:
e_p - error vector [e_x, e_y]
'''
# TODO: INSERT CODE HERE
# Use the method described in the handout to
# compute the error vector. Be careful about the order
# in which you subtract the car's pose from the
# reference pose.
assert False, "Complete this function"
if self.error == 'CrossTrackError':
return self._get_cross_track_error(pose, index)
else:
return self._get_alternative_error(pose, index)
def _get_cross_track_error(self, pose, index):
# TODO 1.2: INSERT CODE HERE. Don't modify / delete this line
#
# Use the method described in the handout to compute the error vector.
# Be careful about the order in which you subtract the car's pose
# from the reference pose.
raise NotImplementedError
def _get_alternative_error(self, pose, index):
# TODO E.A: INSERT CODE HERE. Don't modify / delete this line
#
# (Extra Credit) implement an alternative error definition
raise NotImplementedError
......@@ -11,75 +11,30 @@ class ModelPredictiveController(BaseController):
def __init__(self):
super(ModelPredictiveController, self).__init__()
self.reset_params()
self.reset_state()
def get_reference_index(self, pose):
'''
get_reference_index finds the index i in the controller's path
to compute the next control control against
input:
pose - current pose of the car, represented as [x, y, heading]
output:
i - referencence index
'''
def reset_params(self):
with self.path_lock:
# TODO: INSERT CODE HERE
# Determine a strategy for selecting a reference point
# in the path. One option is to find the nearest reference
# point and chose the next point some distance away along
# the path. You are welcome to use the same method for MPC
# as you used for PID or Pure Pursuit.
#
# Note: this method must be computationally efficient
# as it is running directly in the tight control loop.
assert False, "Complete this function"
def get_control(self, pose, index):
'''
get_control - computes the control action given an index into the
reference trajectory, and the current pose of the car.
Note: the output velocity is given in the reference point.
input:
pose - the vehicle's current pose [x, y, heading]
index - an integer corresponding to the reference index into the
reference path to control against
output:
control - [velocity, steering angle]
'''
assert len(pose) == 3
rollouts = np.zeros((self.K, self.T, 3))
# TODO: INSERT CODE HERE
# With MPC, you should roll out K control trajectories using
# the kinematic car model, then score each one with your cost
# function, and finally select the one with the lowest cost.
# For each K trial, the first position is at the current position (pose)
# For control trajectories, get the speed from reference point.
# Then step forward the K control trajectory T timesteps using
# self.apply_kinematics
self.finish_threshold = float(rospy.get_param("mpc/finish_threshold", 1.0))
self.exceed_threshold = float(rospy.get_param("mpc/exceed_threshold", 4.0))
self.waypoint_lookahead = float(rospy.get_param("mpc/waypoint_lookahead", 1.0))
assert False, "Complete this function"
# Apply the cost function to the rolled out poses.
costs = self.apply_cost(rollouts, index)
self.K = int(rospy.get_param("mpc/K", 62)) # Sample K rollouts
self.T = int(rospy.get_param("mpc/T", 8)) # Each rollout has T steps
self.speed = \
float(rospy.get_param("mpc/speed", 1.0)) # speed of car in
# sample rollouts
# Finally, find the control trajectory with the minimum cost.
min_control = np.argmin(costs)
self.collision_w = float(rospy.get_param("mpc/collision_w", 1e5))
self.error_w = float(rospy.get_param("mpc/error_w", 1.0))
# Return the controls which yielded the min cost.
return self.trajs[min_control][0]
self.car_length = float(rospy.get_param("mpc/car_length", 0.33))
self.car_width = float(rospy.get_param("mpc/car_width", 0.15))
self.wheelbase = float(rospy.get_param("trajgen/wheelbase", 0.33))
self.min_delta = float(rospy.get_param("trajgen/min_delta", -0.34))
self.max_delta = float(rospy.get_param("trajgen/max_delta", 0.34))
def reset_state(self):
'''
Utility function for resetting internal states.
'''
with self.path_lock:
self.trajs = self.get_control_trajectories()
assert self.trajs.shape == (self.K, self.T, 2)
self.sampled_controls = self.sample_controls()
self.scaled = np.zeros((self.K * self.T, 3))
self.bbox_map = np.zeros((self.K * self.T, 2, 4))
self.perm = np.zeros(self.K * self.T).astype(np.int)
......@@ -91,51 +46,54 @@ class ModelPredictiveController(BaseController):
self.map_c = np.cos(self.map_angle)
self.map_s = np.sin(self.map_angle)
def reset_params(self):
'''
Utility function for updating parameters which depend on the ros parameter
server. Setting parameters, such as gains, can be useful for interative
testing.
'''
with self.path_lock:
self.wheelbase = float(rospy.get_param("trajgen/wheelbase", 0.33))
self.min_delta = float(rospy.get_param("trajgen/min_delta", -0.34))
self.max_delta = float(rospy.get_param("trajgen/max_delta", 0.34))
def get_control(self, pose, index):
rollouts = np.zeros((self.K, self.T, 3))
self.K = int(rospy.get_param("mpc/K", 62))
self.T = int(rospy.get_param("mpc/T", 8))
# TODO 3.1: INSERT CODE HERE. Don't modify / delete this line
#
# In MPC, you should first roll out K trajectories. The first
# positions for all K trajectories are at the current position
# (pose); the control speed for all trajectories should be the
# one at the reference point; each of the K trajectories may have
# a different steering angle.
#
# Use the kinematic car model to apply controls to the trajectories,
# then score all rollouts with your cost function,
# and finally select the one with the lowest cost.
self.speed = float(rospy.get_param("mpc/speed", 1.0))
self.finish_threshold = float(rospy.get_param("mpc/finish_threshold", 1.0))
self.exceed_threshold = float(rospy.get_param("mpc/exceed_threshold", 4.0))
# Average distance from the current reference pose to lookahed.
self.waypoint_lookahead = float(rospy.get_param("mpc/waypoint_lookahead", 1.0))
self.collision_w = float(rospy.get_param("mpc/collision_w", 1e5))
self.error_w = float(rospy.get_param("mpc/error_w", 1.0))
raise NotImplementedError
costs = self.calculate_cost_for_rollouts(rollouts, index)
min_control = np.argmin(costs) #find the control trajectory with the minimum cost.
# Return the first control signal from the argmin trajectory.
return self.sampled_controls[min_control][0]
self.car_length = float(rospy.get_param("mpc/car_length", 0.33))
self.car_width = float(rospy.get_param("mpc/car_width", 0.15))
def get_control_trajectories(self):
def sample_controls(self):
'''
get_control_trajectories computes K control trajectories to be
rolled out on each update step. You should only execute this
function when initializing the state of the controller.
sample_controls computes K series of control signals to be
rolled out on each of the K trajectories on each step.
various methods can be used for generating a sufficient set
Various methods can be used for generating a sufficient set
of control trajectories, but we suggest stepping over the
control space (of steering angles) to create the initial
control space of steering angles to create the initial
set of control trajectories.
output:
ctrls - a (K x T x 2) vector which lists K control trajectories
of length T
'''
# TODO: INSERT CODE HERE
# Create a trajectory library of K trajectories for T timesteps to
# roll out when computing the best control.
ctrls - a (K x T x 2) vector which lists K series of controls.
each series contains T steps. A control at each step
contains [speed, steering_angle]. Note that, if you decide
to sweep over the possible steering angles, you can
put a dummy speed here and use a different speed when
you actually rollout.
'''
# TODO 3.2: INSERT CODE HERE. Don't modify / delete this line
#
# Create a control library of K series of controls. Each with T
# timesteps to use when rolling out trajectories.
ctrls = np.zeros((self.K, self.T, 2))
step_size = (self.max_delta - self.min_delta) / (self.K - 1)
assert False, "Complete this function"
raise NotImplementedError
def apply_kinematics(self, cur_x, control):
'''
......@@ -148,34 +106,35 @@ class ModelPredictiveController(BaseController):
(x_dot, y_dot, theta_dot) - where each *_dot is a list
of k deltas computed by the kinematic car model.
'''
# TODO: INSERT CODE HERE
# TODO 3.3: INSERT CODE HERE. Don't modify / delete this line
#
# Use the kinematic car model discussed in class to step
# forward the pose of the car. We will step all K poses
# simultaneously, so we recommend using Numpy to compute
# this operation.
#
# ulimately, return a triplet with x_dot, y_dot_, theta_dot
# where each is a length K numpy vector.
# where each is a numpy vector of length K
dt = 0.1
x_dot = None
y_dot = None
theta_dot = None
assert False, "Complete this function"
return (x_dot, y_dot, theta_dot)
def apply_cost(self, poses, index):
raise NotImplementedError
def calculate_cost_for_rollouts(self, poses, index):
'''
rollouts (K,T,3) - poses of each rollout
index (int) - reference index in path
'''
all_poses = poses.copy()
all_poses.resize(self.K * self.T, 3)
collisions = self.check_collisions_in_map(all_poses)
collisions.resize(self.K, self.T)
collision_cost = collisions.sum(axis=1) * self.collision_w
error_cost = np.linalg.norm(poses[:, self.T - 1, :2] - self.path[index, :2], axis=1) * self.error_w
return collision_cost + error_cost
# TODO 3.4: INSERT CODE HERE. Don't modify / delete this line
#
# For each of the K given rollouts, calculate a score that
# considers the distance to the reference waypoint and collisions
raise NotImplementedError
#===============================================================
# Collision checking and map utilities
#===============================================================
def check_collisions_in_map(self, poses):
'''
......
......@@ -5,70 +5,28 @@ from controller import BaseController
class PIDController(BaseController):
def __init__(self):
super(PIDController, self).__init__()
def __init__(self, error='CrossTrackError'):
super(PIDController, self).__init__(error)
self.reset_params()
self.reset_state()
def get_reference_index(self, pose):
'''
get_reference_index finds the index i in the controller's path
to compute the next control control against
input:
pose - current pose of the car, represented as [x, y, heading]
output:
i - referencence index
'''
def reset_params(self):
with self.path_lock:
# TODO: INSERT CODE HERE
# Determine a strategy for selecting a reference point
# in the path. One option is to find the nearest reference
# point and chose the next point some distance away along
# the path.
#
# Note: this method must be computationally efficient
# as it is running directly in the tight control loop.
assert False, "Complete this function"
self.finish_threshold = float(rospy.get_param("/pid/finish_threshold", 0.2))
self.exceed_threshold = float(rospy.get_param("/pid/exceed_threshold", 4.0))
self.waypoint_lookahead = float(rospy.get_param("/pid/waypoint_lookahead", 0.6))
self.kp = float(rospy.get_param("/pid/kp", 0.15))
self.kd = float(rospy.get_param("/pid/kd", 0.2))
self.error = rospy.get_param("/pid/error", "CrossTrackError")
def reset_state(self):
pass
def get_control(self, pose, index):
'''
get_control - computes the control action given an index into the
reference trajectory, and the current pose of the car.
Note: the output velocity is given in the reference point.
input:
pose - the vehicle's current pose [x, y, heading]
index - an integer corresponding to the reference index into the
reference path to control against
output:
control - [velocity, steering angle]
'''
# TODO: INSERT CODE HERE
# Compute the next control using the PD control strategy.
# Consult the project report for details on how PD control
# works.
# TODO 2.1: INSERT CODE HERE. Don't delete this line.
#
# First, compute the cross track error. Then, using known
# gains, generate the control.
assert False, "Complete this function"
# Compute the next control using the PD control strategy.
# Consult the spec for details on how PD control works.
def reset_state(self):
'''
Utility function for resetting internal states.
'''
with self.path_lock:
pass
raise NotImplementedError
def reset_params(self):
'''
Utility function for updating parameters which depend on the ros parameter
server. Setting parameters, such as gains, can be useful for interative
testing.
'''
with self.path_lock:
self.kp = float(rospy.get_param("/pid/kp", 0.15))
self.kd = float(rospy.get_param("/pid/kd", 0.2))
self.finish_threshold = float(rospy.get_param("/pid/finish_threshold", 0.2))
self.exceed_threshold = float(rospy.get_param("/pid/exceed_threshold", 4.0))
# Average distance from the current reference pose to lookahed.
self.waypoint_lookahead = float(rospy.get_param("/pid/waypoint_lookahead", 0.6))
......@@ -4,67 +4,46 @@ from controller import BaseController
class PurePursuitController(BaseController):
def __init__(self):
super(PurePursuitController, self).__init__()
def __init__(self, error='CrossTrackError'):
super(PurePursuitController, self).__init__(error)
self.reset_params()
self.reset_state()
def reset_params(self):
with self.path_lock:
self.speed = float(rospy.get_param("/purpursuit/speed", 1.0))
self.finish_threshold = float(rospy.get_param("/purepursuit/finish_threshold", 0.2))
self.exceed_threshold = float(rospy.get_param("/purepursuit/exceed_threshold", 4.0))
# Lookahead distance from current pose to next reference waypoint.
# Different from waypoint_lookahead used by other controllers,
# as those are indexes from the reference point.
self.distance_lookahead = float(rospy.get_param("/purepursuit/distance_lookahead", 0.6))
def reset_state(self):
pass
def get_reference_index(self, pose):
'''
get_reference_index finds the index i in the controller's path
to compute the next control control against
input:
pose - current pose of the car, represented as [x, y, heading]
output:
i - referencence index
purepursuit controller uses a different way to find reference index
it finds the next reference waypoint that is about distance_lookahead
away from the current pose
'''
with self.path_lock:
# TODO: INSERT CODE HERE
# Use the pure pursuit lookahead method described in the
# handout for determining the reference index.
#
# Note: this method must be computationally efficient
# as it is running directly in the tight control loop.
assert False, "Complete this function"
# TODO E.P1: INSERT CODE HERE. Don't modify / delete this line
#
# Use the pure pursuit lookahead method described in the
# handout for determining the reference index.
#
# Note: this method must be computationally efficient
# as it is running directly in the tight control loop.
raise NotImplementedError
def get_control(self, pose, index):
'''
get_control - computes the control action given an index into the
reference trajectory, and the current pose of the car.
Note: the output velocity is given in the reference point.
input:
pose - the vehicle's current pose [x, y, heading]
index - an integer corresponding to the reference index into the
reference path to control against
output:
control - [velocity, steering angle]
'''
# TODO: INSERT CODE HERE
# First, compute the appropriate error.
# TODO E.P2: INSERT CODE HERE. Don't modify / delete this line
#
# Then, use the pure pursuit control method to compute the
# Use the pure pursuit control method to compute the
# steering angle. Refer to the hand out and referenced
# articles for more details about this strategy.
assert False, "Complete this function"
def reset_state(self):
'''
Utility function for resetting internal states.
'''
pass
raise NotImplementedError
def reset_params(self):
'''
Utility function for updating parameters which depend on the ros parameter
server. Setting parameters, such as gains, can be useful for interative
testing.
'''
with self.path_lock:
self.speed = float(rospy.get_param("/pid/speed", 1.0))
self.finish_threshold = float(rospy.get_param("/pid/finish_threshold", 0.2))
self.exceed_threshold = float(rospy.get_param("/pid/exceed_threshold", 4.0))
# Lookahead distance from current pose to next waypoint. Different from
# waypoint_lookahead in the other controllers, as those are distance from
# the reference point.
self.pose_lookahead = float(rospy.get_param("/purepursuit/pose_lookahead", 0.6))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment