Source code for GraphSLAMSolve

import numpy as np
import scipy as sp
import scipy.optimize
import scipy.sparse
import scipy.sparse.linalg

[docs]class GraphSLAMSolve:
    def __init__(self, x0: np.ndarray = np.array([0., 0.]), initial_rows: int = 100,
                 initial_cols: int = 100, dx_weight: float = 2.0, z_weight: float = 1.0,
                 dclip: float = 2.0, expansion_ratio: float = 1.7, local_radius: float = 1e5):
        """initialize GraphSLAMSolve object

        Args:
            x0 (np.ndarray, optional): array [x, y] of the initial state. Defaults to np.array([0.0, 0.0]).
            initial_rows (int, optional): initial number of equations this GraphSLAMSolve can store. Will expand once this is reached. Defaults to 100.
            initial_cols (int, optional): initial number of variables this GraphSLAMSolve can store. Will expand once this is reached. Defaults to 100.
            dx_weight (float, optional): weight (certainty) for odometry measurements. Defaults to 2.0.
            z_weight (float, optional): weight (certainty) for landmark measurements. Defaults to 1.0.
            dclip (float, optional): distance at which to clip the cost function for data association. Defaults to 2.0.
            expansion_ratio (float, optional): factor by which to grow the matrices when we run out of space. Defaults to 1.7.
            local_radius (float, optional): radius within which to include cones in the local map. Defaults to 1e5.
        """
        self.maxrows = initial_rows
        self.maxcols = initial_cols
        self.dx_weight = dx_weight  # how much to weigh localization
        self.z_weight = z_weight    # how much to weigh mapping
        self.expansion_ratio = expansion_ratio
        self.local_radius = local_radius

        # Constraints for the optimization problem: Ax - b = 0, where x is the optimal
        # state vector holding the most likely values for all time steps & landmark
        # positions.
        self.A = sp.sparse.lil_array((self.maxrows, self.maxcols), dtype=np.float64)
        self.b = np.zeros(self.maxrows, np.float64)  # one entry per equation, so sized by rows

        # The matrix is initialized with the anchor equations:
        #   time step 0's x is x0[0]
        #   time step 0's y is x0[1]
        self.A[0, 0] = 1
        self.A[1, 1] = 1
        self.b[0] = x0[0]  # x
        self.b[1] = x0[1]  # y

        self.nvars = 2
        self.neqns = 2
        self.x = [0]  # column index of each pose variable pair
        self.l = []   # column index of each landmark variable pair
        self.z = []   # row index of each landmark measurement equation pair
        self.d = [0]  # row index of each odometry equation pair (row 0 is the anchor)
        self.xhat = np.zeros((1, 2))  # running estimate of where the car was at each time step
        self.lhat = np.zeros((0, 2))  # running estimate of where each landmark is
        self.color = np.zeros(0, dtype=np.uint8)
        self.xhat[0, :] = x0
        self.dclip = dclip

    def _transform(self, x: np.ndarray, center: np.ndarray, z: np.ndarray) -> np.ndarray:
        # rotate z about `center` by angle x[2], then translate by x[:2]
        rot = np.array([[np.cos(x[2]), -np.sin(x[2])],
                        [np.sin(x[2]),  np.cos(x[2])]])
        return (rot @ (z - center).T).T + center + x[:2]
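A minimal usage sketch (illustrative, not part of the module): after construction the system already contains the two anchor equations pinning the first pose to x0.

    # hypothetical quick check of the anchor equations written by __init__
    slam = GraphSLAMSolve(x0=np.array([1.0, 2.0]))
    A = slam.A.tocsr()[:slam.neqns, :slam.nvars].toarray()
    print(A)                    # [[1. 0.], [0. 1.]]
    print(slam.b[:slam.neqns])  # [1. 2.]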
[docs]    def icp(self, z: np.ndarray, color: np.ndarray) -> np.ndarray:
        """rudimentary data association method that tries to rotate and translate `z`
        until it optimally lines up with self.lhat

        Args:
            z (np.ndarray): measurements, in the global reference frame
            color (np.ndarray): color of each measurement

        Returns:
            np.ndarray: measurements, translated and rotated optimally
        """
        # center = np.mean(z, axis=0)
        center = self.xhat[-1, :]
        uniquecolors = filter(lambda c: np.any(self.color == c), np.unique(color))
        filters = [(self.lhat[self.color == c], color == c) for c in uniquecolors]

        def cost(x: np.ndarray) -> float:
            """cost function for data association optimization. computes the sum of
            squared distances from each measurement to each same-colored landmark,
            capped at self.dclip**2 so that far-away landmarks have no effect.

            Args:
                x (np.ndarray): array of [x shift, y shift, rotation angle]

            Returns:
                float: cost of this x
            """
            zprime = self._transform(x, center, z)
            return sum(
                np.sum(
                    np.clip(
                        np.sum(np.square(zprime[filt, np.newaxis] - lhat), axis=2),
                        a_min=None,
                        a_max=self.dclip**2,
                    )
                )
                for (lhat, filt) in filters
            )

        res = sp.optimize.minimize(cost, x0=np.zeros(3), jac="2-point",
                                   options={"maxiter": 10}, tol=1e-3)
        return self._transform(res.x, center, z)
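The clipping in `cost` is what makes the alignment robust to outliers: a measurement within `dclip` of a landmark pulls the transform toward alignment, while anything farther contributes a constant `dclip**2` per landmark and so exerts no pull. A standalone toy sketch of that behavior (illustrative values only):

    # hypothetical demo of the clipped squared-distance cost used in icp
    lhat = np.array([[0.0, 0.0], [5.0, 0.0]])  # two mapped landmarks
    z = np.array([[0.3, 0.1], [50.0, 50.0]])   # one close match, one outlier
    d2 = np.sum(np.square(z[:, np.newaxis] - lhat), axis=2)
    cost = np.sum(np.clip(d2, a_min=None, a_max=2.0**2))
    # the outlier's row saturates at 4.0 per landmark, so it cannot dominate the fit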
[docs]    def data_assocation_lap_2(self, z: np.ndarray, color: np.ndarray) -> np.ndarray:
        """use ICP for loop closure with every new set of cones in lap 2+

        Args:
            z (np.ndarray): measurements, in the global reference frame
            color (np.ndarray): color of each measurement
        """
        pass
    # TODO: define the line between the orange cones for lap_completion to check against
[docs]    def lap_completion(self) -> bool:
        """checks whether the car's latest position estimate crosses the line between
        the orange cone position estimates. Takes no arguments; uses SLAM position data.

        Updates:
            lap_counter: number of times the car has passed the orange cones

        Returns:
            bool: whether the car just completed a lap
        """
        pass
[docs]    def update_graph(self, dx: np.ndarray, new_cones: np.ndarray,
                     matched_cones: np.ndarray, first_update=False) -> np.ndarray:
        """add edges to the graph corresponding to a movement and new vision data

        Args:
            dx (ndarray): difference in position since the last update, [dx, dy]
            new_cones (ndarray): cones not yet in the slam map, in the car's local frame, [x, y, color]
            matched_cones (ndarray): cones already in the slam map, with their map indices, in the local frame, [map index, x, y, color]

        Returns:
            np.ndarray: indices of this update's cones in the global slam map
        """
        # Grow the A and b matrices once the number of measurements and variables
        # exceeds the current capacity (initially initial_rows x initial_cols).
        if not first_update:
            if new_cones is not None:
                len_message = len(new_cones) + len(matched_cones)
            else:
                len_message = len(matched_cones)
            self.check_resize(len_message)
        self.update_position(dx)
        idxs = self.update_cones(new_cones, matched_cones, first_update)
        return idxs
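One full cycle, as a hedged sketch (the cone values here are invented for illustration):

    # hypothetical single SLAM step: move, register two new cones, then solve
    slam = GraphSLAMSolve()
    new_cones = np.array([[1.0, 2.0, 0],   # [x, y, color] in the car's local frame
                          [1.0, -2.0, 1]])
    idxs = slam.update_graph(dx=np.array([0.5, 0.0]), new_cones=new_cones,
                             matched_cones=np.empty((0, 4)), first_update=True)
    slam.solve_graph()
    print(slam.get_positions())  # one [x, y] row per time step
    print(slam.get_cones())      # landmark estimates, rows at indices idxs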
[docs]    def update_cones(self, new_cones, matched_cones, first_update=False):
        """add the measurement equations for this update's cones, registering new cones
        in the map first

        Args:
            new_cones (ndarray): cones not yet in the slam map, in the car's local frame, [x, y, color]
            matched_cones (ndarray): cones already in the slam map, with their map indices, in the local frame, [map index, x, y, color]

        Returns:
            np.ndarray: indices of this update's cones in the global slam map
        """
        # Start by adding the new cones into the slam map.
        if new_cones is None:
            cones_indices = matched_cones[:, 0]
            total_cones = matched_cones
        else:
            new_cones_indices = np.zeros((len(new_cones), 1), dtype=int)
            for idx, cone in enumerate(new_cones):
                self.l.append(self.nvars)
                self.nvars += 2
                self.lhat = np.append(self.lhat, cone[0:2][np.newaxis], axis=0)
                self.color = np.append(self.color, cone[2])
                l_idx = len(self.lhat) - 1
                new_cones_indices[idx] = l_idx

            # Now combine them all into one big z message, each cone carrying its own
            # map index, so updated_new_cones rows are [map index, x, y, color].
            updated_new_cones = np.hstack([new_cones_indices, new_cones])
            if first_update:
                total_cones = updated_new_cones
                cones_indices = new_cones_indices
            else:
                print("MATCHED CONES: ", matched_cones)
                print("UPDATED NEW CONES: ", updated_new_cones)
                if len(matched_cones.shape) != 2:
                    matched_cones = np.empty((0, 4))
                if len(updated_new_cones.shape) != 2:
                    updated_new_cones = np.empty((0, 4))
                total_cones = np.vstack([matched_cones, updated_new_cones])
                cones_indices = total_cones[:, 0]

        # Now add all the cones to the slam matrix: each cone contributes the weighted
        # equation pair (landmark - current pose = local measurement).
        for cone in total_cones:
            l_idx = int(cone[0])
            cone_pos = cone[1:3]
            self.z.append(self.neqns)
            self.neqns += 2
            self.A[self.z[-1], self.l[l_idx]] = self.z_weight
            self.A[self.z[-1] + 1, self.l[l_idx] + 1] = self.z_weight
            self.A[self.z[-1], self.x[-1]] = -self.z_weight
            self.A[self.z[-1] + 1, self.x[-1] + 1] = -self.z_weight
            self.b[self.z[-1]] = cone_pos[0] * self.z_weight
            self.b[self.z[-1] + 1] = cone_pos[1] * self.z_weight
        return np.array(cones_indices)
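Each appended row pair therefore encodes landmark minus pose equals the observed offset. A small standalone check of what gets written (a sketch; the cone position is made up):

    # hypothetical check of the measurement rows written by update_cones
    slam = GraphSLAMSolve(z_weight=1.0)
    slam.update_graph(dx=np.array([0.0, 0.0]), new_cones=np.array([[3.0, 1.0, 0]]),
                      matched_cones=None, first_update=True)
    A = slam.A.tocsr()[:slam.neqns, :slam.nvars].toarray()
    r = slam.z[-1]
    # +z_weight at the landmark's x column, -z_weight at the newest pose's x column
    assert A[r, slam.l[-1]] == 1.0 and A[r, slam.x[-1]] == -1.0
    assert slam.b[r] == 3.0  # z_weight * local cone x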
[docs]    def solve_graph(self) -> None:
        """solve the graph. does not return results."""
        # Converting to csr here is much more efficient than using csr to begin with,
        # because lil is much more efficient to construct, csr is much better for math,
        # and the conversion is pretty fast since they're both row-based.
        A = self.A.tocsr()[:self.neqns, :self.nvars]
        b = self.b[:self.neqns]

        # Options considered for solving the system:
        # 1. sp.sparse.linalg.lsqr(A, b) takes about 4000 us.
        # 2. Adding x0 given by xhat and lhat reduces that to 3000 us.
        # 3. Solving the normal equations A.T@A x = A.T@b with sp.sparse.linalg.spsolve
        #    is only 820 us. Adding `permc_spec="NATURAL"` reduced solve time from 820
        #    to 630 us; not entirely sure what that does, but the best guess is something
        #    about col/row permutations, which we shouldn't really do since our matrix
        #    is already fairly nice.
        # use_umfpack=False is necessary because for some reason it doesn't work with
        # umfpack; likely a scipy bug.
        #* It also turns out umfpack breaks local_path for some reason: it has trouble
        #* linking to MA57 in codegen after installing libsuitesparse-dev, a dependency
        #* of scikit-umfpack.
        # soln = sp.sparse.linalg.lsqr(A, b)[0]
        # x0 = np.zeros(self.nvars)
        # for idx, val in zip(self.x + self.l, np.vstack((self.xhat, self.lhat))):
        #     x0[idx:idx+2] = val
        # soln = sp.sparse.linalg.lsqr(A, b, x0=x0)[0]
        soln = sp.sparse.linalg.spsolve(A.T @ A, A.T @ b, permc_spec="NATURAL",
                                        use_umfpack=False)

        # Writing into xhat/lhat in place is MUCH better than reallocating with
        # self.xhat = np.array([soln[i:i+2] for i in self.x]) (and likewise for lhat).
        for idx, i in enumerate(self.x):
            self.xhat[idx, :] = soln[i:i+2]
        for idx, i in enumerate(self.l):
            self.lhat[idx, :] = soln[i:i+2]
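The normal-equations shortcut benchmarked above can be sanity-checked on a toy overdetermined system (a minimal standalone sketch; the matrix and vector are invented):

    # hypothetical check: spsolve on the normal equations matches lsqr's least squares
    import numpy as np
    import scipy.sparse
    import scipy.sparse.linalg
    A = scipy.sparse.csr_array(np.array([[1.0, 0.0],
                                         [0.0, 1.0],
                                         [1.0, 1.0]]))
    b = np.array([1.0, 2.0, 3.5])
    x_lsqr = scipy.sparse.linalg.lsqr(A, b)[0]
    x_normal = scipy.sparse.linalg.spsolve((A.T @ A).tocsc(), A.T @ b)
    assert np.allclose(x_lsqr, x_normal)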
[docs]    def get_cones(self, color=None, indices=None) -> np.ndarray:
        mask1 = np.ones(len(self.lhat), dtype=bool)
        mask2 = np.ones(len(self.lhat), dtype=bool)
        if indices is not None:
            mask1 = np.zeros(len(self.lhat), dtype=bool)
            mask1[indices] = True
        if color is not None:
            mask2 = self.color == color
        return self.lhat[mask1 & mask2]
[docs]    def get_positions(self) -> np.ndarray:
        return self.xhat
[docs]    def get_colors(self, indices=None):
        mask1 = np.ones(len(self.lhat), dtype=bool)
        if indices is not None:
            mask1 = np.zeros(len(self.lhat), dtype=bool)
            mask1[indices] = True
        return self.color[mask1]
[docs]    def check_resize(self, len_message):
        # Will the next update (two pose variables/equations plus two per cone) still fit?
        cols = self.nvars + 2 + len_message * 2 > self.maxcols
        rows = self.neqns + 2 + len_message * 2 > self.maxrows
        if rows or cols:
            if cols:
                self.maxcols = int(self.maxcols * self.expansion_ratio)
            if rows:
                self.maxrows = int(self.maxrows * self.expansion_ratio)
                self.b = np.append(self.b, np.zeros(self.maxrows - len(self.b)))
            self.A.resize((self.maxrows, self.maxcols))
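Growing by a constant factor rather than a fixed amount keeps the amortized cost of appending equations low, the same trade-off a dynamic array makes. lil's in-place resize keeps existing entries, which a standalone sketch can confirm:

    # hypothetical demo: lil_array.resize grows capacity without losing entries
    import scipy.sparse
    M = scipy.sparse.lil_array((2, 2))
    M[0, 0] = 1.0
    M.resize((3, 3))
    assert M[0, 0] == 1.0  # old data intact, new rows/cols are empty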
[docs]    def update_position(self, dx):
        # first add two variables and two equations for the next position and the dx
        self.x.append(self.nvars)
        self.d.append(self.neqns)
        self.nvars += 2
        self.neqns += 2

        # Localization update equations for the new car position.
        # d[-1], d[-1]+1 are the last two rows in A, the x & y equations for the latest
        # measurement; x[-1], x[-1]+1 are the last two columns in A, the x & y variables
        # for the car at the newest time step. Every time step keeps its own pair of
        # columns rather than reusing the first two, so solving the graph can
        # re-estimate the entire trajectory at once.
        self.A[self.d[-1], self.x[-1]] = 1 * self.dx_weight
        self.A[self.d[-1] + 1, self.x[-1] + 1] = 1 * self.dx_weight
        self.A[self.d[-1], self.x[-2]] = -1 * self.dx_weight
        self.A[self.d[-1] + 1, self.x[-2] + 1] = -1 * self.dx_weight
        self.b[self.d[-1]] = dx[0] * self.dx_weight
        self.b[self.d[-1] + 1] = dx[1] * self.dx_weight

        # now add the guess for this position to xhat
        self.xhat = np.append(self.xhat, self.xhat[-1:, :] + dx, axis=0)
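Each row pair encodes the weighted odometry constraint new pose minus previous pose equals dx. A standalone check of that structure (a sketch; the dx value is made up):

    # hypothetical check that update_position encodes x_t - x_{t-1} = dx
    slam = GraphSLAMSolve(dx_weight=2.0)
    slam.update_position(np.array([1.0, 0.5]))
    A = slam.A.tocsr()[:slam.neqns, :slam.nvars].toarray()
    r = slam.d[-1]
    assert A[r, slam.x[-1]] == 2.0 and A[r, slam.x[-2]] == -2.0
    assert slam.b[r] == 1.0 * 2.0  # dx[0] * dx_weight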