Source code for mpc.mpc_controller

# XY Nonlinear Kinematic MPC Module.

# Imports
import time
import casadi as ca
import numpy as np
import scipy as sp
import control as ct
import os

[docs]class MPCPathFollower: def __init__(self, N, DT, L_F, L_R, V_MIN, V_MAX, A_MIN, A_MAX, A_DOT_MIN, A_DOT_MAX, DF_MIN, DF_MAX, DF_DOT_MIN, DF_DOT_MAX, Q, R, RUNTIME_FREQUENCY, ivpsolver, **kwargs): self.USE_WARMSTARTING = True self.__dict__.update(kwargs) for key in list(locals()): if key == 'self': pass elif key in 'QRP': setattr(self, key, ca.diag(locals()[key])) else: setattr(self, key, locals()[key]) self.P_default = self.P ### initialize optimization variables # each column is a timestep containing [states, controls] # There are 6 states because we have augmented the state to # include the previous timestep's controls # this is necessary to implement jerk limits. # x = [x, y, psi, v, acc_prev, theta_prev] # u = [acc, theta] self.q = ca.SX.sym('q', 7, self.N+1) self.x = self.q[0:5, :] self.u = self.q[5:7, :] ### Initialize parameters # this is the target trajectory in the (non-augmented) state and controls # plus the initial state in the first column self.p = ca.SX.sym('p', 7, self.N+1) self.x0 = self.p[0:5, 0:1] self.xbar = self.p[0:5, 1:self.N+1] # target states self.ubar = self.p[5:7, 0:self.N] # target controls (applied one timestep before target states) ##### q (opt vars) if n=3 # | t = 0 | t = 1 | t = 2 | t = 3 | # |---------------|-------------------|-------------------|-------------------| # | initial x (x0)| predicted x 1 | predicted x 2 | predicted x 3 | (5 rows) # | u(0) | u(1) | u(2) | (unused) | (2 rows) ##### p (parameters) if n=3 # | t = 0 | t = 1 | t = 2 | t = 3 | # |---------------|-------------------|-------------------|-------------------| # | initial x (x0)| xbar(1) | xbar(2) | xbar(3) | (5 rows) # | ubar(0) | ubar(1) | ubar(2) | (unused) | (2 rows) # Additional parameter: terminal cost matrix # technically doesn't need to be 16 params since it's symmetric but issok # doesn't really impact performance self.P = ca.SX.sym('P', 5, 5) # dictionary to hold warmstart keyword arguments. # empty now; will be updated when we run the solver. self.warmstart = dict() # get dynamics # returns: # - F: discrete dynamics with augmented state (evolves ([x(k), u(k-1)], u(k)) to [x(k+1), u(k)]) # - f: continuous dynamics with normal state (gives dx/dt from x and u) # - A: jacobian of f wrt x # - B: jacobian of f wrt u self.F, self.f, self.A, self.B = self.make_dynamics(**self.ivpsolver) # hack for angle wrapping self.fix_angle = ca.Function('fix_angle', [x:=ca.MX.sym("x", 5)], [ca.horzcat(x[0, :], x[1, :], 2*ca.pi*ca.sin(x[2, :]/2), x[3, :], x[4, :])]) #### compute default terminal cost that assumes we're traveling forward at 10 m/s # first compute linearized system and check controllability # A = np.array(self.A([0, 0, 0, 10], [0, 0])) # B = np.array(self.B([0, 0, 0, 10], [0, 0])) # assert 3.9<np.linalg.matrix_rank(ct.ctrb(A, B))<4.1 # it's an integer (mathematically at least) # # then solve the CARE to get a lyapunov terminal cost # self.default_P = sp.linalg.solve_continuous_are( # a = A, # b = B, # q = self.Q, # r = self.R, # )/self.DT self.default_P = np.diag([5., 5., 50., 0.3, 0.3]) # formulate dynamics constraint using map, which helps the expression graph be more compact dynamics_constr = self.x[:, 1:] - self.F.map(self.N)(self.x[:, :-1], self.u[:, :-1]) # formulate differential of u, which we can use to constrain rate of change. self.g = [] self.lbg = [] self.ubg = [] self.equality = [] def constrain(expr, lb, ub): assert expr.shape==lb.shape, f"lower bound must have same shape as expression, but got {lb.shape} and {expr.shape}" assert expr.shape==ub.shape, f"upper bound must have same shape as expression, but got {ub.shape} and {expr.shape}" self.g.append(ca.vec(expr)) self.lbg.append(ca.vec(lb)) self.ubg.append(ca.vec(ub)) # we must keep track of which constraints are equalities bc fatrop needs this info self.equality.append(lb==ub) # now we actually add all the constraints! # we must do this all in a big loop so that each stage's constraints are grouped together. # the order of the constraints here is important; we must match the structure FATROP expects. cost = 0 for stage in range(self.N+1): if stage < self.N: # dynamics (gap-closing constraints) constrain(dynamics_constr[:, stage], ca.DM([0.0]*5), ca.DM([0.0]*5)) # drive jerk limit # constrain(du[0:1, stage]*(self.RUNTIME_FREQUENCY if stage==0 else 1/self.DT), ca.DM([self.A_DOT_MIN]), ca.DM([self.A_DOT_MAX])) # steering velocity limit # constrain(du[1:2, stage]*(self.RUNTIME_FREQUENCY if stage==0 else 1/self.DT), ca.DM([self.DF_DOT_MIN]), ca.DM([self.DF_DOT_MAX])) # control bounds constrain(self.u[0:1, stage], ca.DM([self.A_MIN]), ca.DM([self.A_MAX])) constrain(self.u[1:2, stage], ca.DM([self.DF_DOT_MIN]), ca.DM([self.DF_DOT_MAX])) if stage==0: # initial states constrain(self.x[0:5, 0]-self.x0, ca.DM([0.0]*5), ca.DM([0.0]*5)) # constrain(self.x[5:7, 0]-self.u_prev, ca.DM([0.0]*2), ca.DM([0.0]*2)) else: constrain(self.x[3, stage], ca.DM([self.V_MIN]), ca.DM([self.V_MAX])) constrain(self.x[4, stage], ca.DM([self.DF_MIN]), ca.DM([self.DF_MAX])) if stage<self.N: # control cost cost += ca.bilin(self.R, self.u[:, stage] - self.ubar[:, stage]) if 0<stage<self.N: cost += ca.bilin(self.Q, self.fix_angle(self.x[:, stage] - self.xbar[:, stage-1]).T) # terminal state cost cost += ca.bilin(self.P, self.fix_angle(self.x[:, self.N]-self.xbar[:, self.N-1])) # collect everything into vectors in the format for `nlpsol` self.nlp = { 'x': ca.vec(self.q), 'f': cost, 'g': ca.vertcat(*self.g), 'p': ca.vertcat(ca.vec(self.p), ca.vec(self.P)), } # setup options # we update the options dict from settings with the equality list # which tells the solver which constraints are equality vs inequality constraints if self.nlpsolver.name=='fatrop': self.nlpsolver.opts['equality'] = np.array(ca.vertcat(*self.equality)).flatten().astype(bool).tolist() self.options = dict(**(self.nlpsolver.opts)) # now construct the solver and the constraints! self.solver = ca.nlpsol('solver', self.nlpsolver.name, self.nlp, self.options) self.lbg = ca.vertcat(*self.lbg) self.ubg = ca.vertcat(*self.ubg)
[docs] def construct_solver(self, generate_c=False, compile_c=False, use_c=False, gcc_opt_flag='-Ofast'): """handles codegeneration and loading. Args: generate_c (bool, optional): whether or not to generate new C code. Defaults to False. compile_c (bool, optional): whether or not to look for and compile the C code. Defaults to False. use_c (bool, optional): whether or not to load the compiled C code. Defaults to False. gcc_opt_flag (str, optional): optimization flags to pass to GCC. can be -O1, -O2, -O3, or -Ofast depending on how long you're willing to wait. Defaults to '-Ofast'. """ if generate_c: self.solver.generate_dependencies('mpc.c') os.system(f"mv mpc.c {os.path.dirname(__file__)}/mpc.c") if compile_c: os.system(f'gcc -fPIC {gcc_opt_flag} -shared {os.path.dirname(__file__)}/mpc.c -o {os.path.dirname(__file__)}/mpc.so') if use_c: self.solver = ca.nlpsol('solver', self.nlpsolver.name, f'{os.path.dirname(__file__)}/mpc.so', self.options)
[docs] def solve(self, x0, u_prev, trajectory, P=None): """crunch the numbers; solve the problem. Args: x0: length 4 vector of the current car state. u_prev: length 2 vector of the previous control command trajectory: shape (6, N) trajectory of target states and controls. P (optional): terminal cost matrix. If not passed, use default from 10 m/s fwd driving. Returns: array of shape (2, 1): control result. [[acc], [theta]] """ if P is None: P = self.default_P p = ca.blockcat([[ca.DM(x0.reshape((5, 1))), trajectory[0:5, :]], [trajectory[5:7, :], ca.DM.zeros(2, 1)]]) P = ca.DM(P) p = ca.vertcat(ca.vec(p), ca.vec(P)) res = self.solver(p=p, lbg=self.lbg, ubg=self.ubg, **self.warmstart) if self.USE_WARMSTARTING: self.warmstart = { 'x0': res['x'], 'lam_x0': res['lam_x'], 'lam_g0': res['lam_g'], } self.soln = np.array(ca.reshape(res['x'], self.q.shape)) # (8, self.N+1) return self.soln[5:7, 0:1]
[docs] def make_dynamics(self, n, method='rk4'): """construct functions for the dynamics of the car. uses bicycle model. Args: n (int): number of steps the integrator should take. method (str, optional): integration method to use. one of ('midpoint', 'rk4'). Defaults to 'rk4' for a 4th order explicit runge-kutta method Returns: (F, f, A, B): discrete dynamics w/ augmented state, continuous dynamics, jac(f, x), jac(f, u). All casadi.Function objects. """ # state: [ x, y, theta, v] (theta is heading) # control: [a, phi] (fwd acceleration, steering angle) x0 = ca.SX.sym('q0', 5) u0 = ca.SX.sym('u0', 2) beta = ca.arctan(self.L_R/(self.L_F + self.L_R) * ca.tan(x0[4])) # calculate dx/dt x1 = ca.vertcat(x0[3] * ca.cos(x0[2] + beta), # dxPos/dt = v*cos(theta+beta) x0[3] * ca.sin(x0[2] + beta), # dyPos/dt = v*sin(theta+beta) x0[3] / self.L_R * ca.sin(beta), # dtheta/dt = v/l_r*sin(beta) u0[0], # dv/dt = a u0[1] ) f = ca.Function('f', [x0, u0], [x1]) A = ca.Function('A', [x0, u0], [ca.jacobian(x1, x0)]) B = ca.Function('A', [x0, u0], [ca.jacobian(x1, u0)]) if method == 'midpoint': x = x0 for _ in range(n): xm = x + f(x, u0)*(self.DT/(2*n)) x += f(xm, u0)*(self.DT/n) elif method == 'rk4': x = x0 h = self.DT/n for _ in range(n): k1 = h*f(x, u0) k2 = h*f(x+h*k1/2, u0) k3 = h*f(x+h*k2/2, u0) k4 = h*f(x+h*k3, u0) x += (k1 + 2*k2 + 2*k3 + k4)/6 return ca.Function('F', [x0, u0], [x]), f, A, B