Commit ffcfcee7 authored by Brian Hou's avatar Brian Hou
Browse files

Update Project 2: Localization.

Fix bugs in sensor model autograder tests. Specifically, the probability table
was being normalized along the wrong axis.

Add test suite for particle initializer. Add new tests to catch resampling
errors earlier.

Fix small issue in sensor model likelihood plot that affected some maps. Add new
small map to help with sensor model tuning.

Reduce excessive locking in particle filter, motion model, and sensor model.
parent c0ec79dc
Pipeline #413736 failed with stage
in 8 minutes and 2 seconds
image: shapes_world_small.pgm
resolution: 0.025000
origin: [0, 0, 0]
negate: 0
occupied_thresh: 0.65
free_thresh: 0.196
......@@ -40,6 +40,7 @@ if(CATKIN_ENABLE_TESTING)
# Q1, Q2, Q3 tests
catkin_add_nosetests(test/motion_model.py)
catkin_add_nosetests(test/sensor_model.py)
catkin_add_nosetests(test/particle_initializer.py)
catkin_add_nosetests(test/resample.py)
# add_rostest tests the ROS integrations
......
......@@ -68,7 +68,7 @@ def plot_sensor_model(raw_map_msg, laser_msg, sensor_params=None, show_angle=Fal
map_msg=map_msg,
particles=particles,
weights=weights,
car_length=0.33,
car_length=0, # Ensure particles don't get shifted into laser frame
sensor_params=sensor_params,
)
......@@ -113,6 +113,9 @@ def plot_sensor_model(raw_map_msg, laser_msg, sensor_params=None, show_angle=Fal
img = np.flip(img, axis=0)
plt.figure(figsize=(map_msg.info.width / 100, map_msg.info.height / 100), dpi=100)
plt.title("Sensor model likelihood")
plt.xlabel("x (pixels)")
plt.ylabel("y (pixels)")
plt.imshow(img)
plt.show()
......
......@@ -178,6 +178,23 @@ class KinematicCarMotionModelROS:
rospy.logwarn_throttle(5, "No servo command received")
return
if self.last_vesc_stamp is None:
rospy.loginfo("Motion information received for the first time")
self.last_vesc_stamp = msg.header.stamp
return
# Convert raw msgs to controls
# Note that control = (raw_msg_val - offset_param) / gain_param
curr_speed = (
msg.state.speed - self.speed_to_erpm_offset
) / self.speed_to_erpm_gain
curr_steering_angle = (
self.last_servo_cmd - self.steering_to_servo_offset
) / self.steering_to_servo_gain
dt = (msg.header.stamp - self.last_vesc_stamp).to_sec()
# Acquire the lock that synchronizes access to the particles. This is
# necessary because self.particles is shared by the other particle
# filter classes.
......@@ -186,26 +203,9 @@ class KinematicCarMotionModelROS:
# See the Python documentation for more information:
# https://docs.python.org/3/library/threading.html#using-locks-conditions-and-semaphores-in-the-with-statement
with self.state_lock:
if self.last_vesc_stamp is None:
rospy.loginfo("Motion information received for the first time")
self.last_vesc_stamp = msg.header.stamp
return
# Convert raw msgs to controls
# Note that control = (raw_msg_val - offset_param) / gain_param
curr_speed = (
msg.state.speed - self.speed_to_erpm_offset
) / self.speed_to_erpm_gain
curr_steering_angle = (
self.last_servo_cmd - self.steering_to_servo_offset
) / self.steering_to_servo_gain
dt = (msg.header.stamp - self.last_vesc_stamp).to_sec()
# Propagate particles with the motion model
self.motion_model.apply_motion_model(
self.particles, curr_speed, curr_steering_angle, dt
)
self.last_vesc_stamp = msg.header.stamp
self.last_vesc_stamp = msg.header.stamp
#!/usr/bin/env python
from threading import Lock
import numpy as np
import rospy
import tf2_ros
......@@ -304,15 +305,16 @@ class ParticleFilter:
"""Return a geometry_msgs/PoseStamped message with the current expected pose."""
with self.state_lock:
inferred_pose = self.expected_pose()
ps = None
if isinstance(inferred_pose, np.ndarray):
ps = PoseStamped()
ps.header = utils.make_header("map")
ps.header.stamp = self.motion_model.last_vesc_stamp
ps.pose.position.x = inferred_pose[0]
ps.pose.position.y = inferred_pose[1]
ps.pose.orientation = utils.angle_to_quaternion(inferred_pose[2])
return ps
ps = None
if isinstance(inferred_pose, np.ndarray):
ps = PoseStamped()
ps.header = utils.make_header("map")
# This estimate is as current as the particles
ps.header.stamp = rospy.Time.now()
ps.pose.position.x = inferred_pose[0]
ps.pose.position.y = inferred_pose[1]
ps.pose.orientation = utils.angle_to_quaternion(inferred_pose[2])
return ps
def _publish_tf(self):
"""Publish a transform between map and odom frames."""
......
......@@ -182,6 +182,13 @@ class LaserScanSensorModelROS:
Args:
msg: a sensor_msgs/LaserScan message
"""
# Initialize angles
if self.laser_angles is None:
self.laser_angles = np.linspace(
msg.angle_min, msg.angle_max, len(msg.ranges)
)
ranges, angles = self.downsample(msg.ranges)
# Acquire the lock that synchronizes access to the particles. This is
# necessary because self.particles is shared by the other particle
# filter classes.
......@@ -190,18 +197,11 @@ class LaserScanSensorModelROS:
# See the Python documentation for more information:
# https://docs.python.org/3/library/threading.html#using-locks-conditions-and-semaphores-in-the-with-statement
with self.state_lock:
# Initialize angles
if self.laser_angles is None:
self.laser_angles = np.linspace(
msg.angle_min, msg.angle_max, len(msg.ranges)
)
ranges, angles = self.downsample(msg.ranges)
self.apply_sensor_model(ranges, angles)
self.weights /= np.sum(self.weights)
self.last_laser = msg
self.do_resample = True
self.last_laser = msg
self.do_resample = True
def apply_sensor_model(self, obs_ranges, obs_angles):
"""Apply the sensor model to self.weights based on the observed laser scan.
......
#!/usr/bin/env python
import numpy as np
import rosunit
import unittest
from geometry_msgs.msg import Pose
from cse478 import utils
from localization.particle_filter import ParticleInitializer
class TestParticleInitializer(unittest.TestCase):
def setUp(self):
self.num_particles = 50
self.particle_initializer = ParticleInitializer(
x_std=0.05,
y_std=0.05,
theta_std=0.1,
)
self.initial_pose = np.array([-0.7, 1.3, 0.6])
x, y, theta = self.initial_pose
self.initial_pose_msg = Pose()
self.initial_pose_msg.position.x = x
self.initial_pose_msg.position.y = y
self.initial_pose_msg.orientation = utils.angle_to_quaternion(theta)
def test_initializer_inplace(self):
particles = np.zeros((self.num_particles, 3))
weights = np.arange(self.num_particles, dtype=float)
weights[:] /= sum(weights)
prev_particles = particles.copy()
prev_weights = weights.copy()
self.particle_initializer.reset_click_pose(
self.initial_pose_msg, particles, weights
)
self.assertFalse(
np.allclose(prev_particles, particles),
msg="Initializer should modify particles in-place",
)
self.assertFalse(
np.allclose(prev_weights, weights),
msg="Initializer should modify weights in-place",
)
def test_initializer_particles_are_centered(self):
self.num_particles = 1000000
particles = np.zeros((self.num_particles, 3))
weights = np.zeros(self.num_particles)
self.particle_initializer.reset_click_pose(
self.initial_pose_msg, particles, weights
)
weighted_avg_particles = np.einsum("ij,i->j", particles, weights)
np.testing.assert_equal(
weights,
np.full_like(weights, 1.0 / self.num_particles),
err_msg="Initializer should set the weights to be uniform",
)
self.assertTrue(
np.allclose(weighted_avg_particles, self.initial_pose, atol=1e-3),
msg="Initializer should produce a distribution of particles centered around the given initial pose",
)
if __name__ == "__main__":
rosunit.unitrun(
"localization", "test_particle_initializer", TestParticleInitializer
)
......@@ -15,7 +15,35 @@ def naive_sample(weights, target_population_size):
class TestResample(unittest.TestCase):
def test_resampler(self):
def test_resampler_inplace(self):
n_particles = 100 # number of particles
particles = np.arange(n_particles)
particles = particles[..., np.newaxis]
weights = np.arange(n_particles, dtype=np.float)
weights /= np.sum(weights)
prev_particles = particles.copy()
prev_weights = weights.copy()
rs = LowVarianceSampler(particles, weights)
rs.resample()
self.assertTrue(
np.allclose(rs.particles, particles),
msg="Resampler should modify particles in-place",
)
self.assertFalse(
np.allclose(prev_particles, particles),
msg="Resampler should modify particles in-place",
)
self.assertTrue(
np.allclose(rs.weights, weights),
msg="Resampler should modify weights in-place",
)
self.assertFalse(
np.allclose(prev_weights, weights),
msg="Resampler should modify weights in-place",
)
def test_resampler_is_fair(self):
n_particles = 100 # number of particles
k_val = 50 # number of particles that have non-zero weight
trials = 10 # number of resamplings to do
......@@ -83,6 +111,23 @@ class TestResample(unittest.TestCase):
msg="All particles should have been sampled at least once",
)
def test_resampler_resets_weights(self):
n_particles = 100 # number of particles
# Create a set of particles with uniform weights
particles = np.arange(n_particles)
particles = particles[..., np.newaxis]
weights = np.random.uniform([n_particles], 1.0 / n_particles)
rs = LowVarianceSampler(particles, weights)
rs.resample()
np.testing.assert_equal(
weights,
np.full_like(weights, 1.0 / n_particles),
err_msg="The resampler should set the weights to be uniform",
)
if __name__ == "__main__":
rosunit.unitrun("localization", "test_resample", TestResample)
......@@ -49,7 +49,7 @@ class TestSensorModel(unittest.TestCase):
max_range = 100
table = sensor_model.precompute_sensor_model(max_range)
np.testing.assert_allclose(
table.sum(axis=1),
table.sum(axis=0),
1.0,
err_msg="The masses for the different possible ranges given any particular "
"expected reading d should form a proper distribution",
......@@ -64,8 +64,6 @@ class TestSensorModel(unittest.TestCase):
z_hit=0,
)
table = sensor_model.precompute_sensor_model(1)
# The probability of the one event in this model is 0, so we should
# get back a sign that this isn't a distribution
self.assertEqual(
1.0,
table[0, 1],
......@@ -81,7 +79,7 @@ class TestSensorModel(unittest.TestCase):
)
table = sensor_model.precompute_sensor_model(1)
self.assertEqual(
0.5,
1.0,
table[0, 1],
msg="An event under the expected should have mass from the 'random' component",
)
......@@ -94,10 +92,10 @@ class TestSensorModel(unittest.TestCase):
z_hit=0,
)
table = sensor_model.precompute_sensor_model(1)
self.assertTrue(
np.isnan(table[0, 1]),
msg="An event under the expected should not have mass from "
"the 'max' component, so the resulting distribution should be undefined",
self.assertEqual(
0.0,
table[0, 0],
msg="An event under the expected should not have mass from the 'max' component",
)
sensor_model = SingleBeamSensorModel(
......@@ -108,8 +106,9 @@ class TestSensorModel(unittest.TestCase):
z_hit=7,
)
table = sensor_model.precompute_sensor_model(1)
self.assertTrue(
table[0, 1] > 0.0,
self.assertGreater(
table[0, 1],
0.0,
msg="An event under the expected should have mass from the 'hit' component",
)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment