# MPC ROS Node
# General Imports
import numpy as np
from all_settings.all_settings import MPCSettings as settings
from .mpc_controller import MPCPathFollower
from time import perf_counter, sleep
# ROS Imports
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64, Bool
from sensor_msgs.msg import PointCloud
from geometry_msgs.msg import Point32, PoseArray, Pose, Point, Quaternion
from feb_msgs.msg import State, FebPath, Map
from ackermann_msgs.msg import AckermannDriveStamped
from eufs_msgs.msg import WheelSpeeds, WheelSpeedsStamped
import scipy as sp
[docs]def makepose(x):
p = Pose()
p.position.x = x[0]
p.position.y = x[1]
p.position.z = 0*x[3] # velocity
p.orientation.w = np.cos(x[2]/2)
p.orientation.x = 0.0
p.orientation.y = 0.0
p.orientation.z = np.sin(x[2]/2)
return p
[docs]class MPC(Node):
def __init__(self):
super().__init__('mpc_node')
# Subscribers
self.curr_steer_sub = self.create_subscription(WheelSpeedsStamped, '/ground_truth/wheel_speeds', self.steer_callback, 1)
self.curr_acc_sub = self.create_subscription(Float64, '/odometry/wss', self.acc_callback, 1)
self.global_path_sub = self.create_subscription(FebPath, '/path/global', self.global_path_callback, 1)
self.local_path_sub = self.create_subscription(FebPath, '/path/local', self.local_path_callback, 1)
self.state_sub = self.create_subscription(State, '/slam/state', self.state_callback, 1)
# Publishers
self.throttle_pub = self.create_publisher(Float64, '/control/throttle', 1)
self.steer_pub = self.create_publisher(Float64, '/control/steer', 1)
self.control_pub = self.create_publisher(AckermannDriveStamped, 'cmd', 1)
self.viz_pub = self.create_publisher(PoseArray, '/mpc/viz', 1)
print('set ros subscribers and publishers')
self.mpc = MPCPathFollower(**settings)
self.mpc.construct_solver(use_c=True) # load compiled solver
print("inited mpc")
self.race_done = False
self.path = None
self.global_path = None
self.local_path = None
self.prev_soln = np.array([0.0, 0.0])
self.DT = settings.DT/10.0
self.curr_state = np.zeros(5)
self.create_timer(self.DT, self.run_control)
self.create_subscription(Bool, '/end_race', self.end_race_callback, 1)
[docs] def end_race_callback(self, msg):
self.race_done = True
msg = AckermannDriveStamped()
msg.header.stamp = self.get_clock().now().to_msg()
msg.drive.acceleration = -100.0
msg.drive.steering_angle = self.curr_state[4]
# with open("sim_data.txt", "a") as f:
# print("------------------------------------------------", file=f)
# print("FROM MPC:", file=f)
# print("Sending Acceleration Of:", msg.drive.acceleration, file=f)
# print("Sending Steering Of:", msg.drive.steering_angle, file=f)
# print("-------------------------------------------------", file=f)
self.control_pub.publish(msg)
throttle_msg = Float64()
steer_msg = Float64()
throttle_msg.data = -100.0
steer_msg.data = self.curr_state[4]
for _ in range(10):
self.throttle_pub.publish(throttle_msg)
self.steer_pub.publish(steer_msg)
self.control_pub.publish(msg)
sleep(0.1)
rclpy.shutdown()
[docs] def steer_callback(self, msg: WheelSpeedsStamped):
'''
Return Steering Angle from steering angle sensor on car
'''
self.curr_steer = float(msg.speeds.steering)
[docs] def acc_callback(self, msg: Float64):
'''
Set curr_acc to value recieved from
'''
self.curr_acc = float(msg.data)
[docs] def global_path_callback(self, msg: FebPath):
'''
Input: msg.PathState -> List of State (from State.msg) vectors
Returns: List of numpy state vectors (len 4: x, y, velocity, heading)
Description: Takes in a local or global path depending on what
lap we're in and converts to np.array of state vectors
'''
x = np.array(msg.x)
y = np.array(msg.y)
v = np.array(msg.v)
psi = np.array(msg.psi)
th = np.array(msg.th)
a = np.array(msg.a)
thdot = np.array(msg.thdot)
path = np.column_stack((x, y, psi, v, th, a, thdot))
self.global_path = path
self.path = self.global_path if self.global_path is not None else self.local_path
# self.update_term_cost_mats()
[docs] def update_term_cost_mats(self):
self.Ps = []
for waypt in self.path:
try:
self.Ps.append(sp.linalg.solve_continuous_are(
a = np.array(self.mpc.A(waypt[0:4], waypt[4:6])),
b = np.array(self.mpc.B(waypt[0:4], waypt[4:6])),
q = self.mpc.Q,
r = self.mpc.R,
)/self.mpc.DT)
except np.linalg.LinAlgError:
self.Ps.append(None) # use default of driving fwd at 10m/s
toc = perf_counter()
print(f"solved care {len(self.path)} times in {toc-tic:.3f}s (avg {(toc-tic)/len(self.path):.5f}s each)")
[docs] def local_path_callback(self, msg: FebPath):
'''
Input: msg.PathState -> List of State (from State.msg) vectors
Returns: List of numpy state vectors (len 4: x, y, velocity, heading)
Description: Takes in a local or global path depending on what
lap we're in and converts to np.array of state vectors
'''
x = np.array(msg.x)
y = np.array(msg.y)
psi = np.array(msg.psi)
v = np.array(msg.v)
th = np.array(msg.th)
a = np.array(msg.a)
thdot = np.array(msg.thdot)
path = np.column_stack((x, y, psi, v, th, a, thdot))
self.local_path = path
self.path = self.global_path if self.global_path is not None else self.local_path
# self.update_term_cost_mats()
[docs] def state_callback(self, carstate: State):
'''
Input: Float64[4]
Description: Converts State msg to numpy vector,
updates variables initalized in __init__ (e.g. self.z_ref, self.z_curr, etc.) with get_update_dict and self.update,
and solves mpc problem -> stores result in self.prev_soln
'''
# returns the current state as an np array with these values in this order: x,y,velocity,heading
#curr_state = msg.carstate
self.curr_state[0] = carstate.x # x value
self.curr_state[1] = carstate.y # y value
self.curr_state[2] = carstate.heading
self.curr_state[3] = carstate.velocity
# self.curr_state[4] = carstate.theta
[docs] def run_control(self):
if self.race_done: return
curr_state = self.curr_state
# print(self.DT)
pt = np.array(curr_state[0:2])
# self.curr_state[4] += self.DT*self.prev_soln[1]
if self.path is not None:
idx = np.argmin(np.linalg.norm(self.path[:, :2] - pt, axis=1))
idxs = (np.arange(idx, idx+10*self.mpc.N, 10))
if self.global_path is not None:
xidxs = ((idxs+10)%len(self.path)).tolist()
uidxs = ((idxs)%len(self.path)).tolist()
else:
xidxs = np.clip((idxs+10), 0, len(self.path)-1).astype(int).tolist()
uidxs = np.clip((idxs), 0, len(self.path)-1).astype(int).tolist()
# trajectory = self.path[idxs]
x_traj = self.path[xidxs, 0:5]
u_traj = self.path[uidxs, 5:7]
trajectory = np.hstack([x_traj, u_traj]).T
self.prev_soln = self.mpc.solve(np.array(self.curr_state), self.prev_soln, trajectory).flatten()
if not settings.PUBLISH:
# print("skipping!")
return
# else: print("publishing!")
# print(self.prev_soln)
# print('drive message:', self.prev_soln['u_control'][0])
# print('steering message:', self.prev_soln['u_control'][1])
# Publishing controls
msg = AckermannDriveStamped()
msg.header.stamp = self.get_clock().now().to_msg()
msg.drive.acceleration = self.prev_soln[0]
self.curr_state[4] += self.DT*self.prev_soln[1]
msg.drive.steering_angle = self.curr_state[4]
msg.drive.steering_angle_velocity = self.prev_soln[1]# + self.DT*self.prev_soln[1]
# with open("sim_data.txt", "a") as f:
# print("------------------------------------------------", file=f)
# print("FROM MPC:", file=f)
# print("Sending Acceleration Of:", msg.drive.acceleration, file=f)
# print("Sending Steering Of:", msg.drive.steering_angle, file=f)
# print("-------------------------------------------------", file=f)
self.control_pub.publish(msg)
throttle_msg = Float64()
steer_msg = Float64()
throttle_msg.data = self.prev_soln[0]
steer_msg.data = self.curr_state[4]
self.throttle_pub.publish(throttle_msg)
self.steer_pub.publish(steer_msg)
msg = PoseArray()
msg.poses = []
msg.poses += [makepose(x) for x in self.mpc.soln[0:4, :].T]
msg.poses += [makepose(x) for x in trajectory[0:4, :].T]
# print(trajectory)
# print(self.mpc.soln)
msg.header.frame_id = "map"
msg.header.stamp = self.get_clock().now().to_msg()
self.viz_pub.publish(msg)
[docs]def main(args=None):
rclpy.init(args=args)
mpc_node = MPC()
rclpy.spin(mpc_node)
rclpy.shutdown()
if __name__ == "__main__":
main()