from casadi import *
import numpy as np
from time import perf_counter
from .dynamics import discrete_custom_integrator
from all_settings.all_settings import MPCSettings as mpc_settings
import os
# hsl checking fuckery
# only works on mac/linux. if you havee windows, I'm willing to bet you have bigger problems.
# hsl_avail = False
# paths = ":".join([(linuxpath if (linuxpath:=os.environ.get('LD_LIBRARY_PATH')) is not None else ""),
# (macospath if (macospath:=os.environ.get('DYLD_LIBRARY_PATH')) is not None else "")])
# for folder in paths.split(":"):
# if len(folder)>1 and np.any(['libhsl' in j for j in os.listdir(folder)]):
# hsl_avail = True
# break
# assert hsl_avail, "You must have HSL linear solvers installed on your system, but they were not found (or you have windows). If you're on windows, comment out this line."
## MPC class variable that'll keep info abt the car + help us solve optimzation problems!
[docs]class CompiledLocalOpt:
DEFAULT_SOPTS = {
'ipopt': {
'ipopt.linear_solver': 'ma57',
'expand': True,
'honor_original_bounds': True,
'acceptable_tol': 1e-2, # large so we're mostly relying on the obj_change_tol
'acceptable_obj_change_tol': 1e-3, # change in seconds between iterations
# 'ipopt.print_level': 0,
# 'ipopt.sb': 'yes',
# 'ipopt.ma57_automatic_scaling': 'no',
# 'print_time': 0,
},
'worhp': { # https://casadi.sourceforge.net/v1.9.0/api/html/dc/ddf/classCasADi_1_1WorhpSolver.html
'expand': True,
'worhp.NLPprint': -1,
},
'snopt': {
'expand': False,
# 'start': "warm"
},
}
def __init__(self, N, nlp_solver='ipopt', solver_opts=None, car_params={'l_r': 1.4987, 'l_f':1.5213, 'm': 1.}, bbox={'l': 2, 'w': 1}, **constraints):
"""initializes a CompiledLocalOpt object.
Args:
N (int): number of points to expect in the path.
nlp_solver (str, optional): which NLP solver plugin to use. can be 'ipopt', 'worhp', 'snopt', or whatever you have installed. Defaults to 'ipopt'.
solver_opts (dict, optional): options to pass to NLP solver. if None, use default options. Defaults to None.
car_params (dict, optional): parameters for dynamics model (currently bicycle model). Defaults to {'l_r': 1.4987, 'l_f':1.5213, 'm': 1.}.
bbox (dict, optional): length and width of car (assumes car is roughly rectangular). Defaults to {'l': 2, 'w': 1}.
DF_MAX (float, optional): maximum steering angle (radians). Defaults to 0.5.
ACC_MIN (float, optional): maximum braking acceleration (m/s^2, should be <0). Defaults to -3.0.
ACC_MAX_FN (float or casadi.Function, optional): function of velocity (m/s) which gives maximum forward acceleration (m/s^2) at that velocity, or constant max. Should be torque at that velocity divided by mass of car (ie, no aero consideration). Defaults to 2.0.
V_MIN (float, optional): minimum velocity. Defaults to 0.0.
V_MAX (float, optional): maximum velocity. Defaults to 25.0.
FRIC_MAX (float or casadi.Function, optional): function of velocity (m/s) which gives maximum frictional acceleration (m/s^2) at that velocity, or constant max. Defaults to 12.0.
"""
self.N = N
self.car_params = car_params
self.bbox = bbox
self.nlp_solver = nlp_solver
self.sopts = solver_opts if solver_opts else CompiledLocalOpt.DEFAULT_SOPTS[nlp_solver]
self.linear_solver = 'MA97' if self.nlp_solver == 'worhp' else self.sopts['ipopt.linear_solver'] if 'ipopt.linear_solver' in self.sopts else 'MUMPS'
####* constraints ####
self.DF_MAX = 0.5
self.ACC_MIN = -3.0
self.ACC_MAX_FN = 2.0
self.DF_DOT_MAX = 0.5
self.V_MIN = 0.0
self.V_MAX = 25.0
self.FRIC_MAX = 12.0
# update with user provided constraints
self.__dict__.update(constraints)
# if not Function objects already, make the acceleration limiting functions
v = SX.sym('v')
if not isinstance(self.FRIC_MAX, Function):
self.FRIC_MAX = Function('FRIC_MAX', [v], [self.FRIC_MAX])
if not isinstance(self.ACC_MAX_FN, Function):
self.ACC_MAX_FN = Function('ACC_MAX_FN', [v], [self.ACC_MAX_FN])
####* utility functions ####
# easy way to account for angle wrapping
x = MX.sym('x', 5)
# self.fix_angle = Function('fix_angle', [x], [horzcat(x[0, :], x[1, :], sin(2*x[2, :]), x[3, :])])
self.fix_angle = Function('fix_angle', [x], [horzcat(x[0, :], x[1, :], sin(x[2, :]/2), x[3, :], x[4, :])])
# generate 2x2 rotation matrices
psi = MX.sym('psi')
self.rot = Function('rot', [psi], [reshape(horzcat(cos(psi), sin(psi), -sin(psi), cos(psi)), 2, 2)])
###########################################*
####* Symbolic Variable Initialization ####*
###########################################*
#* Track parameters
self.bound_pairs = MX.sym('bound_pairs', self.N, 4)
self.curr_state = MX.sym('curr_state', 1, 5)
# self.curr_state = currAndCones[0, :]
# self.bound_pairs = currAndCones[1:, :]
#* opt variables
self.x = MX.sym('x', self.N, 7)
self.t = self.x[:, 0]
self.psi = self.x[:, 1]
self.v = self.x[:, 2]
self.theta = self.x[:, 3]
self.u = self.x[:, 4:6]
self.dt = self.x[:, 6:7]
# self.sl_dyn = self.x[:, 7:11]
#* this represents the full state at each discretization point
self.z = horzcat(
self.bound_pairs[:, :2]*(1-self.t)+self.bound_pairs[:, 2:4]*self.t, # LERP between pairs of points on track bounds
self.psi,
self.v,
self.theta,
)
#* this is the same thing but shifted by one index. Allows us to take diffs.
self.z_i = vertcat(
self.z[self.N-1:, :],
self.z[:self.N-1, :]
)
######################*
####* Constraints ####*
######################*
#* get discrete dynamics function
self.dynamics = discrete_custom_integrator(n=3, **car_params)
#* these will store our constraint expressions (g) and bounds (lbg, ubg)
self.g = []
self.lbg = []
self.ubg = []
#* table to keep track of indices of each constraint
self.gtable = dict()
self.glen = 0
#* Dynamics constraints: # Math: F(z_i, u_i, \Delta t_i) == z_{i+1}
dynamics = self.fix_angle.map(self.N-1)(
self.dynamics.map(self.N-1, 'thread', 19)(
self.z[:-1, :].T, self.u[:-1, :].T, self.dt[:-1, :].T
) - self.z[1:, :].T
)
# for i in range(-1, self.N-1):
# self._add_constraint(
# f'dynamics{i}',
# g = vec(self.fix_angle(
# self.dynamics(self.z[i, :], self.u[i, :], self.dt[i, :]).T-self.z[i+1, :])),# + self.sl_dyn[i, :]),
# lbg = DM([0.0]*5),
# ubg = DM([0.0]*5)
# )
self._add_constraint(
'dynamics',
vec(dynamics),
vec(DM.zeros(dynamics.shape)),
vec(DM.zeros(dynamics.shape))
)
# #* Dynamics constraints: # Math: F(z_i, u_i, \Delta t_i) == z_{i+1}
# for i in range(0, self.N-1):
# # print(self.z[i, :]);
# self._add_constraint(
# f'dynamics{i}',
# g = vec(self.fix_angle(
# self.dynamics(self.z[i, :], self.u[i, :], self.dt[i, :]).T-self.z[i+1, :])),# + self.sl_dyn[i, :]),
# lbg = DM([0.0]*5),
# ubg = DM([0.0]*5)
# )
#* other constraints. all follow the same pattern.
self._add_constraint(
'vel',
g = vec(self.v),
lbg = DM([self.V_MIN]*self.N),
ubg = DM([self.V_MAX]*self.N)
)
self._add_constraint(
'acc_min',
g = vec(self.u[:, 0]),
lbg = DM([self.ACC_MIN]*self.N),
ubg = DM([inf]*self.N)
)
self._add_constraint(
'acc_max',
g = vec(self.u[:, 0]-self.ACC_MAX_FN(self.v)),
lbg = DM([-inf]*self.N),
ubg = DM([0.0]*self.N)
)
self._add_constraint(
'steering',
g = vec(self.theta),
lbg = DM([-self.DF_MAX]*self.N),
ubg = DM([self.DF_MAX]*self.N)
)
self._add_constraint(
'dt',
g = vec(self.dt),
lbg = DM([0.0]*self.N),
ubg = DM([2.0]*self.N)
)
self._add_constraint(
'df_dot',
g = vec(self.u[:, 1]),
lbg = DM([-self.DF_DOT_MAX]*(self.N)),
ubg = DM([self.DF_DOT_MAX]*(self.N))
)
self._add_constraint(
't',
g = vec(self.t),
lbg = DM([0.45]*self.N),
ubg = DM([0.55]*self.N)
)
self._add_constraint(
't0',
g = self.t[0],
lbg = DM([0.5]),
ubg = DM([0.5]),
)
self._add_constraint(
'tf',
g = self.t[-1],
lbg = DM([0.5]),
ubg = DM([0.5]),
)
# Keeps initial heading and velocity unchangeable
self._add_constraint(
'curr_velocity',
g = self.curr_state[3] - self.v[0],
lbg = DM(-0.2),
ubg = DM(0.2)
)
self._add_constraint(
'final velocity',
g = self.v[-1],
lbg=DM([0.5]),
ubg=DM([1.0]),
)
self._add_constraint(
'final control',
g = self.u[-1,:].T,
lbg=DM([0.0, 0.0]),
ubg=DM([0.0, 0.0]),
)
self._add_constraint(
'curr_heading',
g = self.curr_state[2] - self.psi[0],
lbg = DM(-pi/16),
ubg = DM(pi/16)
)
self._add_constraint(
'curr_steer',
g = self.curr_state[4] - self.theta[0],
lbg = DM(0.0),
ubg = DM(0.0)
)
# Makes it so that the time to run is at minimum 2 (soft constraint, scalar included)
# self.scalar = MX.sym("scalar")
# self._add_constraint(
# 'min_time',
# g = sum1(self.dt) + self.scalar,
# lbg = DM(0.5),
# ubg = DM(float('inf'))
# )
# centripetal acceleration: # Math: \frac{v^2}{r}=\frac{v^2}{\frac{v}{\dot{\theta}}}=\frac{v^2\dot{\theta}}{v}=v\dot{\theta}
ac = (self.v**2 / self.car_params['l_r']) * sin(arctan(self.car_params['l_r']/(self.car_params['l_f'] + self.car_params['l_r']) * tan(self.theta)))
self._add_constraint(
'centripetal_acc',
g = ac**2 + self.u[:, 0]**2-self.FRIC_MAX(self.v)**2,
lbg = DM([-inf]*self.N),
ubg = DM([0.0]*self.N)
)
#* Cone Constraints: stop the car from hitting any cones
# TODO: allow the cone locations to be different from the track side points
#* construct a function which is close enough to a rectangle
self.safe = Function('safespace', [x:=MX.sym('x', 2)], [(DM([1/self.bbox['w'], 1/self.bbox['l']])**6).T@x**6])
self.cones = vertcat(self.bound_pairs[:, :2], self.bound_pairs[:, 2:4]).T
left = self.bound_pairs[:, :2].T
right = self.bound_pairs[:, 2:4].T
self.nc = 5 #* number of cones to consider (ahead of and behind the current cone, on each side)
considered = horzcat(left[:, 1:-1], right[:, 1:-1])
for i in range(self.N):
pass
# self._add_constraint(
# f'cones{i}',
# g=self.safe(self.rot(-self.psi[i])@((considered-self.z[i, :2].T) + vertcat(self.car_params['l_f']-self.car_params['l_r'], 0))).T,
# lbg=DM([1.0]*(self.N-2)*2),
# ubg=DM([inf]*(self.N-2)*2)
# )
#* YAY we're done adding constraints. Now concatenate all the constraints into one big vector.
#* note that while vertcat is typically slower than horzcat, since these are all vectors, it's the exact same memory operation
print([i.shape for i in self.g])
self.g = vertcat(*self.g)
self.lbg = vertcat(*self.lbg)
self.ubg = vertcat(*self.ubg)
#* COST FUNCITON: # Math: \sum_{i=1}^N\left[ \Delta t_i + 10^5\cdot\sum(\text{sl}_\text{dyn})^2\right]
#* slack vars aren't really neccessary for dynamics but I'm too lazy to remove them. Really they should be on the cone constraints but it's ok.
# adding the scalar portion will penalize having to use the scalar (i.e. giving too short a path)
self.f = sum1(self.dt)# + 1e12*sumsqr(self.sl_dyn)**2 + 1e2*(self.scalar)**2
#* construct the NLP dictionary to be passed into casadi.
# make sure you add the self.scalar to x so the nlp can modify it!
self.nlp = {
'x': vertcat(vec(self.x)),# self.scalar),
'f': self.f,
'g': self.g,
'p': vertcat(vec(self.curr_state), vec(self.bound_pairs)),
}
self.warmstart = None
def _add_constraint(self, name, g, lbg, ubg):
"""utility funciton to add a constraint and keep track of all the indices
Args:
name (str): name for this constraint (human-readable)
g (casadi symbolic expression): expression to be constrained
lbg (float): lower bound for g
ubg (float): upper bound for g
"""
self.gtable[name] = (self.glen, self.glen+g.shape[0])
self.glen += g.shape[0]
self.g.append(g)
self.lbg.append(lbg)
self.ubg.append(ubg)
[docs] def construct_solver(self, generate_c=False, compile_c=False, use_c=False, gcc_opt_flag='-Ofast'):
"""creates a solver object
Args:
generate_c (bool, optional): whether or not to generate new C code. Defaults to False.
compile_c (bool, optional): whether or not to look for and compile the C code. Defaults to False.
use_c (bool, optional): whether or not to load the compiled C code. Defaults to False.
gcc_opt_flag (str, optional): optimization flags to pass to GCC. can be -O1, -O2, -O3, or -Ofast depending on how long you're willing to wait. Defaults to '-Ofast'.
"""
self.solver = nlpsol('solver', self.nlp_solver, self.nlp, self.sopts)
if generate_c:
self.solver.generate_dependencies('local_opt.c')
os.system(f"mv local_opt.c {os.path.dirname(__file__)}/local_opt.c")
if compile_c:
os.system(f'gcc -fPIC {gcc_opt_flag} -shared {os.path.dirname(__file__)}/local_opt.c -o {os.path.dirname(__file__)}/local_opt.so')
# os.system(f'mv local_opt.so MPC/bin/local_opt.so') #TODO: fix this path if necessary
if use_c:
new_opts = self.sopts
new_opts['expand']=False
self.solver = nlpsol('solver', self.nlp_solver, f'{os.path.dirname(__file__)}/local_opt.so', new_opts)
[docs] def load_solver(self):
"""alternative to construct_solver if you're just loading a saved solver.
"""
new_opts = self.sopts
new_opts['expand']=False
self.solver = nlpsol('solver', self.nlp_solver, 'local_opt.so', new_opts)
[docs] def angle(self, a, b):
cosine = (a@b)/(np.linalg.norm(a)*np.linalg.norm(b))
cosine = 1 if cosine > 1 else -1 if cosine < -1 else cosine
return np.arccos(cosine)*np.sign(np.linalg.det([a, b]))
[docs] def to_constant_tgrid(self, dt, z, u, t):
"""converts a solver result to a constant-dt representation. z, u, and t can be passed in with **res (from solve())
Args:
dt (float): desired resultant time discretization interval
z (np.ndarray): states from solver
u (np.ndarray): controls from solver
t (np.ndarray): timestamps from solver
Returns:
(np.ndarray, np.ndarray): arrays of states, controls
"""
print(f'LOCAL PATH INPUTS: dt={dt}, \n\tz={z}, \n\tu={u}, \n\tt={t}')
cur = 0.
states = []
controls = []
while cur<t[-1]:
idx = int(np.sum(t<=cur)-1)
extra = cur-t[idx]
controls.append(u[idx])
states.append(np.array(self.dynamics(z[idx], u[idx], extra)).flatten())
if len(states)>1 and abs(states[-1][2] - states[-2][2])>1.5*pi:
states[-1][2] = states[-1][-2] + (states[-1][2]-states[-2][2])%(2*pi)
cur += dt
# now pad it until it's 1.5x whats needed, for safety
while len(states)*dt < mpc_settings.N*mpc_settings.DT*2:
controls.append(u[-2])
controls[-1][0]=0.0
states.append(np.array(self.dynamics(states[-1].tolist(), u[-2], dt)).flatten())
return np.array(states), np.array(controls)
[docs] def solve(self, left, right, curr_state, err_ok=True):
"""crunch the numbers.
Args:
left (np.ndarray): location of left cones. shape (N, 2).
right (np.ndarray): location of right cones. shape (N, 2).
curr_state (np.ndarray): current state of the car. shape (x, y, phi, v).
Returns:
dict: result of solve. keys 'z' (states), 'u' (controls), 't' (timestamps)
"""
# print(repr(left))
# print(repr(right))
# print(repr(curr_state))
center = (left+right)/2
diffs = np.diff(center, axis=0)
diffs = np.concatenate([[[1, 0]], diffs], axis=0)
angles = [self.angle(diffs[0], diffs[i]) for i in range(1, len(diffs))]
angles.append(angles[-1])
angles = np.array(angles)
# angles = np.cumsum(angles)
# left = center
# right = center
# print(f"angles shape: {angles.shape}")
self.x0 = self.warmstart if self.warmstart is not None else vec(vertcat(
DM([0.5]*self.N), # t
DM(angles), # psi
DM([min(curr_state[3], 0.1)]*self.N), # v
DM([curr_state[4]]*self.N), # th
DM([0.0]*self.N), # a
DM([0.0]*self.N), # thdot
DM([0.1]*self.N), # dt
# DM([0.0]*self.N*4),
# DM([0.0]) # scalar
))
# print(self.x0, self.x0.shape())
# print("leo is mean!!!!!")
# print(DM(curr_state).shape, horzcat(DM(left), DM(right)).shape)
# print(dict(
# x0=self.x0,
# lbg=self.lbg,
# ubg=self.ubg,
# p=vertcat(DM(curr_state).T, horzcat(DM(left), DM(right))),
# ))
self.soln = self.solver(
x0=self.x0,
lbg=self.lbg,
ubg=self.ubg,
p=vertcat(vec(DM(curr_state).T), vec(horzcat(DM(left), DM(right)))),
)
# self.warmstart = self.soln['x']
print("SOLVE FINISHED")
print(self.solver.stats())
if self.solver.stats()['return_status'] == 'Invalid_Number_Detected':
print('cone ordering funny! overlapping bound pairs.')
raise AssertionError('cone ordering failed!')
if err_ok and not self.solver.stats()['success']:
raise AssertionError("Solver failed to converge")
print("SOLVE OK")
self.soln['x'] = np.array(reshape(self.soln['x'], (self.N, 7)))
# print("LOCAL OPT OUTPUT")
# print(self.soln)
self.soln['xy'] = (left.T*(1-self.soln['x'][:, 0])+right.T*self.soln['x'][:, 0]).T
# print(self.soln['x'][:, 5])
res=dict()
res['z'] = np.hstack([
self.soln['xy'],
self.soln['x'][:, 1:4],
])
# print('silly:',res['z'].shape)
res['u'] = self.soln['x'][:, 4:6]
res['t'] = np.concatenate([[0.], np.cumsum(self.soln['x'][:, 6])[:-1]])
## if the solved solution doesn't fill all 2 seconds
### NAIVE (pad "straight" controls until 2 seconds are filled)
# if res['t'][-1] < 2.0:
# step_size = res['t'][1] - res['t'][0]
# for i in range(res['t'][-1], 2.0 + step_size, step_size):
# ###### TO DO ######
# # res['z'] = np.append(res[''])
# # res['u'] = np.append(res['u'], )
# res['t'] = np.append(res['t'], i)
### IMPROVED (stretch optimized controls across the 2 seconds)
# added constraint that the solution must be at least 2 seconds long
return res