From 34a02fd64e269b2bf7c8b529e40ca858e6263adf Mon Sep 17 00:00:00 2001 From: au665799 <202007669@post.au.dk> Date: Thu, 5 Sep 2024 09:28:30 +0200 Subject: [PATCH 1/4] newest under development --- .../CenterlineDrivingSuper.py | 334 ++++++++-- .../race_algorithm_pkg/race_algorithm_node.py | 584 ++++++++++++++++++ 2 files changed, 872 insertions(+), 46 deletions(-) diff --git a/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper.py b/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper.py index 2abdcd2..7bab58c 100644 --- a/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper.py +++ b/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper.py @@ -14,6 +14,10 @@ from tf_transformations import euler_from_quaternion from visualization_msgs.msg import Marker +import matplotlib.pyplot as plt +from geometry_msgs.msg import PoseWithCovarianceStamped +from geometry_msgs.msg import Quaternion +from tf_transformations import quaternion_from_euler class LidarDataProcessor(Node): """ LidarDataProcessor node that uses linear regression to find the centerline of the track.""" @@ -43,6 +47,7 @@ class LidarDataProcessor(Node): Marker, '/waypoint_vis', 1) + # 0.1 is the default value for the timer self.timer = self.create_timer(0.1, self.control_car) # Higher frequency for faster response time, but gives a higher CPU usage @@ -69,16 +74,29 @@ class LidarDataProcessor(Node): # State of the car self.state = "straight" + self.offset = 0 + self.track_width = 0 + + # Find the closest obstacle index in the lidar readings + self.find_closest_obstacle_index = -1 # None + self.car_width = 0.0001#0.2032 # meters + self.steering_gain = 0.5 + self.curve = 0 + self.sign = None + self.shift_constant = (0,0) + self.shift_constants_transformed = None + self.consecutive_cat = 1 + self.prev_turn_type = None def lidar_callback(self, data): self.lidar_ranges = np.array(data.ranges) + #self.find_closest_obstacle_index = self.if_find_closest_obstacle(self.lidar_ranges) ## Nyt - def lidar_processing(self): ranges = self.lidar_ranges # indexes in the range [180;900] are used - center_ranges = ranges[540 + int(self.fov_start): 540 + int(self.fov_end)] # Originale + center_ranges = ranges[540 + int(self.fov_start): 540 + int(self.fov_end)] # Originale, nyt -70 og 70 #center_ranges = ranges[200:880] # Degree 90 to 270 #center_ranges = ranges[220:740] # Degree 120 to 240 (trash) num_readings = len(center_ranges) @@ -119,14 +137,17 @@ class LidarDataProcessor(Node): return # Generate points along the fitted polynomial curve before max_index (Left) - x_fit_before = np.linspace(min(x_coords[:max_index-5]), max(x_coords[:max_index-5]), 100) + x_fit_before = np.linspace(min(x_coords[:max_index-5]), max(x_coords[:max_index-5]), 100) # y_fit_before = np.polyval(coeffs_before, x_fit_before) # Generate points along the fitted polynomial curve after max_index (Right)s - x_fit_after = np.linspace(min(x_coords[max_index+5:]), max(x_coords[max_index+5:]), 100) + x_fit_after = np.linspace(min(x_coords[max_index+5:]), max(x_coords[max_index+5:]), 200) #100 originalt y_fit_after = np.polyval(coeffs_after, x_fit_after) - + # Width of the track + #self.track_width = math.sqrt((x_fit_after[0]-x_fit_before[0])**2+(y_fit_after[0]-y_fit_before[0])**2) + + # Calculate the cumulative distance along the curve delta_x = np.diff(x_fit_after) delta_y = np.diff(y_fit_after) @@ -135,7 +156,7 @@ class LidarDataProcessor(Node): s = np.insert(s, 0, 0) # Start from 0 # Interpolate evenly spaced s values - s_new = np.linspace(0, s[-1], 100) # 10 evenly spaced points along the length of the curve + s_new = np.linspace(0, s[-1], 200) # 10 evenly spaced points along the length of the curve #100 originalt x_fit_evenly_spaced_after = np.interp(s_new, s, x_fit_after) y_fit_evenly_spaced_after = np.interp(s_new, s, y_fit_after) @@ -146,14 +167,13 @@ class LidarDataProcessor(Node): s = np.insert(s, 0, 0) # Start from 0 # Interpolate evenly spaced s values - s_new = np.linspace(0, s[-1], 100) # 10 evenly spaced points along the length of the curve + s_new = np.linspace(0, s[-1], 200) # 10 evenly spaced points along the length of the curve #100 originalt x_fit_evenly_spaced_before = np.interp(s_new, s, x_fit_before) y_fit_evenly_spaced_before = np.interp(s_new, s, y_fit_before) # Calculate derivative (slope) at each point slope = np.gradient(y_fit_evenly_spaced_after, x_fit_evenly_spaced_after) - @@ -186,18 +206,106 @@ class LidarDataProcessor(Node): centerline_y = (intersection_y + y_fit_evenly_spaced_after[i])/2 if(intersection_x.any()): - centerline_points.append((centerline_x[0], centerline_y[0])) + centerline_points.append((centerline_x[0], centerline_y[0])) self.target_centerline_points = centerline_points - + + #self.get_logger(f"self.target_center_points: {self.target_centerline_points}") + + def remove_outliers(self,cross_product, threshold=3): + mean = np.mean(cross_product) + std_dev = np.std(cross_product) + filtered_cross_product = [x for x in cross_product if abs((x - mean) / std_dev) < threshold] + return filtered_cross_product + + def cross_product_2d_new(self,ref1,ref2, point, margin=1e-2): # 1e-2 is 0.01 + refv = (ref2[0]-ref1[0], ref2[1] - ref1[1]) + pointv = (point[0] - ref1[0], point[1] - ref1[1]) + # Cross product of two vectors + #self.get_logger().info(f"refv: {refv}, pointv: {pointv}") + cross_product = refv[0] * pointv[1] - refv[1] * pointv[0] + #self.get_logger().info(f"cross_product: {cross_product}, Type: {type(cross_product)}") + if abs(cross_product) < margin: + return 0 + return cross_product + + def normalize_vector(self,vector): + magnitude = np.linalg.norm(vector) + if magnitude == 0: + return vector + return vector / magnitude + + def calculate_angle(self,A, B): + dot_product = np.dot(A, B) + angle = np.arccos(dot_product) + return angle + + def determine_direction(self,ref_coord, t_coord, t_coord2): + A = np.array([t_coord[0] - ref_coord[0], t_coord[1] - ref_coord[1]]) + B = np.array([t_coord2[0] - ref_coord[0], t_coord2[1] - ref_coord[1]]) + A_normalized = self.normalize_vector(A) + B_normalized = self.normalize_vector(B) + angle = self.calculate_angle(A_normalized, B_normalized) + + # Determine direction based on the angle + if np.isclose(angle, 0, atol=np.radians(2)): # Small threshold to account for floating-point precision # 5, 2.5, 2 + return 0 # Straight + else: + cross_product = np.cross(A_normalized, B_normalized) + return 1 if cross_product > 0 else -1 # Left or Right <- 1 is left and -1 is right SKIFT HER HVIS DET ER FORKERT def calculate_centerline(self): self.x_car_at_calc = self.x_car self.y_car_at_calc = self.y_car self.lidar_processing() points_map = self.transform_to_map_frame() + + # Shift x_values in points_map based on a constant + # Get direction sign, RIGHT is 1 and LEFT is -1 + if self.sign == None: + self.sign = 0 + #self.get_logger().info(f"len(points_map): {len(points_map)}") + if len(points_map) > 9: #35 + # Reference point (first point in points_map) + cross_products = [self.cross_product_2d_new((self.x_car_at_calc,self.y_car_at_calc), points_map[-1], point) for point in points_map[:-1]] + #self.get_logger().info(f"cross_product_2d_new: {cross_products}") + + # Determine the position of each point relative to refvector + signs = [1 if cp > 0 else -1 if cp < 0 else 0 for cp in cross_products] + # Determine direction + signs = [self.determine_direction((self.x_car_at_calc,self.y_car_at_calc),points_map[-1], point) for point in points_map[:-1]] + + #count_ones = sum(1 for sign in signs if sign == 1) + count_ones = signs.count(1) + count_neg_ones = signs.count(-1) + count_zeros = signs.count(0) + #self.get_logger().info(f"count_ones: {count_ones}, count_neg_ones: {count_neg_ones}, count_zeros: {count_zeros}") + #self.get_logger().info(f"signs: {signs}") + #self.get_logger().info(f"len(signs): {len(signs)}") + if count_zeros > count_neg_ones and count_zeros > count_ones: + self.sign = 0 + self.state = "straight" + self.get_logger().info(f"STRAIGHT") + elif count_ones > count_neg_ones: + self.sign = 1 #-1 + self.state = "right" + self.get_logger().info(f"RIGHT") + else: #count_neg_ones > count_ones: + self.sign = -1 #1 + self.state = "left" + self.get_logger().info(f"LEFT") + #plot_vectors(self,reference_point1,reference_point2, points) + # If previous turn type is None set it to the current turn type + if self.prev_turn_type == None: + self.prev_turn_type = self.state + # Increase the consecutive_cat if the previous turn type is the same as the current turn type + if self.prev_turn_type == self.state: #and self.consecutive_cat < 50: + self.consecutive_cat += 1 + else: + self.consecutive_cat = 1 # Reset + marker_msg = Marker() marker_msg.header.frame_id = "/base_link" # /map marker_msg.header.stamp = self.get_clock().now().to_msg() @@ -221,18 +329,50 @@ class LidarDataProcessor(Node): y_min, y_max = min(y_coords), max(y_coords) new_min, new_max = -1, 1 # Desired range for normalization -1 + #self.find_closest_obstacle_index = self.if_find_closest_obstacle(self.lidar_ranges) ## Nyt + for point_map in points_map: - normalized_x = (point_map[0] - x_min) / (x_max - x_min) * (new_max - new_min) + new_min - normalized_y = (point_map[1] - y_min) / (y_max - y_min) * (new_max - new_min) + new_min + #normalized_x = (point_map[0] - x_min) / (x_max - x_min) * (new_max - new_min) + new_min + #normalized_y = (point_map[1] - y_min) / (y_max - y_min) * (new_max - new_min) + new_min p = Point() + #self.get_logger().info(f"Point: {point_map}") p.x = point_map[0] # normalized_x p.y = point_map[1] # normalized_y p.z = 0. + #offset_x, offset_y = self.calculate_offset(self.find_closest_obstacle_index,self.x_car,self.y_car) self.target_centerline.append((point_map[0],point_map[1])) + #if offset_x == 0 and offset_y == 0: + # self.target_centerline.append((point_map[0],point_map[1])) + #else: + # self.get_logger().info(f"Offset x: {offset_x}, Offset y: {offset_y}") + # self.target_centerline.append((offset_x,offset_y))# .append((point_map[0],point_map[1])) marker_msg.points.append(p) self.marker_pub.publish(marker_msg) + def shift_points_map(self, points_map, shift_distance=1.0): + if len(points_map) < 2: + return points_map + + # Calculate the reference vector from the first to the last point + ref_vector = np.array(points_map[-1]) - np.array(points_map[0]) + ref_vector = ref_vector / np.linalg.norm(ref_vector) # Normalize the vector + + # Calculate the perpendicular vector + perp_vector = np.array([-ref_vector[1], ref_vector[0]]) + + # Determine the shift direction based on self.sign + if self.sign == -1: # Right turn + shift_vector = shift_distance * perp_vector + elif self.sign == 1: # Left turn + shift_vector = -shift_distance * perp_vector + else: # Straight + shift_vector = np.array([0, 0]) + + # Apply the shift to each point in points_map + shifted_points_map = [tuple(np.array(point) + shift_vector) for point in points_map] + + return shifted_points_map def odometry_callback(self, msg): # Extract position @@ -251,62 +391,93 @@ class LidarDataProcessor(Node): (roll, pitch, self.theta_car) = euler_from_quaternion(orientation_quaternion) #self.control_car() + # Calculate the distance between the car and the target point + def distance_to_next_waypoint(self): + if self.target_centerline == [] or self.x_car is None: + return float('inf') # Return infinity if waypoints or odometry data is not available + + return np.sqrt(self.dx**2 + self.dy**2) # Euclidean distance - def distance_to_waypoint(self, waypoint_index): + def distance_to_waypoint(self, point): + if self.x_car is None or self.y_car is None: + return float('inf') + return np.sqrt((point[0] - self.x_car)**2 + (point[1] - self.y_car)**2) + + def ecludean_distance(self,point1, point2): + return math.sqrt((point1[0] - point2[0])**2 + (point1[1] - point2[1])**2) + # Calculate self.dy and self.dx for the target point + def calculate_relative_coordinates(self, waypoint_index): if self.target_centerline == [] or self.x_car is None: return float('inf') # Return infinity if waypoints or odometry data is not available + # Calculate the relative coordinate between the target point and the car position + center_line_shifted_list = self.target_centerline + car_xy = (self.x_car_at_calc,self.y_car_at_calc) + reference_point = car_xy #points_map[-1] + min_distance = min(self.ecludean_distance(reference_point, point) for point in self.target_centerline) + distances = [self.ecludean_distance(reference_point, point) for point in self.target_centerline] + #self.get_logger().info(f"distances: {distances}") + gain = 0.05 * self.consecutive_cat # 0.05 + self.get_logger().info(f"self.consecutive_cat: {self.consecutive_cat}") + constants = [(distance) * gain for distance in distances] if min_distance != 0 else [0] * len(distances) + shift_constants = [constant * self.sign for constant in constants] + shift_constants_coord_form = [(0.0,constant) for constant in shift_constants] + self.shift_constants_transformed = shift_constants_coord_form - - self.target_pose_x = self.target_centerline[waypoint_index][0] - self.target_pose_y = self.target_centerline[waypoint_index][1] + # Kan curven også bruges? + #points_map_shifted = [(point[0]+self.shift_constant[0], point[1]+self.shift_constant[1]) for point in points_map] + #points_map_shifted = [points_map + shift_constants_transformed if shift_constants_transformed is not None else points_map for points_map, shift_constants_transformed in zip(points_map, shift_constants_transformed)] + if self.shift_constants_transformed is not None: + center_line_shifted_list = [(point[0] + point_shift[0], point[1]+point_shift[1]) for point, point_shift in zip(self.target_centerline, self.shift_constants_transformed)] - if self.state == "straight" and len(self.target_centerline) >= 3: + + + self.target_pose_x = center_line_shifted_list[waypoint_index][0] + self.target_pose_y = center_line_shifted_list[waypoint_index][1] + + if len(self.target_centerline) >= 3: #self.state == "straight" + #self.get_logger().info(f"Target 3---------------------") # Cacluting the average coordinat between thre target points. # Using the three first target points or the first, middle, and the last. nofpe = len(self.target_centerline) # number_of_points_elements - self.target_pose_x = (self.target_centerline[0][0] + self.target_centerline[nofpe//2][0] + self.target_centerline[-1][0]) / 3 - self.target_pose_y = (self.target_centerline[0][1] + self.target_centerline[nofpe//2][1] + self.target_centerline[-1][1]) / 3 + self.target_pose_x = (center_line_shifted_list[0][0] + center_line_shifted_list[nofpe//2][0] + center_line_shifted_list[-1][0]) / 3 + self.target_pose_y = (center_line_shifted_list[0][1] + center_line_shifted_list[nofpe//2][1] + center_line_shifted_list[-1][1]) / 3 + #self.target_pose_x = self.target_centerline[nofpe//2][0] + #self.target_pose_y = self.target_centerline[nofpe//2][1] + #self.target_pose_x = (self.target_centerline[0][0] + self.target_centerline[1][0] + self.target_centerline[2][0]) / 3 #self.target_pose_y = (self.target_centerline[0][1] + self.target_centerline[1][1] + self.target_centerline[2][1]) / 3 # Calculate the relative coordinate between the average target point and the car position self.dx = self.target_pose_x - self.x_car self.dy = self.target_pose_y - self.y_car - elif self.state == "turning": - # Calculate the relative coordinate between three target points and the car position - self.dx = self.target_pose_x - self.x_car - self.dy = self.target_pose_y - self.y_car else: + #self.get_logger().info(f"Target 1---------------------") # Calculate the relative coordinate between three target points and the car position self.dx = self.target_pose_x - self.x_car self.dy = self.target_pose_y - self.y_car + # original code #self.dx = self.target_pose_x - self.x_car #+ self.x_car_at_calc #self.dy = self.target_pose_y - self.y_car #+ self.y_car_at_calc - return np.sqrt(self.dx**2 + self.dy**2) - - def control_car(self): #self.get_logger().info("Control car") self.calculate_centerline() + #self.get_logger().info(f"Centerline target points: {self.target_centerline_points}") if self.target_centerline_points == [] or self.x_car is None: self.calculate_centerline() #self.get_logger().info("No target centerline points") return # Exit if waypoints or odometry data is not available - distance_to_next_waypoint = self.distance_to_waypoint(0) - #self.get_logger().info(f"Distance to next waypoint: {distance_to_next_waypoint}") + distance_to_next_waypoint = self.distance_to_next_waypoint() + self.calculate_relative_coordinates(0) # 0 is the index of the target point in centerline target array, self.dy and self.dx is calculated angle_rad = math.atan2(self.dy, self.dx) angle_difference = (angle_rad - self.theta_car + math.pi) % (2*math.pi) - math.pi - #print(np.rad2deg(angle_rad), np.rad2deg(self.theta_car), distance_to_next_waypoint) - #speed = None speed = self.speed_control() #self.get_logger().info(f"Angle difference: {angle_difference}, Speed: {speed}") - self.publish_drive_command(angle_difference * 0.5, speed) #1.0, 0.5, 3.0 - - + self.publish_drive_command(angle_difference * self.steering_gain, speed) #1.0, 0.5, 3.0 + def publish_drive_command(self, steering_angle, speed): # Publish the drive command drive_msg = AckermannDriveStamped() @@ -332,6 +503,38 @@ class LidarDataProcessor(Node): return points_map + def transform_to_map_frame_ref(self, point): + + + T = np.array([[np.cos(self.theta_car), -np.sin(self.theta_car), self.x_car_at_calc], + [np.sin(self.theta_car), np.cos(self.theta_car),self.y_car_at_calc], + [0, 0, 1]]) + + + # Transform points to map frame + point_homogeneous = np.array([[point[0]], [point[1]], [1]]) #Assuming front of the car is along x-axis + transformed_point = np.dot(T, point_homogeneous) + point_transformed = (transformed_point[0][0], transformed_point[1][0]) + + return point_transformed + + def find_point_for_origin(self): + T = np.array([[np.cos(self.theta_car), -np.sin(self.theta_car), self.x_car_at_calc], + [np.sin(self.theta_car), np.cos(self.theta_car), self.y_car_at_calc], + [0, 0, 1]]) + + # Invert the transformation matrix + T_inv = np.linalg.inv(T) + + # Homogeneous coordinates of the origin (0, 0) + origin_homogeneous = np.array([[0], [0], [1]]) + + # Find the point that transforms to (0, 0) + point_homogeneous = np.dot(T_inv, origin_homogeneous) + point = (point_homogeneous[0][0], point_homogeneous[1][0]) + + return point + def calculate_curvature(self,points): # Assuming points is a list of (x, y) tuples if len(points) < 3: @@ -356,30 +559,69 @@ class LidarDataProcessor(Node): # A larger triangle area does not necessarily mean smaller curvature, and vice versa # Curvature is not directly proportional to the area of the triangle formed by three points on a curve # curvature formula is K = 4A / (abc) and called menger curvature + + if a == 0 or b == 0 or c == 0: + return 0.0 + area = np.sqrt(s * (s - a) * (s - b) * (s - c)) K = 0 if s == 0 else 4 * area / (a * b * c) - + + min_steering_gain = 0.5 # Minimum steering gain + max_steering_gain = 10.0 # Maximum steering gain + #self.steering_gain = min(max(K*100.0, min_steering_gain), max_steering_gain) + #self.steering_gain = 0.5 + #self.get_logger().info(f"Steering_gain: {self.steering_gain}, Curvature: {K}") + self.curve = K return K - def categorize_path(self,centerline_points): - curvature_threshold = 1.0 # Define your threshold here 0.01 + def calculate_curvature_new(self, points): + # Assuming points is a list of (x, y) tuples + if len(points) < 3: + return 0 # Not enough points to calculate curvature + + # Extract x and y coordinates from points + x = np.array([p[0] for p in points]) + y = np.array([p[1] for p in points]) + + # Fit a second-degree polynomial (quadratic) to the points + degree = 2 + coefficients = np.polyfit(x, y, 2) + polynomial = np.poly1d(coefficients) + + # Calculate the first and second derivatives of the polynomial + first_derivative = np.polyder(polynomial, 1) + second_derivative = np.polyder(polynomial, 2) + + # Calculate the curvature at the midpoint of the points + midpoint_x = x[len(x) // 2] + dx = first_derivative(midpoint_x) + ddx = second_derivative(midpoint_x) + curvature = np.abs(ddx) / (1 + dx**2)**(3/2) + + return curvature + + def categorize_path(self, centerline_points): + curvature_threshold = 1.0 # Initial threshold value (1.0) curvature = self.calculate_curvature(centerline_points) - self.get_logger().info(f"Curvature: {curvature}") - if curvature < curvature_threshold: - #self.get_logger().info("straight") - self.state = "straight" + #self.get_logger().info(f"Curvature: {curvature}") + + if curvature < curvature_threshold or curvature == 0: + self.state = "straight" # "straight" + self.offset = 0 else: - self.get_logger().info("###########turning") self.state = "turning" + self.offset = self.track_width / 2 + + #self.get_logger().info(f"State: {self.state}") # Function that adjust the speed of the car based on the distance to the next waypoints # Takes the average of the middle 75 points of the lidar scan and adjust the speed based on the average distance def speed_control(self): target_speed = None - self.categorize_path(self.target_centerline_points) + #self.categorize_path(self.target_centerline_points) if self.state == "straight": - target_speed = 4.5 # 4.5, 10.5 (har ikke nået top hastighed med 10.5!!) - elif self.state == "turning": + target_speed = 10.5 # 4.5, 10.5 (har ikke nået top hastighed med 10.5!!) + elif self.state == "turning" or self.state == "left" or self.state == "right": target_speed = 3.5 # 0.5 1.5 middle_index = int(len(self.lidar_ranges) / 2) middle_size = 75 @@ -389,7 +631,7 @@ class LidarDataProcessor(Node): min_distance = 0.0 speed = None - self.get_logger().info(f"Mean distance: {mean_distance}") + #self.get_logger().info(f"Mean distance: {mean_distance}") # speed proportional to the distance interval [min_distance, max_distance] and goal_speed if mean_distance > max_distance: speed = target_speed @@ -398,7 +640,7 @@ class LidarDataProcessor(Node): # speed = (max_speed - min_speed) * (average_distance - min_distance) / (max_distance - min_distance) + min_speed else: speed = 0.0 - self.get_logger().info(f"Speed: {speed}-----------------------------") + #self.get_logger().info(f"Speed: {speed}-----------------------------") return speed def main(args=None): diff --git a/src/race_algorithm_pkg/race_algorithm_pkg/race_algorithm_node.py b/src/race_algorithm_pkg/race_algorithm_pkg/race_algorithm_node.py index e69de29..9875a2e 100644 --- a/src/race_algorithm_pkg/race_algorithm_pkg/race_algorithm_node.py +++ b/src/race_algorithm_pkg/race_algorithm_pkg/race_algorithm_node.py @@ -0,0 +1,584 @@ +import math +import warnings +import numpy as np + + +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import LaserScan +from nav_msgs.msg import Path +from geometry_msgs.msg import PoseStamped, Point +from nav_msgs.msg import Odometry +from ackermann_msgs.msg import AckermannDriveStamped +from tf_transformations import euler_from_quaternion +from visualization_msgs.msg import Marker + + +class RaceAlgorithm(Node): + """ RaceAlgorithm class implements a reactive algorithm to ...""" + + def __init__(self): + """ Initialize the RaceAlgorithm node.""" + super().__init__('RaceAlgorithm') + + # Scan parameters + self.angle_min = None + self.angle_max = None + self.angle_increment = None + self.range_min = None + self.range_max = None + self.num_beams = None + + # Odom parameters + self.x = 0 + self.y = 0 + self.theta = 0 # in radians, also known as theta + self.x_car_at_calc = 0 + self.y_car_at_calc = 0 + + # Control parameters + self.target_centerline_points = [] + self.track_width = 0.0 + self.steering_angle = 0.0 + self.steering_gain = 0.5 + self.speed = 0.0 + self.dy = 0 + self.dx = 0 + self.target_pose_x = 0 + self.target_pose_y = 0 + self.target_centerline = [] + self.state = "straight" + self.curve = 0.0 + self.fov = 180 * 4 # 720 + self.fov_start = -self.fov/2 # -360 + self.fov_end = self.fov/2 # 360 + self.lidar_ranges = np.array([]) + + self.lidar_scan_topic = '/scan' + self.drive_topic = '/drive' + self.odom_topic = '/odom' + self.marker_topic = '/waypoint_vis' + + self.subscription = self.create_subscription( + LaserScan, + self.lidar_scan_topic, + self.scan_callback, + 10) + + self.subscription = self.create_subscription( + Odometry, + self.odom_topic, + self.odom_callback, + 10) + + self.publisher_drive = self.create_publisher( + AckermannDriveStamped, + self.drive_topic, + 1) + + self.publisher_marker = self.create_publisher( + Marker, + self.marker_topic, + 1) + + self.timer = self.create_timer(0.1, self.control_car) + + + def scan_callback(self, msg): + """Callback function for the LIDAR scan.""" + self.lidar_ranges = np.array(msg.ranges) + self.angle_min = msg.angle_min + self.angle_max = msg.angle_max + self.angle_increment = msg.angle_increment + self.num_beams = len(msg.ranges) + + + def odom_callback(self,msg): + """Callback function for the odometry.""" + self.x = msg.pose.pose.position.x + self.y = msg.pose.pose.position.y + orientation = msg.pose.pose.orientation + orientation_list = [orientation.x, orientation.y, orientation.z, orientation.w] + roll, pitch, self.theta = euler_from_quaternion(orientation_list) + + def drive_command(self): + """Publish the drive command.""" + drive_msg = AckermannDriveStamped() + drive_msg.drive.speed = self.speed + drive_msg.drive.steering_angle = self.steering_angle + self.publisher_drive.publish(drive_msg) + + def control_car(self): + self.control_steering() + self.get_logger().info(f"Centerline target points: {self.target_centerline_points}") + if self.target_centerline_points == [] or self.x is None: + self.control_steering() + #self.get_logger().info("No target centerline points") + return # Exit if waypoints or odometry data is not available + self.set_steering_angle() + self.control_speed() + self.drive_command() + + def set_steering_angle(self): + """Set the steering angle of the car.""" + # Calculate the steering angle, index indicates which point it drives towards + self.calculate_relative_coordinates(0) + angle_rad = math.atan2(self.dy, self.dx) + angle_difference = (angle_rad - self.theta + math.pi) % (2*math.pi) - math.pi + + self.steering_angle = angle_difference * self.steering_gain # 0.5 is a gain + + def control_speed(self): + if self.lidar_ranges.size == 0: + # self.speed = 0.0 + return + target_speed = 4.5 + middle_index = int(len(self.lidar_ranges) / 2) + middle_size = 75 + middle_slice = self.lidar_ranges[middle_index - middle_size : middle_index + middle_size + 1] + mean_distance = np.mean(middle_slice) + max_distance = 2.0 + min_distance = 0.0 + + self.categorize_path() + + #self.get_logger().info(f"Mean distance: {mean_distance}") + # speed proportional to the distance interval [min_distance, max_distance] + if mean_distance > max_distance: + self.speed = target_speed + elif min_distance < mean_distance < max_distance: + self.speed = target_speed * (mean_distance - min_distance) / (max_distance - min_distance) + # speed = (max_speed - min_speed) * (average_distance - min_distance) / (max_distance - min_distance) + min_speed + else: + self.speed = 0.0 + + def control_steering(self): + """Control the steering of the car.""" + points = [] + + self.find_path() + # Transform the points to the map frame because the marker is in the map frame + points_map = self.transform_path_to_map_frame() + + # Shift x_values in points_map based on a constant + shift_constant = 0.5 # Adjust this value as needed + #points_map_shifted = [(point[0] + shift_constant, point[1]) for point in points_map] + + # Shift x_values in points based on a curve de får en swung! + #points_map_shifted = [(point[0] + 0.5 * point[1]**2, point[1]) for point in points_map] + + # Shift x_values in points based on a self.curve, so the points map will shift based on points_map + #points_map_shifted = [(point[0] + 0.5 * self.curve * point[1]**2, point[1]) for point in points_map] + + # Add the points to the marker + for point_map in points_map: + p = Point() + p.x = point_map[0] + p.y = point_map[1] + p.z = 0.0 + points.append(p) + self.target_centerline.append((point_map[0], point_map[1])) + + self.publish_marker(points) + #curv = self.calculate_curvature_1(self.target_centerline) + + def publish_marker(self, points): + """Publish the marker for the path.""" + marker = Marker() + marker.header.frame_id = "/base_link" + marker.header.stamp = self.get_clock().now().to_msg() + marker.type = Marker.LINE_STRIP # LINE_STRIP or POINTS + marker.action = Marker.ADD + #marker.pose.orientation.w = 1.0 + marker.scale.x = 0.1 + #marker.scale.y = 0.1 + marker.color.a = 1.0 + marker.color.r = 0.0 + marker.color.g = 1.0 + marker.color.b = 0.0 + marker.points = points + + self.publisher_marker.publish(marker) + + def find_path_old(self): + """Find the path to follow.""" + self.x_car_at_calc = self.x + self.y_car_at_calc = self.y + ranges = self.lidar_ranges + # Which ranges should be followed... + if ranges is None: + return + # Angles for each reading + angles = np.linspace(self.angle_min, self.angle_max, self.num_beams) + + # Converting angles to radians + angles = np.array([math.radians(angle) for angle in angles]) + + # Find the index of the longest range + idx = np.argmax(ranges) + + # Convert polar coordinates to cartesian coordinates + x = ranges * np.cos(angles) + y = ranges * np.sin(angles) + + # Polynomial regression for left and right walls, uses the idx as the center + degree = 3 + required_points = 5 # Least number of points to fit the polynomial + + #print(f"Index of max range (idx): {idx}") + #print(f"Required points: {required_points}") + #print(f"Total number of points: {len(x)}") + + if idx - required_points < 0: + warnings.warn("Not enough points to fit a polynomial") + return + if idx + required_points > len(x): + warnings.warn("Not enough points to fit a polynomial") + return + + # Ensure the slices are not empty + left_x = x[:idx-required_points] + left_y = y[:idx-required_points] + right_x = x[idx+required_points:] + right_y = y[idx+required_points:] + + #print(f"Length of left_x: {len(left_x)}") + #print(f"Length of left_y: {len(left_y)}") + #print(f"Length of right_x: {len(right_x)}") + #print(f"Length of right_y: {len(right_y)}") + + if len(left_x) == 0 or len(left_y) == 0: + warnings.warn("Left wall points are empty") + return + if len(right_x) == 0 or len(right_y) == 0: + warnings.warn("Right wall points are empty") + return + + # MIGHT WANT TO FILTER OUT THE POINTS THAT ARE TOO FAR AWAY + # Least squares polynomial fit + try: + left_wall_coeffs = np.polyfit(left_x, left_y, degree) + except np.RankWarning: + warnings.warn("Rank warning, cannot fit the polynomial") + return + try: + right_wall_coeffs = np.polyfit(right_x, right_y, degree) + except np.RankWarning: + warnings.warn("Rank warning, cannot fit the polynomial") + return + + # Find coordinates of the polynomials + x_left = np.linspace(min(x[:idx-required_points]), max(x[:idx-required_points]), 100) + y_left = np.polyval(left_wall_coeffs, x_left) + + x_right = np.linspace(min(x[idx+required_points:]), max(x[idx+required_points:]), 100) + y_right = np.polyval(right_wall_coeffs, x_right) + + # Find width of the track using L2-norm for each point + track_widths = [] + for i in range(len(x_left)): + track_widths.append(np.linalg.norm([x_left[i] - x_right[i], y_left[i] - y_right[i]])) + self.track_width = np.min(track_widths) + + #track_width = np.linalg.norm([x_left[-1] - x_right[0], y_left[-1] - y_right[0]]) + #track_width_from_start = np.linalg.norm([x_left[0] - x_right[-1], y_left[0] - y_right[-1]]) + #track_width_2 = np.linalg.norm([x_left[0] - x_right[0], y_left[0] - y_right[0]]) + #track_width_min = np.linalg.norm([x_left[0] - x_right[0], y_left[0] - y_right[0]]) + + # The cumulative distance along the right curve + delta_x = np.diff(x_right) + delta_y = np.diff(y_right) + delta_s = np.sqrt(delta_x**2 + delta_y**2) + + s = np.cumsum(delta_s) + s = np.insert(s, 0, 0) + + # Interpolate to get evenly spaced points + t = np.linspace(0, s[-1], 100) + x_interp_right = np.interp(t, s, x_right) + y_interp_right = np.interp(t, s, y_right) + + # The cumulative distance along the left curve + delta_x = np.diff(x_left) + delta_y = np.diff(y_left) + delta_s = np.sqrt(delta_x**2 + delta_y**2) + + s = np.cumsum(delta_s) + s = np.insert(s, 0, 0) + + # Interpolate to get evenly spaced points + t = np.linspace(0, s[-1], 100) + x_interp_left = np.interp(t, s, x_left) + y_interp_left = np.interp(t, s, y_left) + + # Derivative of the left curve to get the tangent slope + slope_left = np.gradient(y_interp_left, x_interp_left) + #slope_left = np.arctan(slope_left) + + # Derivative of the right curve to get the tangent slope + slope_right = np.gradient(y_interp_right, x_interp_right) + #slope_right = np.arctan(slope_right) + + # Find the middle path of the track + # if there is more points left do left if there is more points right do right + self.target_centerline_points = self.centerline(x_interp_right, + slope_right,x_left, + x_interp_left,y_interp_right, + y_left,x) + #self.get_logger(f"self.target_center_points: {self.target_centerline_points}") + ''' + if len(x[:idx]) > len(x[idx:]): + self.target_centerline_points = self.centerline(x_interp_right, + slope_left,x_left, + x_interp_left,y_interp_left, + y_left,x) + else: + self.target_centerline_points = self.centerline(x_interp_left, + slope_right,x_right, + x_interp_right,y_interp_right, + y_right,x) + ''' + + def find_path(self): + + ranges = self.lidar_ranges + # indexes in the range [180;900] are used + center_ranges = ranges[540 + int(self.fov_start): 540 + int(self.fov_end)] # Originale + #center_ranges = ranges[200:880] # Degree 90 to 270 + #center_ranges = ranges[220:740] # Degree 120 to 240 (trash) + num_readings = len(center_ranges) + if(num_readings == 0): + return + angular_resolution_deg = 0.25 + + # Generate the angles for each reading + angles_deg = np.arange(0, num_readings) * angular_resolution_deg + + # Convert angles to radians + angles_rad = np.deg2rad(angles_deg) - (np.pi/2) + + # Find the index of the longest distance + max_index = np.argmax(center_ranges) + + # Convert polar coordinates to Cartesian coordinates + x_coords = center_ranges * np.cos(angles_rad) + y_coords = center_ranges * np.sin(angles_rad) + + points = np.vstack((x_coords, y_coords)).T + + + + # Perform polynomial regression before max_index + degree = 3 # Set the degree of the polynomial + if max_index - 5 > 0 and max_index + 5 < len(x_coords): + #coeffs_before = np.polyfit(x_coords[:max_index-5], y_coords[:max_index-5], degree) + try: + coeffs_before = np.polyfit(x_coords[:max_index-5], y_coords[:max_index-5], degree) + except np.RankWarning: + return + # Perform polynomial regression after max_index + #coeffs_after = np.polyfit(x_coords[max_index+5:], y_coords[max_index+5:], degree) + try: + coeffs_after = np.polyfit(x_coords[max_index+5:], y_coords[max_index+5:], degree) + except np.RankWarning: + return + + # Generate points along the fitted polynomial curve before max_index (Left) + x_fit_before = np.linspace(min(x_coords[:max_index-5]), max(x_coords[:max_index-5]), 100) + y_fit_before = np.polyval(coeffs_before, x_fit_before) + + # Generate points along the fitted polynomial curve after max_index (Right)s + x_fit_after = np.linspace(min(x_coords[max_index+5:]), max(x_coords[max_index+5:]), 100) + y_fit_after = np.polyval(coeffs_after, x_fit_after) + + # Width of the track + self.track_width = math.sqrt((x_fit_after[0]-x_fit_before[0])**2+(y_fit_after[0]-y_fit_before[0])**2) + + + # Calculate the cumulative distance along the curve + delta_x = np.diff(x_fit_after) + delta_y = np.diff(y_fit_after) + delta_s = np.sqrt(delta_x**2 + delta_y**2) + s = np.cumsum(delta_s) + s = np.insert(s, 0, 0) # Start from 0 + + # Interpolate evenly spaced s values + s_new = np.linspace(0, s[-1], 100) # 10 evenly spaced points along the length of the curve + x_fit_evenly_spaced_after = np.interp(s_new, s, x_fit_after) + y_fit_evenly_spaced_after = np.interp(s_new, s, y_fit_after) + + delta_x = np.diff(x_fit_before) + delta_y = np.diff(y_fit_before) + delta_s = np.sqrt(delta_x**2 + delta_y**2) + s = np.cumsum(delta_s) + s = np.insert(s, 0, 0) # Start from 0 + + # Interpolate evenly spaced s values + s_new = np.linspace(0, s[-1], 100) # 10 evenly spaced points along the length of the curve + x_fit_evenly_spaced_before = np.interp(s_new, s, x_fit_before) + y_fit_evenly_spaced_before = np.interp(s_new, s, y_fit_before) + + + # Calculate derivative (slope) at each point + slope = np.gradient(y_fit_evenly_spaced_after, x_fit_evenly_spaced_after) + + + + centerline_points = [] + # Plot lidar points and polynomial curves + + + # Plot tangent lines at evenly spaced points and find intersection points + for i in range(len(x_fit_evenly_spaced_after)): + tangent_slope = slope[i] + tangent_point = (x_fit_evenly_spaced_after[i], y_fit_evenly_spaced_after[i]) + tangent_line = tangent_slope * (x_coords - tangent_point[0]) + tangent_point[1] + #plt.plot(x_coords, tangent_line, linestyle='--', label='Tangent at ({}, {})'.format(tangent_point[0], tangent_point[1])) + + # Calculate perpendicular line's equation (negative reciprocal of tangent slope) + perpendicular_slope = -1 / tangent_slope + perpendicular_line = perpendicular_slope * (x_coords - tangent_point[0]) + tangent_point[1] + + #plt.plot(x_coords, perpendicular_line) + perpendicular_line = perpendicular_slope * (x_fit_before - tangent_point[0]) + tangent_point[1] + + # Find intersections + intersection_indices = np.where(np.isclose(perpendicular_line, y_fit_before, atol=0.03)) + + # Extract intersection points + intersection_x = x_fit_before[intersection_indices] + intersection_y = perpendicular_line[intersection_indices] # or polynomial[intersection_indices] + + centerline_x = (intersection_x + x_fit_evenly_spaced_after[i])/2 + centerline_y = (intersection_y + y_fit_evenly_spaced_after[i])/2 + + if(intersection_x.any()): + centerline_points.append((centerline_x[0], centerline_y[0])) + + + self.target_centerline_points = centerline_points + + def centerline(self,x_interp_right, + slope_right,x_left, + x_interp_left,y_interp_right, + y_left,x): + """Find the centerline of the track.""" + centerline_points = [] + + for i in range(len(x_interp_right)): + tangent_slope = slope_right[i] + tangent_point = [x_interp_right[i], y_interp_right[i]] + tangent_line = tangent_slope * (x - tangent_point[0]) + tangent_point[1] + + # Find the perpendicular line's equation + perpendicular_slope = -1 / tangent_slope + #perpendicular_line = perpendicular_slope * (x - tangent_point[0]) + tangent_point[1] + perpendicular_line = perpendicular_slope * (x_left - tangent_point[0]) + tangent_point[1] + + # Find the intersection point + intersection_indices = np.where(np.isclose(perpendicular_line, y_left, atol=0.03)) + #intersection_x = (tangent_point[1] - perpendicular_line) / (perpendicular_slope - tangent_slope) + + # Extract the intersection point + intersection_x = x_left[intersection_indices] + intersection_y = perpendicular_line[intersection_indices] + + # Find the middle point + centerline_x = (intersection_x + x_interp_right[i])/2 + centerline_y = (intersection_y + x_interp_right[i])/2 + + # Append the centerline point + if(intersection_x.any()): + centerline_points.append((centerline_x[0], centerline_y[0])) + return centerline_points + + def transform_path_to_map_frame(self): + """Transform the points to the map frame.""" + + # Transformation matrix + T = np.array([[np.cos(self.theta), -np.sin(self.theta), self.x_car_at_calc], + [np.sin(self.theta), np.cos(self.theta),self.y_car_at_calc], + [0, 0, 1]]) + # A transformation matrix is a matrix that can be used to transform points from one coordinate system to another. + + + # Transform points to map frame + points_map = [] + for point in self.target_centerline_points: + point_homogeneous = np.array([[point[0]], [point[1]], [1]]) #Assuming front of the car is along x-axis + transformed_point = np.dot(T, point_homogeneous) + points_map.append((transformed_point[0][0], transformed_point[1][0])) + + return points_map + + def calculate_relative_coordinates(self, waypoint_index): + if self.target_centerline == [] or self.x is None: + return float('inf') # --- nyt + + self.target_pose_x = self.target_centerline[waypoint_index][0] + self.target_pose_y = self.target_centerline[waypoint_index][1] + + + if self.state == "straight" and len(self.target_centerline) >= 3: + # Cacluting the average coordinat between thre target points. + # Using the three first target points or the first, middle, and the last. + nofpe = len(self.target_centerline) # number_of_points_elements + self.target_pose_x = (self.target_centerline[0][0] + self.target_centerline[nofpe//2][0] + self.target_centerline[-1][0]) / 3 + self.target_pose_y = (self.target_centerline[0][1] + self.target_centerline[nofpe//2][1] + self.target_centerline[-1][1]) / 3 + + # Calculate the relative coordinate between the average target point and the car position + self.dx = self.target_pose_x - self.x + self.dy = self.target_pose_y - self.y + else: + # Calculate the relative coordinate between three target points and the car position + self.dx = self.target_pose_x - self.x + self.dy = self.target_pose_y - self.y + + def calculate_curvature_1(self): + # Assuming self.target_centerline_points is a list of cartesian coordinates + if len(self.target_centerline_points) < 3: + self.steering_gain = 0.5 # Default steering gain + return 0.5 # Not enough points to calculate curvature + + # Select three points from the list: start, middle, and end + lenght = len(self.target_centerline_points) + p1, p2, p3 = self.target_centerline_points[0], self.target_centerline_points[lenght // 2], self.target_centerline_points[-1] + + # Calculate the curvature using the three-point formula + a = np.linalg.norm(np.array(p2) - np.array(p1)) # L2 norm + b = np.linalg.norm(np.array(p3) - np.array(p2)) + c = np.linalg.norm(np.array(p3) - np.array(p1)) + # Calculate the curvature + s = (a + b + c) / 2 + area = np.sqrt(s * (s - a) * (s - b) * (s - c)) + K = 0 if s == 0 else 4 * area / (a * b * c) + + # Calculate steering_gain from curvature + #self.steering_gain = 1 / K if K != 0 else 0.5 + self.steering_gain = 0.5 + self.curve = K + + def categorize_path(self): + curvature_threshold = 1.0 # Initial threshold value (1.0) + self.calculate_curvature_1() + #self.get_logger().info(f"Curvature: {curvature}") + + if self.curve < curvature_threshold or self.curve == 0.0: + self.state = "straight" # "straight" + self.offset = 0 + else: + self.state = "turning" + self.offset = self.track_width / 2 + + self.get_logger().info(f"State: {self.state}") + +def main(args=None): + """Run the Race algorithm node.""" + rclpy.init(args=args) + Race_Algorithm = RaceAlgorithm() + rclpy.spin(Race_Algorithm) + Race_Algorithm.destroy_node() + rclpy.shutdown() + +if __name__ == '__main__': + main() \ No newline at end of file -- GitLab From c334a839d30a15edca1f8e2b3ef3bbc9dcee6de9 Mon Sep 17 00:00:00 2001 From: au665799 <202007669@post.au.dk> Date: Tue, 10 Sep 2024 14:23:30 +0200 Subject: [PATCH 2/4] moved things over to helper function.py --- .../CenterlineDrivingSuper.py | 103 +++--------------- 1 file changed, 13 insertions(+), 90 deletions(-) diff --git a/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper.py b/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper.py index 7bab58c..1808463 100644 --- a/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper.py +++ b/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper.py @@ -19,6 +19,8 @@ from geometry_msgs.msg import PoseWithCovarianceStamped from geometry_msgs.msg import Quaternion from tf_transformations import quaternion_from_euler +import helper_functions as hf + class LidarDataProcessor(Node): """ LidarDataProcessor node that uses linear regression to find the centerline of the track.""" def __init__(self): @@ -63,6 +65,8 @@ class LidarDataProcessor(Node): self.target_pose_y = 0 self.theta_car = 0 self.target_centerline = [] + self.left_side_points = [] # New method + self.right_side_points = [] # New method self.count = 0 self.theta_car_at_calc = 0 @@ -171,6 +175,10 @@ class LidarDataProcessor(Node): x_fit_evenly_spaced_before = np.interp(s_new, s, x_fit_before) y_fit_evenly_spaced_before = np.interp(s_new, s, y_fit_before) + # Save the interpolated points along the fitted polynomials + self.left_side_points = list(zip(x_fit_evenly_spaced_before, y_fit_evenly_spaced_before)) + self.right_side_points = list(zip(x_fit_evenly_spaced_after, y_fit_evenly_spaced_after)) + # Calculate derivative (slope) at each point slope = np.gradient(y_fit_evenly_spaced_after, x_fit_evenly_spaced_after) @@ -213,12 +221,6 @@ class LidarDataProcessor(Node): #self.get_logger(f"self.target_center_points: {self.target_centerline_points}") - def remove_outliers(self,cross_product, threshold=3): - mean = np.mean(cross_product) - std_dev = np.std(cross_product) - filtered_cross_product = [x for x in cross_product if abs((x - mean) / std_dev) < threshold] - return filtered_cross_product - def cross_product_2d_new(self,ref1,ref2, point, margin=1e-2): # 1e-2 is 0.01 refv = (ref2[0]-ref1[0], ref2[1] - ref1[1]) pointv = (point[0] - ref1[0], point[1] - ref1[1]) @@ -350,30 +352,6 @@ class LidarDataProcessor(Node): self.marker_pub.publish(marker_msg) - def shift_points_map(self, points_map, shift_distance=1.0): - if len(points_map) < 2: - return points_map - - # Calculate the reference vector from the first to the last point - ref_vector = np.array(points_map[-1]) - np.array(points_map[0]) - ref_vector = ref_vector / np.linalg.norm(ref_vector) # Normalize the vector - - # Calculate the perpendicular vector - perp_vector = np.array([-ref_vector[1], ref_vector[0]]) - - # Determine the shift direction based on self.sign - if self.sign == -1: # Right turn - shift_vector = shift_distance * perp_vector - elif self.sign == 1: # Left turn - shift_vector = -shift_distance * perp_vector - else: # Straight - shift_vector = np.array([0, 0]) - - # Apply the shift to each point in points_map - shifted_points_map = [tuple(np.array(point) + shift_vector) for point in points_map] - - return shifted_points_map - def odometry_callback(self, msg): # Extract position self.x_car = msg.pose.pose.position.x @@ -403,45 +381,22 @@ class LidarDataProcessor(Node): return float('inf') return np.sqrt((point[0] - self.x_car)**2 + (point[1] - self.y_car)**2) - def ecludean_distance(self,point1, point2): - return math.sqrt((point1[0] - point2[0])**2 + (point1[1] - point2[1])**2) # Calculate self.dy and self.dx for the target point def calculate_relative_coordinates(self, waypoint_index): if self.target_centerline == [] or self.x_car is None: return float('inf') # Return infinity if waypoints or odometry data is not available - # Calculate the relative coordinate between the target point and the car position - center_line_shifted_list = self.target_centerline - car_xy = (self.x_car_at_calc,self.y_car_at_calc) - reference_point = car_xy #points_map[-1] - min_distance = min(self.ecludean_distance(reference_point, point) for point in self.target_centerline) - distances = [self.ecludean_distance(reference_point, point) for point in self.target_centerline] - #self.get_logger().info(f"distances: {distances}") - gain = 0.05 * self.consecutive_cat # 0.05 - self.get_logger().info(f"self.consecutive_cat: {self.consecutive_cat}") - constants = [(distance) * gain for distance in distances] if min_distance != 0 else [0] * len(distances) - shift_constants = [constant * self.sign for constant in constants] - shift_constants_coord_form = [(0.0,constant) for constant in shift_constants] - self.shift_constants_transformed = shift_constants_coord_form - - # Kan curven også bruges? - #points_map_shifted = [(point[0]+self.shift_constant[0], point[1]+self.shift_constant[1]) for point in points_map] - #points_map_shifted = [points_map + shift_constants_transformed if shift_constants_transformed is not None else points_map for points_map, shift_constants_transformed in zip(points_map, shift_constants_transformed)] - if self.shift_constants_transformed is not None: - center_line_shifted_list = [(point[0] + point_shift[0], point[1]+point_shift[1]) for point, point_shift in zip(self.target_centerline, self.shift_constants_transformed)] - - - - self.target_pose_x = center_line_shifted_list[waypoint_index][0] - self.target_pose_y = center_line_shifted_list[waypoint_index][1] + # Calculate the relative coordinate between the target point and the car position + self.target_pose_x = self.target_centerline [waypoint_index][0] + self.target_pose_y = self.target_centerline [waypoint_index][1] if len(self.target_centerline) >= 3: #self.state == "straight" #self.get_logger().info(f"Target 3---------------------") # Cacluting the average coordinat between thre target points. # Using the three first target points or the first, middle, and the last. nofpe = len(self.target_centerline) # number_of_points_elements - self.target_pose_x = (center_line_shifted_list[0][0] + center_line_shifted_list[nofpe//2][0] + center_line_shifted_list[-1][0]) / 3 - self.target_pose_y = (center_line_shifted_list[0][1] + center_line_shifted_list[nofpe//2][1] + center_line_shifted_list[-1][1]) / 3 + self.target_pose_x = (self.target_centerline[0][0] + self.target_centerline[nofpe//2][0] + self.target_centerline[-1][0]) / 3 + self.target_pose_y = (self.target_centerline[0][1] + self.target_centerline[nofpe//2][1] + self.target_centerline[-1][1]) / 3 #self.target_pose_x = self.target_centerline[nofpe//2][0] #self.target_pose_y = self.target_centerline[nofpe//2][1] @@ -503,38 +458,6 @@ class LidarDataProcessor(Node): return points_map - def transform_to_map_frame_ref(self, point): - - - T = np.array([[np.cos(self.theta_car), -np.sin(self.theta_car), self.x_car_at_calc], - [np.sin(self.theta_car), np.cos(self.theta_car),self.y_car_at_calc], - [0, 0, 1]]) - - - # Transform points to map frame - point_homogeneous = np.array([[point[0]], [point[1]], [1]]) #Assuming front of the car is along x-axis - transformed_point = np.dot(T, point_homogeneous) - point_transformed = (transformed_point[0][0], transformed_point[1][0]) - - return point_transformed - - def find_point_for_origin(self): - T = np.array([[np.cos(self.theta_car), -np.sin(self.theta_car), self.x_car_at_calc], - [np.sin(self.theta_car), np.cos(self.theta_car), self.y_car_at_calc], - [0, 0, 1]]) - - # Invert the transformation matrix - T_inv = np.linalg.inv(T) - - # Homogeneous coordinates of the origin (0, 0) - origin_homogeneous = np.array([[0], [0], [1]]) - - # Find the point that transforms to (0, 0) - point_homogeneous = np.dot(T_inv, origin_homogeneous) - point = (point_homogeneous[0][0], point_homogeneous[1][0]) - - return point - def calculate_curvature(self,points): # Assuming points is a list of (x, y) tuples if len(points) < 3: -- GitLab From 0b427a52ce20620b3f421ef4fcaad534cb502cf0 Mon Sep 17 00:00:00 2001 From: au665799 <202007669@post.au.dk> Date: Mon, 16 Sep 2024 16:43:09 +0200 Subject: [PATCH 3/4] changes --- .gitignore | 2 +- .../launch/race_algorithm.launch.py | 38 + .../CenterlineDrivingSuper og.py | 688 ++++++++++++++++++ .../race_algorithm_pkg/helper_functions.py | 237 ++++++ .../race_algorithm_pkg/test.py | 73 ++ 5 files changed, 1037 insertions(+), 1 deletion(-) create mode 100644 src/race_algorithm_pkg/launch/race_algorithm.launch.py create mode 100644 src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper og.py create mode 100644 src/race_algorithm_pkg/race_algorithm_pkg/helper_functions.py create mode 100644 src/race_algorithm_pkg/race_algorithm_pkg/test.py diff --git a/.gitignore b/.gitignore index 0c1a8ec..2d0df6b 100644 --- a/.gitignore +++ b/.gitignore @@ -6,7 +6,7 @@ __pycache__ .vscode # Ignore all directories in src except for the ones listed below # This is useful for ignoring all packages in src except for the ones you want to push git -src/*/ +# src/*/ !src/emergency_stop_pkg !src/message_monitor_pkg message_monitor_node_old.py diff --git a/src/race_algorithm_pkg/launch/race_algorithm.launch.py b/src/race_algorithm_pkg/launch/race_algorithm.launch.py new file mode 100644 index 0000000..ee07af6 --- /dev/null +++ b/src/race_algorithm_pkg/launch/race_algorithm.launch.py @@ -0,0 +1,38 @@ +from launch import LaunchDescription +from launch_ros.actions import Node +from ament_index_python.packages import get_package_share_directory +import os + +def generate_launch_description(): + # Get the path to the YAML file in the config folder + race_algorithm_pkg_share_directory = get_package_share_directory('race_algorithm_pkg') + race_algorithm_pkg_parameters_file_path = os.path.join(race_algorithm_pkg_share_directory, 'config', 'parameters.yaml') + + + return LaunchDescription([ + #Node( + # name='race_algorithm_node', + # package='race_algorithm_pkg', + # executable='race_algorithm_node', + # output='screen', + # parameters=[race_algorithm_pkg_parameters_file_path], + # Open in a new terminal + # prefix='gnome-terminal --', + #), + Node( + name='centerline_drive_node', + package='race_algorithm_pkg', + executable='CenterlineDrivingSuper', + output='screen', + parameters=[race_algorithm_pkg_parameters_file_path], + # Open in a new terminal + #prefix='gnome-terminal --', + ), + #Node( + # name='follow_gap_node', + # package='race_algorithm_pkg', + # executable='FollowTheGap', + # output='screen', + # parameters=[race_algorithm_pkg_parameters_file_path] + #), + ]) \ No newline at end of file diff --git a/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper og.py b/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper og.py new file mode 100644 index 0000000..d2bc001 --- /dev/null +++ b/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper og.py @@ -0,0 +1,688 @@ +import math +import warnings +import numpy as np + + +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import LaserScan +from nav_msgs.msg import Path +from geometry_msgs.msg import PoseStamped, Point +from nav_msgs.msg import Odometry +from ackermann_msgs.msg import AckermannDriveStamped +from tf_transformations import euler_from_quaternion +from visualization_msgs.msg import Marker + + +import matplotlib.pyplot as plt +from geometry_msgs.msg import PoseWithCovarianceStamped +from geometry_msgs.msg import Quaternion +from tf_transformations import quaternion_from_euler + +class LidarDataProcessor(Node): + """ LidarDataProcessor node that uses linear regression to find the centerline of the track.""" + def __init__(self): + """ Initialize the LidarDataProcessor node.""" + super().__init__('LidarDataProcessor') + + # Subscribe to the lidar and odometry topics + self.lidar_sub = self.create_subscription( + LaserScan, + '/scan', + self.lidar_callback, + 10) + # Subscribe to the odometry topic + self.odom_sub = self.create_subscription( + Odometry, + '/odom', + self.odometry_callback, + 10) + # Publish the drive command + self.drive_pub = self.create_publisher( + AckermannDriveStamped, + '/drive', + 1) + # Publish the centerline points + self.marker_pub = self.create_publisher( + Marker, + '/waypoint_vis', + 1) + + # 0.1 is the default value for the timer + self.timer = self.create_timer(0.1, self.control_car) # Higher frequency for faster response time, but gives a higher CPU usage + + self.target_centerline_points = [] + self.x_car = 0 + self.y_car = 0 + self.x_car_at_calc = 0 + self.y_car_at_calc = 0 + self.dx = 0 #Relative coordinate between target point from car + self.dy = 0 #Relative coordinate between target point from car + self.current_waypoint_index = 0 + self.target_pose_x = 0 + self.target_pose_y = 0 + self.theta_car = 0 + self.target_centerline = [] + self.count = 0 + self.theta_car_at_calc = 0 + + + self.fov = 180 * 4 # 720 + self.fov_start = -self.fov/2 # -360 + self.fov_end = self.fov/2 # 360 + self.lidar_ranges = np.array([]) + + # State of the car + self.state = "straight" + self.offset = 0 + self.track_width = 0 + + # Find the closest obstacle index in the lidar readings + self.find_closest_obstacle_index = -1 # None + self.car_width = 0.0001#0.2032 # meters + self.steering_gain = 0.5 + self.curve = 0 + self.sign = None + self.shift_constant = (0,0) + self.shift_constants_transformed = None + + def lidar_callback(self, data): + self.lidar_ranges = np.array(data.ranges) + #self.find_closest_obstacle_index = self.if_find_closest_obstacle(self.lidar_ranges) ## Nyt + + def lidar_processing(self): + + ranges = self.lidar_ranges + # indexes in the range [180;900] are used + center_ranges = ranges[540 + int(self.fov_start): 540 + int(self.fov_end)] # Originale, nyt -70 og 70 + #center_ranges = ranges[200:880] # Degree 90 to 270 + #center_ranges = ranges[220:740] # Degree 120 to 240 (trash) + num_readings = len(center_ranges) + if(num_readings == 0): + return + angular_resolution_deg = 0.25 + + # Generate the angles for each reading + angles_deg = np.arange(0, num_readings) * angular_resolution_deg + + # Convert angles to radians + angles_rad = np.deg2rad(angles_deg) - (np.pi/2) + + # Find the index of the longest distance + max_index = np.argmax(center_ranges) + + # Convert polar coordinates to Cartesian coordinates + x_coords = center_ranges * np.cos(angles_rad) + y_coords = center_ranges * np.sin(angles_rad) + + points = np.vstack((x_coords, y_coords)).T + + + + # Perform polynomial regression before max_index + degree = 3 # Set the degree of the polynomial + if max_index - 5 > 0 and max_index + 5 < len(x_coords): + #coeffs_before = np.polyfit(x_coords[:max_index-5], y_coords[:max_index-5], degree) + try: + coeffs_before = np.polyfit(x_coords[:max_index-5], y_coords[:max_index-5], degree) + except np.RankWarning: + return + # Perform polynomial regression after max_index + #coeffs_after = np.polyfit(x_coords[max_index+5:], y_coords[max_index+5:], degree) + try: + coeffs_after = np.polyfit(x_coords[max_index+5:], y_coords[max_index+5:], degree) + except np.RankWarning: + return + + # Generate points along the fitted polynomial curve before max_index (Left) + x_fit_before = np.linspace(min(x_coords[:max_index-5]), max(x_coords[:max_index-5]), 100) # + y_fit_before = np.polyval(coeffs_before, x_fit_before) + + # Generate points along the fitted polynomial curve after max_index (Right)s + x_fit_after = np.linspace(min(x_coords[max_index+5:]), max(x_coords[max_index+5:]), 200) #100 originalt + y_fit_after = np.polyval(coeffs_after, x_fit_after) + + # Width of the track + #self.track_width = math.sqrt((x_fit_after[0]-x_fit_before[0])**2+(y_fit_after[0]-y_fit_before[0])**2) + + + # Calculate the cumulative distance along the curve + delta_x = np.diff(x_fit_after) + delta_y = np.diff(y_fit_after) + delta_s = np.sqrt(delta_x**2 + delta_y**2) + s = np.cumsum(delta_s) + s = np.insert(s, 0, 0) # Start from 0 + + # Interpolate evenly spaced s values + s_new = np.linspace(0, s[-1], 200) # 10 evenly spaced points along the length of the curve #100 originalt + x_fit_evenly_spaced_after = np.interp(s_new, s, x_fit_after) + y_fit_evenly_spaced_after = np.interp(s_new, s, y_fit_after) + + delta_x = np.diff(x_fit_before) + delta_y = np.diff(y_fit_before) + delta_s = np.sqrt(delta_x**2 + delta_y**2) + s = np.cumsum(delta_s) + s = np.insert(s, 0, 0) # Start from 0 + + # Interpolate evenly spaced s values + s_new = np.linspace(0, s[-1], 200) # 10 evenly spaced points along the length of the curve #100 originalt + x_fit_evenly_spaced_before = np.interp(s_new, s, x_fit_before) + y_fit_evenly_spaced_before = np.interp(s_new, s, y_fit_before) + + + # Calculate derivative (slope) at each point + slope = np.gradient(y_fit_evenly_spaced_after, x_fit_evenly_spaced_after) + + + + centerline_points = [] + # Plot lidar points and polynomial curves + + + # Plot tangent lines at evenly spaced points and find intersection points + for i in range(len(x_fit_evenly_spaced_after)): + tangent_slope = slope[i] + tangent_point = (x_fit_evenly_spaced_after[i], y_fit_evenly_spaced_after[i]) + tangent_line = tangent_slope * (x_coords - tangent_point[0]) + tangent_point[1] + #plt.plot(x_coords, tangent_line, linestyle='--', label='Tangent at ({}, {})'.format(tangent_point[0], tangent_point[1])) + + # Calculate perpendicular line's equation (negative reciprocal of tangent slope) + perpendicular_slope = -1 / tangent_slope + perpendicular_line = perpendicular_slope * (x_coords - tangent_point[0]) + tangent_point[1] + + #plt.plot(x_coords, perpendicular_line) + perpendicular_line = perpendicular_slope * (x_fit_before - tangent_point[0]) + tangent_point[1] + + # Find intersections + intersection_indices = np.where(np.isclose(perpendicular_line, y_fit_before, atol=0.03)) + + # Extract intersection points + intersection_x = x_fit_before[intersection_indices] + intersection_y = perpendicular_line[intersection_indices] # or polynomial[intersection_indices] + + centerline_x = (intersection_x + x_fit_evenly_spaced_after[i])/2 + centerline_y = (intersection_y + y_fit_evenly_spaced_after[i])/2 + + if(intersection_x.any()): + centerline_points.append((centerline_x[0], centerline_y[0])) + + + self.target_centerline_points = centerline_points + + #self.get_logger(f"self.target_center_points: {self.target_centerline_points}") + + def plot_vectors(self, ref1,ref2, points): + # Create a new figure + plt.figure() + + # Plot the reference vector + refv = (ref2[0]-ref1[0], ref2[1] - ref1[1]) + plt.quiver(ref1[0], ref1[1], refv[0], refv[1], angles='xy', scale_units='xy', scale=1, color='r', label='Reference Vector') + + # Plot the other vectors + for point in points: + pointv = (point[0] - ref1[0], point[1] - ref1[1]) + plt.quiver(ref1[0], ref1[1], pointv[0], pointv[1], angles='xy', scale_units='xy', scale=1, color='b', alpha=0.5) + + # Transform and plot the x-axis of the transformed map frame + unit_x = (1, 0) + transformed_x = self.transform_to_map_frame_ref(unit_x) + plt.quiver(ref1[0], ref1[1], transformed_x[0], transformed_x[1], angles='xy', scale_units='xy', scale=1, color='g', label='Transformed X-axis') + + # Transform and plot the y-axis of the transformed map frame + unit_y = (0, 1) + transformed_y = self.transform_to_map_frame_ref(unit_y) + plt.quiver(ref1[0], ref1[1], transformed_y[0], transformed_y[1], angles='xy', scale_units='xy', scale=1, color='m', label='Transformed Y-axis') + + # Set the limits of the plot + plt.xlim(min(ref1[0], min(point[0] for point in points)) - 1, max(ref1[0], max(point[0] for point in points)) + 1) + plt.ylim(min(ref1[1], min(point[1] for point in points)) - 1, max(ref1[1], max(point[1] for point in points)) + 1) + + # Add labels and legend + plt.xlabel('X') + plt.ylabel('Y') + plt.legend() + plt.grid() + plt.title('Vectors used in cross_product_2d_new') + plt.show() + + def remove_outliers(self,cross_product, threshold=3): + mean = np.mean(cross_product) + std_dev = np.std(cross_product) + filtered_cross_product = [x for x in cross_product if abs((x - mean) / std_dev) < threshold] + return filtered_cross_product + + def cross_product_2d_new(self,ref1,ref2, point, margin=1e-2): # 1e-2 is 0.01 + refv = (ref2[0]-ref1[0], ref2[1] - ref1[1]) + pointv = (point[0] - ref1[0], point[1] - ref1[1]) + # Cross product of two vectors + #self.get_logger().info(f"refv: {refv}, pointv: {pointv}") + cross_product = refv[0] * pointv[1] - refv[1] * pointv[0] + #self.get_logger().info(f"cross_product: {cross_product}, Type: {type(cross_product)}") + if abs(cross_product) < margin: + return 0 + return cross_product + + def calculate_centerline(self): + self.x_car_at_calc = self.x_car + self.y_car_at_calc = self.y_car + self.lidar_processing() + points_map = self.transform_to_map_frame() + + # Shift x_values in points_map based on a constant + # Get direction sign, RIGHT is 1 and LEFT is -1 + + if self.sign == None: + self.sign = 0 + #self.get_logger().info(f"len(points_map): {len(points_map)}") + if len(points_map) > 35: + # Reference point (first point in points_map) + #self.get_logger().info(f"cross_product_2d_new(reference_point, point): {[cross_product_2d_new(reference_point1,reference_point2, point) for point in points_map[2:]]}") + + # Calculate cross products for all points in points_map[2:] + #cross_products = [self.cross_product_2d_new(points_map[0], points_map[-1], point) for point in points_map[1:-1]] + car_xy = (self.x_car_at_calc, self.y_car_at_calc) + cross_products = [self.cross_product_2d_new(car_xy, points_map[-1], point) for point in points_map[:-1]] + #self.get_logger().info(f"cross_product_2d_new: {cross_products}") + + # Determine the position of each point relative to refvector + signs = [1 if cp > 0 else -1 if cp < 0 else 0 for cp in cross_products] + #count_ones = sum(1 for sign in signs if sign == 1) + count_ones = signs.count(1) + count_neg_ones = signs.count(-1) + count_zeros = signs.count(0) + self.get_logger().info(f"count_ones: {count_ones}, count_neg_ones: {count_neg_ones}, count_zeros: {count_zeros}") + #self.get_logger().info(f"signs: {signs}") + #self.get_logger().info(f"len(signs): {len(signs)}") + if count_zeros > count_neg_ones and count_zeros > count_ones: + self.sign = 0 + self.state = "straight" + self.get_logger().info(f"STRAIGHT") + elif count_ones > count_neg_ones: + self.sign = -1 #-1 + self.state = "right" + self.get_logger().info(f"RIGHT") + else: #count_neg_ones > count_ones: + self.sign = 1 #1 + self.state = "left" + self.get_logger().info(f"LEFT") + #plot_vectors(self,reference_point1,reference_point2, points) + + + #distance = self.distance_to_waypoint(points_map[-1]) + + # Find the clossest point in points_map + reference_point = car_xy #points_map[-1] + min_distance = min(self.ecludean_distance(reference_point, point) for point in points_map) + distances = [self.ecludean_distance(reference_point, point) for point in points_map] + self.get_logger().info(f"distances: {distances}") + # Calculate shift_constant as the inverse of the max_distance + # constant must not be less than 1.0 + gain = 0.5 + #constant = (1 / min_distance) * gain if min_distance != 0 else 0 # Avoid division by zero + #constants = (1/distances) * gain if min_distance != 0 else 0 + constants = [(distance) * gain for distance in distances] if min_distance != 0 else [0] * len(distances) + #self.get_logger().info(f"constants: {constants}") + + #self.shift_constant = (0,constant*self.sign) + #shift_constants = constants*self.sign + shift_constants = [constant * self.sign for constant in constants] + shift_constants_coord_form = [(0.0,constant) for constant in shift_constants] + #self.get_logger().info(f"shift_constants_coord_form: {shift_constants_coord_form}") + #self.shift_constants_transformed = [self.transform_to_map_frame_ref(constant) for constant in shift_constants_coord_form] + self.shift_constants_transformed = shift_constants_coord_form + #self.shift_constant = self.transform_to_map_frame_ref(self.shift_constant) + #self.get_logger().info(f"shift_constants_transformed: {shift_constants_transformed}") + + # Kan curven også bruges? + #points_map_shifted = [(point[0]+self.shift_constant[0], point[1]+self.shift_constant[1]) for point in points_map] + #points_map_shifted = [points_map + shift_constants_transformed if shift_constants_transformed is not None else points_map for points_map, shift_constants_transformed in zip(points_map, shift_constants_transformed)] + if self.shift_constants_transformed is not None: + points_map_shifted = [(point[0] + point_shift[0], point[1]+point_shift[1]) for point, point_shift in zip(points_map, self.shift_constants_transformed)] + points_map = points_map_shifted + + marker_msg = Marker() + marker_msg.header.frame_id = "/base_link" # /map + marker_msg.header.stamp = self.get_clock().now().to_msg() + marker_msg.ns = "waypoint_vis" + marker_msg.id = 0 + marker_msg.type = Marker.POINTS + marker_msg.action = Marker.ADD + marker_msg.pose.orientation.w = 1.0 + marker_msg.scale.x = 0.1 # size of the points + marker_msg.scale.y = 0.1 # size of the points + marker_msg.color.r = 1.0 + marker_msg.color.a = 1.0 + self.target_centerline = [] + + # Normalize points + x_coords = [point[0] for point in points_map] + y_coords = [point[1] for point in points_map] + if not x_coords or not y_coords: # Exit if no points are available + return + x_min, x_max = min(x_coords), max(x_coords) + y_min, y_max = min(y_coords), max(y_coords) + new_min, new_max = -1, 1 # Desired range for normalization -1 + + #self.find_closest_obstacle_index = self.if_find_closest_obstacle(self.lidar_ranges) ## Nyt + + for point_map in points_map: + #normalized_x = (point_map[0] - x_min) / (x_max - x_min) * (new_max - new_min) + new_min + #normalized_y = (point_map[1] - y_min) / (y_max - y_min) * (new_max - new_min) + new_min + p = Point() + #self.get_logger().info(f"Point: {point_map}") + p.x = point_map[0] # normalized_x + p.y = point_map[1] # normalized_y + p.z = 0. + #offset_x, offset_y = self.calculate_offset(self.find_closest_obstacle_index,self.x_car,self.y_car) + self.target_centerline.append((point_map[0],point_map[1])) + #if offset_x == 0 and offset_y == 0: + # self.target_centerline.append((point_map[0],point_map[1])) + #else: + # self.get_logger().info(f"Offset x: {offset_x}, Offset y: {offset_y}") + # self.target_centerline.append((offset_x,offset_y))# .append((point_map[0],point_map[1])) + marker_msg.points.append(p) + + self.marker_pub.publish(marker_msg) + + def shift_points_map(self, points_map, shift_distance=1.0): + if len(points_map) < 2: + return points_map + + # Calculate the reference vector from the first to the last point + ref_vector = np.array(points_map[-1]) - np.array(points_map[0]) + ref_vector = ref_vector / np.linalg.norm(ref_vector) # Normalize the vector + + # Calculate the perpendicular vector + perp_vector = np.array([-ref_vector[1], ref_vector[0]]) + + # Determine the shift direction based on self.sign + if self.sign == -1: # Right turn + shift_vector = shift_distance * perp_vector + elif self.sign == 1: # Left turn + shift_vector = -shift_distance * perp_vector + else: # Straight + shift_vector = np.array([0, 0]) + + # Apply the shift to each point in points_map + shifted_points_map = [tuple(np.array(point) + shift_vector) for point in points_map] + + return shifted_points_map + + def odometry_callback(self, msg): + # Extract position + self.x_car = msg.pose.pose.position.x + self.y_car = msg.pose.pose.position.y + #self.get_logger().info(f"x: {self.x_car}, y: {self.y_car}") + # Extract orientation (in quaternion) + orientation_quaternion = ( + msg.pose.pose.orientation.x, + msg.pose.pose.orientation.y, + msg.pose.pose.orientation.z, + msg.pose.pose.orientation.w + ) + + # Convert orientation from quaternion to euler angles (roll, pitch, yaw) + (roll, pitch, self.theta_car) = euler_from_quaternion(orientation_quaternion) + #self.control_car() + + # Calculate the distance between the car and the target point + def distance_to_next_waypoint(self): + if self.target_centerline == [] or self.x_car is None: + return float('inf') # Return infinity if waypoints or odometry data is not available + + return np.sqrt(self.dx**2 + self.dy**2) # Euclidean distance + + def distance_to_waypoint(self, point): + if self.x_car is None or self.y_car is None: + return float('inf') + return np.sqrt((point[0] - self.x_car)**2 + (point[1] - self.y_car)**2) + + def ecludean_distance(self,point1, point2): + return math.sqrt((point1[0] - point2[0])**2 + (point1[1] - point2[1])**2) + # Calculate self.dy and self.dx for the target point + def calculate_relative_coordinates(self, waypoint_index): + if self.target_centerline == [] or self.x_car is None: + return float('inf') # Return infinity if waypoints or odometry data is not available + + + + self.target_pose_x = self.target_centerline[waypoint_index][0] + self.target_pose_y = self.target_centerline[waypoint_index][1] + + if len(self.target_centerline) >= 3: #self.state == "straight" + self.get_logger().info(f"Target 3---------------------") + # Cacluting the average coordinat between thre target points. + # Using the three first target points or the first, middle, and the last. + nofpe = len(self.target_centerline) # number_of_points_elements + self.target_pose_x = (self.target_centerline[0][0] + self.target_centerline[nofpe//2][0] + self.target_centerline[-1][0]) / 3 + self.target_pose_y = (self.target_centerline[0][1] + self.target_centerline[nofpe//2][1] + self.target_centerline[-1][1]) / 3 + #self.target_pose_x = self.target_centerline[nofpe//2][0] + #self.target_pose_y = self.target_centerline[nofpe//2][1] + + #self.target_pose_x = (self.target_centerline[0][0] + self.target_centerline[1][0] + self.target_centerline[2][0]) / 3 + #self.target_pose_y = (self.target_centerline[0][1] + self.target_centerline[1][1] + self.target_centerline[2][1]) / 3 + + # Calculate the relative coordinate between the average target point and the car position + self.dx = self.target_pose_x - self.x_car + self.dy = self.target_pose_y - self.y_car + else: + self.get_logger().info(f"Target 1---------------------") + # Calculate the relative coordinate between three target points and the car position + self.dx = self.target_pose_x - self.x_car + self.dy = self.target_pose_y - self.y_car + + # original code + #self.dx = self.target_pose_x - self.x_car #+ self.x_car_at_calc + #self.dy = self.target_pose_y - self.y_car #+ self.y_car_at_calc + + def control_car(self): + #self.get_logger().info("Control car") + self.calculate_centerline() + #self.get_logger().info(f"Centerline target points: {self.target_centerline_points}") + if self.target_centerline_points == [] or self.x_car is None: + self.calculate_centerline() + #self.get_logger().info("No target centerline points") + return # Exit if waypoints or odometry data is not available + + distance_to_next_waypoint = self.distance_to_next_waypoint() + self.calculate_relative_coordinates(0) # 0 is the index of the target point in centerline target array, self.dy and self.dx is calculated + angle_rad = math.atan2(self.dy, self.dx) + angle_difference = (angle_rad - self.theta_car + math.pi) % (2*math.pi) - math.pi + speed = self.speed_control() + #self.get_logger().info(f"Angle difference: {angle_difference}, Speed: {speed}") + self.publish_drive_command(angle_difference * self.steering_gain, speed) #1.0, 0.5, 3.0 + + def publish_drive_command(self, steering_angle, speed): + # Publish the drive command + drive_msg = AckermannDriveStamped() + drive_msg.header.stamp = self.get_clock().now().to_msg() + drive_msg.drive.steering_angle = steering_angle + drive_msg.drive.speed = speed #Max speed + self.drive_pub.publish(drive_msg) + + def transform_to_map_frame(self): + + + T = np.array([[np.cos(self.theta_car), -np.sin(self.theta_car), self.x_car_at_calc], + [np.sin(self.theta_car), np.cos(self.theta_car),self.y_car_at_calc], + [0, 0, 1]]) + + + # Transform points to map frame + points_map = [] + for point in self.target_centerline_points: + point_homogeneous = np.array([[point[0]], [point[1]], [1]]) #Assuming front of the car is along x-axis + transformed_point = np.dot(T, point_homogeneous) + points_map.append((transformed_point[0][0], transformed_point[1][0])) + + return points_map + + def transform_to_map_frame_ref(self, point): + + + T = np.array([[np.cos(self.theta_car), -np.sin(self.theta_car), self.x_car_at_calc], + [np.sin(self.theta_car), np.cos(self.theta_car),self.y_car_at_calc], + [0, 0, 1]]) + + + # Transform points to map frame + point_homogeneous = np.array([[point[0]], [point[1]], [1]]) #Assuming front of the car is along x-axis + transformed_point = np.dot(T, point_homogeneous) + point_transformed = (transformed_point[0][0], transformed_point[1][0]) + + return point_transformed + + def find_point_for_origin(self): + T = np.array([[np.cos(self.theta_car), -np.sin(self.theta_car), self.x_car_at_calc], + [np.sin(self.theta_car), np.cos(self.theta_car), self.y_car_at_calc], + [0, 0, 1]]) + + # Invert the transformation matrix + T_inv = np.linalg.inv(T) + + # Homogeneous coordinates of the origin (0, 0) + origin_homogeneous = np.array([[0], [0], [1]]) + + # Find the point that transforms to (0, 0) + point_homogeneous = np.dot(T_inv, origin_homogeneous) + point = (point_homogeneous[0][0], point_homogeneous[1][0]) + + return point + + def calculate_curvature(self,points): + # Assuming points is a list of (x, y) tuples + if len(points) < 3: + return 0 # Not enough points to calculate curvature + + # Select three points from the list: start, middle, and end + p1, p2, p3 = points[0], points[len(points) // 2], points[-1] + + # Calculate the curvature using the three-point formula + a = np.linalg.norm(np.array(p2) - np.array(p1)) # L2 norm + b = np.linalg.norm(np.array(p3) - np.array(p2)) + c = np.linalg.norm(np.array(p3) - np.array(p1)) + # Calculate the curvature + s = (a + b + c) / 2 + # Avoid division by zero + # If the area of the triangle is zero, the curvature is also zero + # This can happen if the three points are collinear (on the same line) + # In that case, the curvature is zero + # Curvature is defined as the inverse of the radius of the osculating circle at a point on a curve + # Heron's formula------------------------- + # Heron's formula is used to calculate the area of a triangle: A = sqrt(s(s - a)(s - b)(s - c)) + # A larger triangle area does not necessarily mean smaller curvature, and vice versa + # Curvature is not directly proportional to the area of the triangle formed by three points on a curve + # curvature formula is K = 4A / (abc) and called menger curvature + + if a == 0 or b == 0 or c == 0: + return 0.0 + + area = np.sqrt(s * (s - a) * (s - b) * (s - c)) + K = 0 if s == 0 else 4 * area / (a * b * c) + + min_steering_gain = 0.5 # Minimum steering gain + max_steering_gain = 10.0 # Maximum steering gain + #self.steering_gain = min(max(K*100.0, min_steering_gain), max_steering_gain) + #self.steering_gain = 0.5 + #self.get_logger().info(f"Steering_gain: {self.steering_gain}, Curvature: {K}") + self.curve = K + return K + + def calculate_curvature_new(self, points): + # Assuming points is a list of (x, y) tuples + if len(points) < 3: + return 0 # Not enough points to calculate curvature + + # Extract x and y coordinates from points + x = np.array([p[0] for p in points]) + y = np.array([p[1] for p in points]) + + # Fit a second-degree polynomial (quadratic) to the points + degree = 2 + coefficients = np.polyfit(x, y, 2) + polynomial = np.poly1d(coefficients) + + # Calculate the first and second derivatives of the polynomial + first_derivative = np.polyder(polynomial, 1) + second_derivative = np.polyder(polynomial, 2) + + # Calculate the curvature at the midpoint of the points + midpoint_x = x[len(x) // 2] + dx = first_derivative(midpoint_x) + ddx = second_derivative(midpoint_x) + curvature = np.abs(ddx) / (1 + dx**2)**(3/2) + + return curvature + + def categorize_path(self, centerline_points): + curvature_threshold = 1.0 # Initial threshold value (1.0) + curvature = self.calculate_curvature(centerline_points) + #self.get_logger().info(f"Curvature: {curvature}") + + if curvature < curvature_threshold or curvature == 0: + self.state = "straight" # "straight" + self.offset = 0 + else: + self.state = "turning" + self.offset = self.track_width / 2 + + #self.get_logger().info(f"State: {self.state}") + + # Function that adjust the speed of the car based on the distance to the next waypoints + # Takes the average of the middle 75 points of the lidar scan and adjust the speed based on the average distance + def speed_control(self): + target_speed = None + #self.categorize_path(self.target_centerline_points) + if self.state == "straight": + target_speed = 10.5 # 4.5, 10.5 (har ikke nået top hastighed med 10.5!!) + elif self.state == "turning" or self.state == "left" or self.state == "right": + target_speed = 3.5 # 0.5 1.5 + middle_index = int(len(self.lidar_ranges) / 2) + middle_size = 75 + middle_slice = self.lidar_ranges[middle_index - middle_size : middle_index + middle_size + 1] + mean_distance = np.mean(middle_slice) + max_distance = 2.0 + min_distance = 0.0 + + speed = None + #self.get_logger().info(f"Mean distance: {mean_distance}") + # speed proportional to the distance interval [min_distance, max_distance] and goal_speed + if mean_distance > max_distance: + speed = target_speed + elif min_distance < mean_distance < max_distance: + speed = target_speed * (mean_distance - min_distance) / (max_distance - min_distance) + # speed = (max_speed - min_speed) * (average_distance - min_distance) / (max_distance - min_distance) + min_speed + else: + speed = 0.0 + #self.get_logger().info(f"Speed: {speed}-----------------------------") + return speed + +def main(args=None): + """Run the Follow The Gap node.""" + rclpy.init(args=args) + lidar_processor = LidarDataProcessor() + rclpy.spin(lidar_processor) + # Loop to keep the program running + #while rclpy.ok(): + # lidar_processor.control_car() + #rate.sleep() + #rclpy.spin(lidar_processor) + lidar_processor.destroy_node() + rclpy.shutdown() + +if __name__ == '__main__': + main() + """ + try: + rclpy.init(anonymous=True) + lidar_processor = LidarDataProcessor() + + # Loop to keep the program running + while not rclpy.is_shutdown(): + lidar_processor.control_car() + #rate.sleep() + + except rclpy.ROSInterruptException: + pass + """ diff --git a/src/race_algorithm_pkg/race_algorithm_pkg/helper_functions.py b/src/race_algorithm_pkg/race_algorithm_pkg/helper_functions.py new file mode 100644 index 0000000..d4e4109 --- /dev/null +++ b/src/race_algorithm_pkg/race_algorithm_pkg/helper_functions.py @@ -0,0 +1,237 @@ +from math import sqrt +import numpy as np + +def findCircle(x1, y1, x2, y2, x3, y3): + x12 = x1 - x2 + x13 = x1 - x3 + + y12 = y1 - y2 + y13 = y1 - y3 + + y31 = y3 - y1 + y21 = y2 - y1 + + x31 = x3 - x1 + x21 = x2 - x1 + + # x1^2 - x3^2 + sx13 = pow(x1, 2) - pow(x3, 2) + + # y1^2 - y3^2 + sy13 = pow(y1, 2) - pow(y3, 2) + + sx21 = pow(x2, 2) - pow(x1, 2) + sy21 = pow(y2, 2) - pow(y1, 2) + + f = (((sx13) * (x12) + (sy13) * + (x12) + (sx21) * (x13) + + (sy21) * (x13)) // (2 * + ((y31) * (x12) - (y21) * (x13)))) + + g = (((sx13) * (y12) + (sy13) * (y12) + + (sx21) * (y13) + (sy21) * (y13)) // + (2 * ((x31) * (y12) - (x21) * (y13)))) + + c = (-pow(x1, 2) - pow(y1, 2) - + 2 * g * x1 - 2 * f * y1) + + # eqn of circle be x^2 + y^2 + 2*g*x + 2*f*y + c = 0 + # where centre is (h = -g, k = -f) and + # radius r as r^2 = h^2 + k^2 - c + h = -g + k = -f + sqr_of_r = h * h + k * k - c + + # r is the radius + r = round(sqrt(sqr_of_r), 5) + + return h, k, r + +def calculate_curvature(point1, point2, point3): + # Use vector math to compute the angle between segments + vec1 = np.array(point2) - np.array(point1) + vec2 = np.array(point3) - np.array(point2) + + cosine_angle = np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)) + angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0)) + + return angle +def distance_from_line(point, line_start, line_end): + """Calculate the perpendicular distance of a point from a line defined by two points.""" + line_vec = np.array(line_end) - np.array(line_start) + point_vec = np.array(point) - np.array(line_start) + line_len = np.linalg.norm(line_vec) + line_unitvec = line_vec / line_len + point_vec_scaled = point_vec / line_len + t = np.dot(line_unitvec, point_vec_scaled) + nearest = line_vec * t + dist = np.linalg.norm(point_vec - nearest) + return dist + +def find_apex_point(points, entry_point, exit_point): + """Find the apex point which is the furthest away from the line between entry and exit points.""" + max_distance = -1 + apex_point = None + for point in points: + dist = distance_from_line(point, entry_point, exit_point) + if dist > max_distance: + max_distance = dist + apex_point = point + return apex_point + +def find_curvature_from_radius(radius): + return 1 / radius # k = 1/R + +def entry_apex_exit(centerline_points, left_side_points, right_side_points,turntype, car_xy): + # Find the apex point, which is the point that is at the pointies point of the curve of the self.left_side_points = [] and self.right_side_points = [] + # Turn type describes if we look at the left or right side of track to find this apex point. + # Exit point it the last point of the centerline_points + # Entry point is the first point of the centerline_points or the cars current position, depens on what works best. + # The apex point is the point that is the furthest away from the line between the entry and exit point. + + # First method----------------------- + # Find the entry and exit points of the track + entry_point = car_xy if car_xy else centerline_points[0] + exit_point = centerline_points[-1] + # Select the side points based on the turn type + side_points = left_side_points if turntype == 'left' else right_side_points + # Find the apex point, which is the point that is the furthest away from the line between the entry and exit points + apex_point = find_apex_point(side_points, entry_point, exit_point) + # First method end + + + #apex_index = len(centerline_points) // 2 + # 2 method----------------- + # Iterate through centerline to detect high-curvature region + max_curvature = 0 + apex_index = 0 + + for i in range(1, len(centerline_points)-1): + curvature = calculate_curvature(centerline_points[i-1], centerline_points[i], centerline_points[i+1]) + if curvature > max_curvature: + max_curvature = curvature + apex_index = i + # 2 method end + + apex_point = centerline_points[apex_index] + + # Find the entry and exit points of the track + entry_point = centerline_points[0] + exit_point = centerline_points[-1] + + return entry_point, apex_point, exit_point + +def entry_apex_exit_method3(centerline_points): + + max_curvature = 0 + apex_index = 0 + + for i in range(1, len(centerline_points)-1): + cx, xy, r = findCircle(centerline_points[i-1][0], centerline_points[i-1][1], + centerline_points[i][0], centerline_points[i][1], + centerline_points[i+1][0], centerline_points[i+1][1]) + curvature = find_curvature_from_radius(r) + if curvature > max_curvature: + max_curvature = curvature + apex_index = i + + apex_point = centerline_points[apex_index] + + # Find the entry and exit points of the track + entry_point = centerline_points[0] + exit_point = centerline_points[-1] + + return entry_point, apex_point, exit_point + +def ecludean_distance(point1, point2): + return sqrt((point1[0] - point2[0])**2 + (point1[1] - point2[1])**2) + + +def get_halfe_track_width(centerlinepoints,leftright_side_points): + track_widths = [] + for left_point, right_point in zip(centerlinepoints, leftright_side_points): + track_widths.append(ecludean_distance(left_point, right_point)) + return track_widths + +def calculate_optimal_apex_shift(curviture, half_track_width): + # Base shift amount as a fraction of track width + base_shift = 0.3 * 2 * half_track_width # 0.3 not tested jet + + SHARP_TURN_THRESHOLD = None # IDK what to put here + GENTLE_TURN_THRESHOLD = None + + # Adjust shift based on turn radius + if curviture > SHARP_TURN_THRESHOLD: + # Sharp turn: reduce the shift + shift_amount = base_shift * 0.5 + elif curviture < GENTLE_TURN_THRESHOLD: + # Gentle turn: increase the shift + shift_amount = base_shift * 1.5 + else: + # Moderate turn: use base shift + shift_amount = base_shift + + return shift_amount + +def find_optimal_apex(centerline, boundaries, apex_index, half_track_width, curviture, turn_direction,car_width): + # Calculate shift based on turn radius + shift_amount = calculate_optimal_apex_shift(curviture, half_track_width) + + # Shift the apex towards the inside of the turn if there is a turn direction to follow + initial_apex = centerline[apex_index] + pbefore = centerline[apex_index-1] + pafter = centerline[apex_index+1] + + if turn_direction == 'left' or turn_direction == 'right': + new_apex = calculate_shifted_apex(turn_direction, np.array(initial_apex), shift_amount,pbefore,pafter) + + # Check boundaries and adjust if necessary + left_boundary, right_boundary = boundaries[apex_index] + + # Safety margin to ensure the apex point is not too close to the track boundary + safety_margin = 0.1 * 2 * half_track_width + car_width/2 # 0.1 not tested jet + + # if new_apex is close to the boundary, adjust it to be at a safe distance + if np.isclose(new_apex, left_boundary, atol=safety_margin): + new_apex = calculate_shifted_apex(turn_direction, np.array(initial_apex)+ safety_margin, shift_amount,pbefore,pafter) + elif np.isclose(new_apex, right_boundary, atol=safety_margin): + new_apex = calculate_shifted_apex(turn_direction, np.array(initial_apex)- safety_margin, shift_amount,pbefore,pafter) + + return new_apex if new_apex != None else initial_apex + +def calculate_shifted_apex(turn_direction, apex_point, shift_amount,pbefore,pafter): + # find the direction vector of the shift + # pafter = (x,y) + d = np.array(pafter) - np.array(pbefore) + + # find the normal vector of the direction vector + if turn_direction == 'left': + n = np.array([-d[1], d[0]]) + elif turn_direction == 'right': + n = np.array([d[1], -d[0]]) + + # normalize the normal vector + n /= np.linalg.norm(n) + + # new apex + new_apex = apex_point + shift_amount * n + + return new_apex + +def adjust_apex_point(apex_point, entry_point, exit_point, track_width,car_width): + # Adjust the apex point to be on the line between the entry and exit points, + # and at a distance of half the track width from the centerline. + # This is done by first calculating the angle of the line between the entry and exit points, + # and then calculating the angle of the line between the entry point and the apex point. + # The apex point is then moved along the line between the entry and exit points to be at the same angle + # but at a distance of half the track width from the centerline. + entry_exit_angle = np.arctan2(exit_point[1] - entry_point[1], exit_point[0] - entry_point[0]) + entry_apex_angle = np.arctan2(apex_point[1] - entry_point[1], apex_point[0] - entry_point[0]) + angle_diff = entry_exit_angle - entry_apex_angle + apex_distance = track_width / 2 / np.sin(angle_diff) + apex_point = (entry_point[0] + apex_distance * np.cos(entry_exit_angle), + entry_point[1] + apex_distance * np.sin(entry_exit_angle)) + # Accounting for the width of the car + apex_point = (apex_point[0] + car_width * np.cos(entry_exit_angle + np.pi/2), + apex_point[1] + car_width * np.sin(entry_exit_angle + np.pi/2)) + return apex_point diff --git a/src/race_algorithm_pkg/race_algorithm_pkg/test.py b/src/race_algorithm_pkg/race_algorithm_pkg/test.py new file mode 100644 index 0000000..69c27ba --- /dev/null +++ b/src/race_algorithm_pkg/race_algorithm_pkg/test.py @@ -0,0 +1,73 @@ +import numpy as np +import matplotlib.pyplot as plt + +def calculate_magnitude(vector): + return np.linalg.norm(vector) + +def normalize_vector(vector): + magnitude = calculate_magnitude(vector) + if magnitude == 0: + return vector + return vector / magnitude + +def calculate_angle(A, B): + dot_product = np.dot(A, B) + angle = np.arccos(dot_product) + return angle + +def determine_direction(A, B): + A_normalized = normalize_vector(A) + B_normalized = normalize_vector(B) + angle = calculate_angle(A_normalized, B_normalized) + + print(f"Angle: {np.degrees(angle)} degrees") # Debugging print statement + + if np.isclose(angle, 0, atol=np.radians(5)): # Small threshold to account for floating-point precision + return 0 # Straight + else: + cross_product = np.cross(A_normalized, B_normalized) + return 1 if cross_product > 0 else -1 # Left or Right + +def plot_vectors(A, B, direction, ax): + ax.quiver(0, 0, A[0], A[1], angles='xy', scale_units='xy', scale=1, color='r', label='Vector A') + ax.quiver(0, 0, B[0], B[1], angles='xy', scale_units='xy', scale=1, color='b', label='Vector B') + ax.set_xlim(-10, 50) + ax.set_ylim(-10, 50) + ax.axhline(0, color='black', linewidth=0.5) + ax.axvline(0, color='black', linewidth=0.5) + ax.grid(color='gray', linestyle='--', linewidth=0.5) + ax.set_aspect('equal', adjustable='box') + ax.legend() + + if direction == 0: + ax.set_title("Direction: Straight") + elif direction == 1: + ax.set_title("Direction: Left") + else: + ax.set_title("Direction: Right") + +# Example 2D vectors +A1 = np.array([1, 2]) +B1 = np.array([4, 5]) +A2 = np.array([10, 20]) +B2 = np.array([40, 50]) +A3 = np.array([1, 0]) +B3 = np.array([2, 0]) +A4 = np.array([10, 10]) +B4 = np.array([20, 20.1]) + +# Determine directions +direction1 = determine_direction(A1, B1) +direction2 = determine_direction(A2, B2) +direction3 = determine_direction(A3, B3) +direction4 = determine_direction(A4, B4) + +# Plotting +fig, axs = plt.subplots(1, 4, figsize=(20, 5)) + +plot_vectors(A1, B1, direction1, axs[0]) +plot_vectors(A2, B2, direction2, axs[1]) +plot_vectors(A3, B3, direction3, axs[2]) +plot_vectors(A4, B4, direction4, axs[3]) + +plt.show() \ No newline at end of file -- GitLab From aa5b4c45f92ead43d4a23ec3fab40f906059bca6 Mon Sep 17 00:00:00 2001 From: Mikkel Ejsing Skov Petersen <202007669@post.au.dk> Date: Mon, 9 Dec 2024 20:09:23 +0100 Subject: [PATCH 4/4] Updated documentation and cleaned code --- README.md | 3 +- src/race_algorithm_pkg/README.md | 94 ++++++-- .../CenterlineDrivingSuper.py | 211 +++--------------- 3 files changed, 113 insertions(+), 195 deletions(-) diff --git a/README.md b/README.md index 39ea6b1..368a284 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ ## Project description This project is about a race algorithm for the f1tenth car. It enables the f1tenth to compete in competitions with the f1tenth car. +This project is based on the [Driving algorithms](https://gitlab.au.dk/auf1tenth-community/f1tenth-simulator/driving-algorithms) repository. Improvements and adaptations to work in ROS2 foxy have been made to the original algorithm. ## Installation and build instructions @@ -58,4 +59,4 @@ sudo apt install python3-ament-copyright python3-ament-flake8 python3-ament-pep2 ``` ## Project status -This project is under development, with the aim to win the F1TENTH competition. \ No newline at end of file +This project is under development. \ No newline at end of file diff --git a/src/race_algorithm_pkg/README.md b/src/race_algorithm_pkg/README.md index 48f4523..162e263 100644 --- a/src/race_algorithm_pkg/README.md +++ b/src/race_algorithm_pkg/README.md @@ -1,26 +1,91 @@ -# Racing Algorithm Package For the F1tenth AU Team +# Racing Algorithm Package For the AUF1tenth Team ## Package description This package is about a racing algorithm for the f1tenth car. It enables the f1tenth to compete in competitions with the f1tenth car. + ### Description of algorithm -This algorithm is based upon the **CenterLine drive algorithm**, see gitlab project [Driving algorithms](https://gitlab.au.dk/auf1tenth-community/f1tenth-simulator/driving-algorithms), where there has been made a few improvements. These improvements is firstly the regulation of speed, as this was hard set to 1.0 in the orignal algorthm. Then there is also been made a **characterization algorithm** that can identifity the state of the track, i.e. if the car is going straight or turning. It does this by ulitizing the centerlines which is created in the CenterLine drive algorithm. At last the code was transisted to ros2 instead of ros1 aswell as small improvemnts in the code. +The Centerline follow algorithm is based upon the **CenterLine drive algorithm**, see gitlab project [Driving algorithms](https://gitlab.au.dk/auf1tenth-community/f1tenth-simulator/driving-algorithms), where there has been made a few improvements. These improvements is firstly the regulation of speed, as this was hard set to 1.0 in the orignal algorthm. Then there is also been made a **characterization algorithm** that can identifity the state of the track, i.e. if the car is going straight or turning. It does this by ulitizing the centerlines which is created in the CenterLine drive algorithm. At last the code was transisted to ros2 instead of ros1 aswell as improvemnts in the code. + ### Detecting turns and straights -To identify if the car is on a turn or a straight, it as mentioned above uses the centerlines created. It then takes 3 points from this centerline. The first, the middle and the last point. It then calculates the distances distances between the points for then to find the area of the triangle they create between them using the **Heron's formula**: +To determine if the track is going left, right or straight as simple yet effective method has been used using vectors, angles, and crossproduct. By using the angle and crossproduct each target vector can be determined as being to the left or to the right or on top of the reference vector. left is represented as $-1$, right is represented as $(1)$, and straight is represented as "0". Then by counting how there is of each it can be determined if it is a straight, left or rith turn. + + A target vector as defined as a vector which goes from the cars position to each centerline point excluding the last one. A reference vector is defined as a vector which goes from the cars position to the last centerline point. + +#### Construction of vectors +To identify the type of direction the tracks goes the centerline points, reference vector $\vec{a}$, and a target vector $\vec{b}$ are used. The centerline is a list containing all the caluclated centerline points there is based on lidar readings and polynomial fitting. The reference and target vector are constructed using the reference coordinate, which is the position $(x,y)$ of the f1tenth car, and two target points as follow: + +$$ +ref\_cord=(r_x,r_y)\\ +targ\_cord=(t_x,t_y)\\ +targ2\_cord=(t_{x2},t_{y2})\\\;\\ +\vec{a} = \begin{bmatrix} t_x - r_x \\ t_y - r_y \end{bmatrix} = \begin{bmatrix} a_x \\ a_y\end{bmatrix}\\ +\vec{b} = \begin{bmatrix} t_{x2} - r_x \\ t_{y2} - r_y \end{bmatrix}= \begin{bmatrix} b_x \\ b_y\end{bmatrix}$$ +Where $targ$ is the last centerline point in the centerline point list, so the point furthest away from the car. $targ2$ is the closest point to the car in the centerline point list (In the first iteration). +$$ +\vec{a}_{\text{normalized}} = \frac{\vec{a}}{\|\vec{a}\|} \\ +\vec{b}_{\text{normalized}} = \frac{\vec{b}}{\|\vec{b}\|} +$$ +After the vectors are made they are normalized, as can be seen above. + +#### Chategorize direction using signs + +To chategorize each target vector to be to the left or to the right of the reference vector the angle and crossproduct is used. To calculate the angle between two vectors, the dot product and the arccosine function are used as follows: +$$ +\phi = \arccos(\vec{a} \cdot \vec{b}) +$$ +Where: +- $||\vec{a}||=1$ and $||\vec{b}||=1$ +- $(\vec{a} \cdot \vec{b})$ is the dot product of vectors $\vec{a} \text{ and } \vec{b}$. + +Based on this angle it can be determined if the line can be chategorized as going in the "straight" direction. If the angle is within two degrees of 0 radians it is chategorized as "straight" $sign=0$. If this is not the case, it is time to see wether it is left or right. +To determine this the crossproduct is used, between the two normalized vectors, as follow: +$$ +\text{cross\_product} = \vec{a}_{\text{normalized}} \times \vec{b}_{\text{normalized}} $$ -s = \frac{a + b + c}{2} \\ -area = \sqrt{s \cdot (s - a) \cdot (s - b) \cdot (s - c)} + +The direction is determined as follows: + +$$ +sign= \begin{cases} +1 & \text{if } \text{cross\_product} > 0 \\ +-1 & \text{if } \text{cross\_product} \leq 0 +\end{cases} +$$ + +Where: +- \(1\) indicates the direction is to the right. +- \(-1\) indicates the direction is to the left. + +This process is repeated for each centerline point in the centerlinepoint list. How this is implemented can be seen in the `CenterlineDrivingSuper.py` file, specifically at lines 242-323. + + +### Path to follow +The point of which the f1tenth car chooses to drive towards can either be the first point of the centerline points or a combination of the first, middle, and last point of the centerline points. If the centerlinepoint list has more than or equal to three points, will the car use the three mentioned points to decide its heading. The steps in doing this is a follow: First it is needed to check if the list center target points is empty, if so the returned is inf. inf means that there is no point to head for. If not the case we procede. If there are at least 3 points in the target centerline, calculate the average coordinate between the first, middle, and last points: +$$ +nofpe=lenght\;of\;target\_centerline\;list\\ +x_f= \text{target\_centerline}[0][0] \\ +x_m = \text{target\_centerline}[\text{nofpe} // 2][0] \\ +x_l = \text{target\_centerline}[-1][0] \\ +y_f = \text{target\_centerline}[0][1] \\ +y_m = \text{target\_centerline}[\text{nofpe} // 2][1] \\ +y_l = \text{target\_centerline}[-1][1] \\ +\text{target\_pose}\_x = \frac{x_f + x_m + x_l}{3} \\ +\text{target\_pose}\_y = \frac{y_f + y_m + y_l}{3} $$ -Then the **curvity** of the circle that goes through the three points is calculated using the Menger curvature formula: -$$curvity = \frac{4 \cdot area}{a \cdot b \cdot c}$$ -If the curvity is below a certain threshold, then the car is on a straight, otherwise it is on a turn. -One thing to keep in mind is that these three points must not be collinear, otherwise divizion by zero can happen, but this is takken into consideration. -If the curvity is below a certain **threshold**, then the car is on a straight, otherwise it is on a turn. +The relative coordinate between the average target point and the car position: +$$ +dx = \text{target\_pose}\_x - x_{\text{car}} \\ +dy = \text{target\_pose}\_y - y_{\text{car}} +$$ -One thing to keep in mind is that these three points must not be collinear, otherwise divizion by zero can happen, but this is takken into consideration. +If there are fewer than 3 points in the target centerline, calculate the relative coordinate between the target point and the car position: +$$ +dx = \text{target\_pose}\_x - x_{\text{car}} \\ +dy = \text{target\_pose}\_y - y_{\text{car}} +$$ -### Curvity threshold -The curvity threshold is set to 1.0, as this was found to be a good value for the f1tenth car. But it can be adjusted if needed in the CenterlineDriveSuper.py file. +Then the differences in x and y is used to calcualte the heading of the car to the target point. How this is done can be seen in the `CenterlineDrivingSuper.py` file, specifically at lines 380 and 381. ### Speed regulation To regulate the speed the car first identifies if it is on a straight or a turn. If it is on a straight, then the **target speed** is set to 4.5, otherwise it is set to 0.5. This is done to make the car go slower in turns, and drive faster on straights. It adjusts the speed by calculating the average distance to the race track sides for the 75 middle laser scan points in front of the car. If the **average distance** is above *max_distance* then the speed is equal to the target speed. If the average distance is between *min_distance* and *max_distance*, then the speed is set to: @@ -28,3 +93,6 @@ To regulate the speed the car first identifies if it is on a straight or a turn. speed = goal_speed * (mean_distance - min_distance) / (max_distance - min_distance) ``` , otherwise it is set to 0.0. *min_distance* and *max_distance* can be adjusted in the file CenterlineDriveSuper.py. + +### Future work +Improvents which could be made to this algorithm is to implement a way to adjust the steering angle based on the speed and future direction of the track (left, right, or straigt). When the car is at high speeds, fast changes in steering angle can result in unstable driving behavoiur. diff --git a/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper.py b/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper.py index 1808463..5da8184 100644 --- a/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper.py +++ b/src/race_algorithm_pkg/race_algorithm_pkg/CenterlineDrivingSuper.py @@ -19,8 +19,6 @@ from geometry_msgs.msg import PoseWithCovarianceStamped from geometry_msgs.msg import Quaternion from tf_transformations import quaternion_from_euler -import helper_functions as hf - class LidarDataProcessor(Node): """ LidarDataProcessor node that uses linear regression to find the centerline of the track.""" def __init__(self): @@ -189,7 +187,7 @@ class LidarDataProcessor(Node): # Plot lidar points and polynomial curves - # Plot tangent lines at evenly spaced points and find intersection points + # Plot tangent lines at evenly spaced points and find intersection points for i in range(len(x_fit_evenly_spaced_after)): tangent_slope = slope[i] tangent_point = (x_fit_evenly_spaced_after[i], y_fit_evenly_spaced_after[i]) @@ -221,13 +219,11 @@ class LidarDataProcessor(Node): #self.get_logger(f"self.target_center_points: {self.target_centerline_points}") - def cross_product_2d_new(self,ref1,ref2, point, margin=1e-2): # 1e-2 is 0.01 + def cross_product_2d(self,ref1,ref2, point, margin=1e-2): # 1e-2 is 0.01 refv = (ref2[0]-ref1[0], ref2[1] - ref1[1]) pointv = (point[0] - ref1[0], point[1] - ref1[1]) # Cross product of two vectors - #self.get_logger().info(f"refv: {refv}, pointv: {pointv}") cross_product = refv[0] * pointv[1] - refv[1] * pointv[0] - #self.get_logger().info(f"cross_product: {cross_product}, Type: {type(cross_product)}") if abs(cross_product) < margin: return 0 return cross_product @@ -243,20 +239,20 @@ class LidarDataProcessor(Node): angle = np.arccos(dot_product) return angle - def determine_direction(self,ref_coord, t_coord, t_coord2): - A = np.array([t_coord[0] - ref_coord[0], t_coord[1] - ref_coord[1]]) - B = np.array([t_coord2[0] - ref_coord[0], t_coord2[1] - ref_coord[1]]) - A_normalized = self.normalize_vector(A) - B_normalized = self.normalize_vector(B) + def determine_direction(self,ref_cord, targ_cord, targ_cord2): + a = np.array([targ_cord[0] - ref_cord[0], targ_cord[1] - ref_cord[1]]) + b = np.array([targ_cord2[0] - ref_cord[0], targ_cord2[1] - ref_cord[1]]) + a_normalized = self.normalize_vector(a) + b_normalized = self.normalize_vector(b) - angle = self.calculate_angle(A_normalized, B_normalized) + angle = self.calculate_angle(a_normalized, b_normalized) # Determine direction based on the angle if np.isclose(angle, 0, atol=np.radians(2)): # Small threshold to account for floating-point precision # 5, 2.5, 2 return 0 # Straight else: - cross_product = np.cross(A_normalized, B_normalized) - return 1 if cross_product > 0 else -1 # Left or Right <- 1 is left and -1 is right SKIFT HER HVIS DET ER FORKERT + cross_product = np.cross(a_normalized, b_normalized) + return 1 if cross_product > 0 else -1 # Left or Right --> -1 is left and 1 is right. def calculate_centerline(self): self.x_car_at_calc = self.x_car @@ -265,48 +261,37 @@ class LidarDataProcessor(Node): points_map = self.transform_to_map_frame() # Shift x_values in points_map based on a constant - # Get direction sign, RIGHT is 1 and LEFT is -1 + # Get direction sign if self.sign == None: self.sign = 0 - #self.get_logger().info(f"len(points_map): {len(points_map)}") - if len(points_map) > 9: #35 - # Reference point (first point in points_map) - cross_products = [self.cross_product_2d_new((self.x_car_at_calc,self.y_car_at_calc), points_map[-1], point) for point in points_map[:-1]] - #self.get_logger().info(f"cross_product_2d_new: {cross_products}") - - # Determine the position of each point relative to refvector - signs = [1 if cp > 0 else -1 if cp < 0 else 0 for cp in cross_products] + + # There needs to be at least 10 points in the centerline + # To determine the direction of the track + if len(points_map) > 9: # Determine direction signs = [self.determine_direction((self.x_car_at_calc,self.y_car_at_calc),points_map[-1], point) for point in points_map[:-1]] - #count_ones = sum(1 for sign in signs if sign == 1) + # Count the number of each direction count_ones = signs.count(1) count_neg_ones = signs.count(-1) count_zeros = signs.count(0) - #self.get_logger().info(f"count_ones: {count_ones}, count_neg_ones: {count_neg_ones}, count_zeros: {count_zeros}") - #self.get_logger().info(f"signs: {signs}") - #self.get_logger().info(f"len(signs): {len(signs)}") if count_zeros > count_neg_ones and count_zeros > count_ones: self.sign = 0 self.state = "straight" self.get_logger().info(f"STRAIGHT") elif count_ones > count_neg_ones: - self.sign = 1 #-1 + self.sign = 1 self.state = "right" self.get_logger().info(f"RIGHT") - else: #count_neg_ones > count_ones: - self.sign = -1 #1 + else: + self.sign = -1 self.state = "left" self.get_logger().info(f"LEFT") - #plot_vectors(self,reference_point1,reference_point2, points) + # If previous turn type is None set it to the current turn type if self.prev_turn_type == None: self.prev_turn_type = self.state - # Increase the consecutive_cat if the previous turn type is the same as the current turn type - if self.prev_turn_type == self.state: #and self.consecutive_cat < 50: - self.consecutive_cat += 1 - else: - self.consecutive_cat = 1 # Reset + marker_msg = Marker() marker_msg.header.frame_id = "/base_link" # /map @@ -322,32 +307,13 @@ class LidarDataProcessor(Node): marker_msg.color.a = 1.0 self.target_centerline = [] - # Normalize points - x_coords = [point[0] for point in points_map] - y_coords = [point[1] for point in points_map] - if not x_coords or not y_coords: # Exit if no points are available - return - x_min, x_max = min(x_coords), max(x_coords) - y_min, y_max = min(y_coords), max(y_coords) - new_min, new_max = -1, 1 # Desired range for normalization -1 - - #self.find_closest_obstacle_index = self.if_find_closest_obstacle(self.lidar_ranges) ## Nyt - for point_map in points_map: - #normalized_x = (point_map[0] - x_min) / (x_max - x_min) * (new_max - new_min) + new_min - #normalized_y = (point_map[1] - y_min) / (y_max - y_min) * (new_max - new_min) + new_min p = Point() - #self.get_logger().info(f"Point: {point_map}") - p.x = point_map[0] # normalized_x - p.y = point_map[1] # normalized_y + p.x = point_map[0] + p.y = point_map[1] p.z = 0. - #offset_x, offset_y = self.calculate_offset(self.find_closest_obstacle_index,self.x_car,self.y_car) + self.target_centerline.append((point_map[0],point_map[1])) - #if offset_x == 0 and offset_y == 0: - # self.target_centerline.append((point_map[0],point_map[1])) - #else: - # self.get_logger().info(f"Offset x: {offset_x}, Offset y: {offset_y}") - # self.target_centerline.append((offset_x,offset_y))# .append((point_map[0],point_map[1])) marker_msg.points.append(p) self.marker_pub.publish(marker_msg) @@ -356,7 +322,6 @@ class LidarDataProcessor(Node): # Extract position self.x_car = msg.pose.pose.position.x self.y_car = msg.pose.pose.position.y - #self.get_logger().info(f"x: {self.x_car}, y: {self.y_car}") # Extract orientation (in quaternion) orientation_quaternion = ( msg.pose.pose.orientation.x, @@ -367,8 +332,7 @@ class LidarDataProcessor(Node): # Convert orientation from quaternion to euler angles (roll, pitch, yaw) (roll, pitch, self.theta_car) = euler_from_quaternion(orientation_quaternion) - #self.control_car() - + # Calculate the distance between the car and the target point def distance_to_next_waypoint(self): if self.target_centerline == [] or self.x_car is None: @@ -390,47 +354,32 @@ class LidarDataProcessor(Node): self.target_pose_x = self.target_centerline [waypoint_index][0] self.target_pose_y = self.target_centerline [waypoint_index][1] - if len(self.target_centerline) >= 3: #self.state == "straight" - #self.get_logger().info(f"Target 3---------------------") + if len(self.target_centerline) >= 3: # Cacluting the average coordinat between thre target points. # Using the three first target points or the first, middle, and the last. nofpe = len(self.target_centerline) # number_of_points_elements self.target_pose_x = (self.target_centerline[0][0] + self.target_centerline[nofpe//2][0] + self.target_centerline[-1][0]) / 3 self.target_pose_y = (self.target_centerline[0][1] + self.target_centerline[nofpe//2][1] + self.target_centerline[-1][1]) / 3 - #self.target_pose_x = self.target_centerline[nofpe//2][0] - #self.target_pose_y = self.target_centerline[nofpe//2][1] - - #self.target_pose_x = (self.target_centerline[0][0] + self.target_centerline[1][0] + self.target_centerline[2][0]) / 3 - #self.target_pose_y = (self.target_centerline[0][1] + self.target_centerline[1][1] + self.target_centerline[2][1]) / 3 # Calculate the relative coordinate between the average target point and the car position self.dx = self.target_pose_x - self.x_car self.dy = self.target_pose_y - self.y_car else: - #self.get_logger().info(f"Target 1---------------------") # Calculate the relative coordinate between three target points and the car position self.dx = self.target_pose_x - self.x_car self.dy = self.target_pose_y - self.y_car - - # original code - #self.dx = self.target_pose_x - self.x_car #+ self.x_car_at_calc - #self.dy = self.target_pose_y - self.y_car #+ self.y_car_at_calc def control_car(self): - #self.get_logger().info("Control car") self.calculate_centerline() - #self.get_logger().info(f"Centerline target points: {self.target_centerline_points}") if self.target_centerline_points == [] or self.x_car is None: self.calculate_centerline() - #self.get_logger().info("No target centerline points") return # Exit if waypoints or odometry data is not available distance_to_next_waypoint = self.distance_to_next_waypoint() - self.calculate_relative_coordinates(0) # 0 is the index of the target point in centerline target array, self.dy and self.dx is calculated + self.calculate_relative_coordinates(0) angle_rad = math.atan2(self.dy, self.dx) angle_difference = (angle_rad - self.theta_car + math.pi) % (2*math.pi) - math.pi speed = self.speed_control() - #self.get_logger().info(f"Angle difference: {angle_difference}, Speed: {speed}") self.publish_drive_command(angle_difference * self.steering_gain, speed) #1.0, 0.5, 3.0 def publish_drive_command(self, steering_angle, speed): @@ -458,94 +407,14 @@ class LidarDataProcessor(Node): return points_map - def calculate_curvature(self,points): - # Assuming points is a list of (x, y) tuples - if len(points) < 3: - return 0 # Not enough points to calculate curvature - - # Select three points from the list: start, middle, and end - p1, p2, p3 = points[0], points[len(points) // 2], points[-1] - - # Calculate the curvature using the three-point formula - a = np.linalg.norm(np.array(p2) - np.array(p1)) # L2 norm - b = np.linalg.norm(np.array(p3) - np.array(p2)) - c = np.linalg.norm(np.array(p3) - np.array(p1)) - # Calculate the curvature - s = (a + b + c) / 2 - # Avoid division by zero - # If the area of the triangle is zero, the curvature is also zero - # This can happen if the three points are collinear (on the same line) - # In that case, the curvature is zero - # Curvature is defined as the inverse of the radius of the osculating circle at a point on a curve - # Heron's formula------------------------- - # Heron's formula is used to calculate the area of a triangle: A = sqrt(s(s - a)(s - b)(s - c)) - # A larger triangle area does not necessarily mean smaller curvature, and vice versa - # Curvature is not directly proportional to the area of the triangle formed by three points on a curve - # curvature formula is K = 4A / (abc) and called menger curvature - - if a == 0 or b == 0 or c == 0: - return 0.0 - - area = np.sqrt(s * (s - a) * (s - b) * (s - c)) - K = 0 if s == 0 else 4 * area / (a * b * c) - - min_steering_gain = 0.5 # Minimum steering gain - max_steering_gain = 10.0 # Maximum steering gain - #self.steering_gain = min(max(K*100.0, min_steering_gain), max_steering_gain) - #self.steering_gain = 0.5 - #self.get_logger().info(f"Steering_gain: {self.steering_gain}, Curvature: {K}") - self.curve = K - return K - - def calculate_curvature_new(self, points): - # Assuming points is a list of (x, y) tuples - if len(points) < 3: - return 0 # Not enough points to calculate curvature - - # Extract x and y coordinates from points - x = np.array([p[0] for p in points]) - y = np.array([p[1] for p in points]) - - # Fit a second-degree polynomial (quadratic) to the points - degree = 2 - coefficients = np.polyfit(x, y, 2) - polynomial = np.poly1d(coefficients) - - # Calculate the first and second derivatives of the polynomial - first_derivative = np.polyder(polynomial, 1) - second_derivative = np.polyder(polynomial, 2) - - # Calculate the curvature at the midpoint of the points - midpoint_x = x[len(x) // 2] - dx = first_derivative(midpoint_x) - ddx = second_derivative(midpoint_x) - curvature = np.abs(ddx) / (1 + dx**2)**(3/2) - - return curvature - - def categorize_path(self, centerline_points): - curvature_threshold = 1.0 # Initial threshold value (1.0) - curvature = self.calculate_curvature(centerline_points) - #self.get_logger().info(f"Curvature: {curvature}") - - if curvature < curvature_threshold or curvature == 0: - self.state = "straight" # "straight" - self.offset = 0 - else: - self.state = "turning" - self.offset = self.track_width / 2 - - #self.get_logger().info(f"State: {self.state}") - # Function that adjust the speed of the car based on the distance to the next waypoints # Takes the average of the middle 75 points of the lidar scan and adjust the speed based on the average distance def speed_control(self): target_speed = None - #self.categorize_path(self.target_centerline_points) if self.state == "straight": - target_speed = 10.5 # 4.5, 10.5 (har ikke nået top hastighed med 10.5!!) - elif self.state == "turning" or self.state == "left" or self.state == "right": - target_speed = 3.5 # 0.5 1.5 + target_speed = 4.5 + elif self.state == "left" or self.state == "right": + target_speed = 0.5 middle_index = int(len(self.lidar_ranges) / 2) middle_size = 75 middle_slice = self.lidar_ranges[middle_index - middle_size : middle_index + middle_size + 1] @@ -554,7 +423,6 @@ class LidarDataProcessor(Node): min_distance = 0.0 speed = None - #self.get_logger().info(f"Mean distance: {mean_distance}") # speed proportional to the distance interval [min_distance, max_distance] and goal_speed if mean_distance > max_distance: speed = target_speed @@ -563,7 +431,6 @@ class LidarDataProcessor(Node): # speed = (max_speed - min_speed) * (average_distance - min_distance) / (max_distance - min_distance) + min_speed else: speed = 0.0 - #self.get_logger().info(f"Speed: {speed}-----------------------------") return speed def main(args=None): @@ -571,26 +438,8 @@ def main(args=None): rclpy.init(args=args) lidar_processor = LidarDataProcessor() rclpy.spin(lidar_processor) - # Loop to keep the program running - #while rclpy.ok(): - # lidar_processor.control_car() - #rate.sleep() - #rclpy.spin(lidar_processor) lidar_processor.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() - """ - try: - rclpy.init(anonymous=True) - lidar_processor = LidarDataProcessor() - - # Loop to keep the program running - while not rclpy.is_shutdown(): - lidar_processor.control_car() - #rate.sleep() - - except rclpy.ROSInterruptException: - pass - """ -- GitLab