Source code for src.trajdata

# Zhihao Zhang
# NGSIM dataset processor trajdata.py file

import math
import os
import numpy as np
from src import ngsim_trajdata
from src import trajectory_smoothing
from src.Vec import VecSE2
from src import const
from src.Roadway import roadway
from src.Record import record
from src.Basic import Vehicle
from tqdm import tqdm
from src.Record.record import read_trajdata


[docs]def symmetric_exponential_moving_average(arr: list, T: float, dt: float = 0.1): delta = T / dt N = len(arr) retval = [] for i in range(N): Z = 0.0 x = 0.0 D = min(int(round(3 * delta)), i) if i + D > N - 1: D = N - i - 1 for k in range(i - D, i + D + 1): e = math.exp(-abs(i-k)/delta) Z += e x += arr[k] * e retval.append(x / Z) return retval
[docs]class FilterTrajectoryResult: def __init__(self, trajdata: ngsim_trajdata.NGSIMTrajdata, carid: int): dfstart = trajdata.car2start[carid] N = trajdata.df.at[dfstart, 'n_frames_in_dataset'] x_arr = [] y_arr = [] theta_arr = [] v_arr = [] for i in range(N): x_arr.append(trajdata.df.at[dfstart + i, 'global_x']) y_arr.append(trajdata.df.at[dfstart + i, 'global_y']) theta_arr.append(math.atan2(y_arr[4] - y_arr[0], x_arr[4] - x_arr[0])) v_arr.append(trajdata.df.at[dfstart, 'speed']) # hypot(ftr.y_arr[lookahead] - y₀, ftr.x_arr[lookahead] - x₀)/ν.Δt if v_arr[0] < 1.0: # small speed # estimate with greater lookahead theta_arr[0] = math.atan2(y_arr[-1] - y_arr[0], x_arr[-1] - x_arr[0]) self.carid = carid self.x_arr = x_arr self.y_arr = y_arr self.theta_arr = theta_arr self.v_arr = v_arr def __len__(self): return len(self.x_arr)
[docs]def filter_trajectory(ftr: FilterTrajectoryResult, v: trajectory_smoothing.VehicleSystem = trajectory_smoothing.VehicleSystem()): mu = [ftr.x_arr[0], ftr.y_arr[0], ftr.theta_arr[0], ftr.v_arr[0]] sigma = 1e-1 cov_ = np.diag([sigma * 0.01, sigma * 0.01, sigma * 0.1, sigma]) # assume control is centered u = [0.0, 0.0] z = [None, None] for i in range(1, len(ftr)): # pull observation z[0] = ftr.x_arr[i] z[1] = ftr.y_arr[i] # apply extended Kalman filter mu, cov_ = trajectory_smoothing.EKF(v, mu, cov_, u, z) # strong result ftr.x_arr[i] = mu[0] ftr.y_arr[i] = mu[1] ftr.theta_arr.append(mu[2]) ftr.v_arr.append(mu[3]) return ftr
[docs]def copy(trajdata: ngsim_trajdata.NGSIMTrajdata, ftr: FilterTrajectoryResult): dfstart = trajdata.car2start[ftr.carid] N = trajdata.df.at[dfstart, 'n_frames_in_dataset'] # copy results back to trajdata # print("start copying: ") for i in range(N): #print(dfstart, i, N) #print('global_x') trajdata.df.at[dfstart + i, 'global_x'] = ftr.x_arr[i] #print('global_y') trajdata.df.at[dfstart + i, 'global_y'] = ftr.y_arr[i] trajdata.df.at[dfstart + i, 'speed'] = ftr.v_arr[i] #print("speed") if i > 0: a = ftr.x_arr[i] b = ftr.x_arr[i-1] c = ftr.y_arr[i] d = ftr.y_arr[i-1] trajdata.df.at[dfstart + i, 'speed'] = math.hypot(a-b, c-d) / const.NGSIM_TIMESTEP else: a = ftr.x_arr[i + 1] b = ftr.x_arr[i] c = ftr.y_arr[i + 1] d = ftr.y_arr[i] trajdata.df.at[dfstart + i, 'speed'] = math.hypot(a-b, c-d) / const.NGSIM_TIMESTEP #print("global_heading") trajdata.df.at[dfstart + i, 'global_heading'] = ftr.theta_arr[i] return trajdata
[docs]def filter_given_trajectory(trajdata: ngsim_trajdata.NGSIMTrajdata, carid: int): ''' :param trajdata: trajectory data file, NGSIMTrajdata object :param carid: the id of the car that we want to filter :return: a smoothed trajectory ''' # Filters the given vehicle's trajectory using an Extended Kalman Filter ftr = FilterTrajectoryResult(trajdata, carid) # run pre-smoothing ftr.x_arr = symmetric_exponential_moving_average(ftr.x_arr, const.SMOOTHING_WIDTH_POS) ftr.y_arr = symmetric_exponential_moving_average(ftr.y_arr, const.SMOOTHING_WIDTH_POS) ftr = filter_trajectory(ftr) trajdata = copy(trajdata, ftr) # print("finish copy") return trajdata
[docs]def load_ngsim_trajdata(filepath: str, autofilter: bool = True): ''' :param filepath: the path of the raw trajectory data file :param autofilter: indicates do the filter or not :return: the filtered(or not) data class ''' print("loading from file: ") tdraw = ngsim_trajdata.NGSIMTrajdata(filepath) if autofilter and os.path.splitext(filepath)[1] == ".txt": print("filtering: ") for carid in tqdm(ngsim_trajdata.carid_set(tdraw)): # print(carid) tdraw = filter_given_trajectory(tdraw, carid) return tdraw
[docs]def convert(tdraw: ngsim_trajdata.NGSIMTrajdata, roadway: roadway.Roadway): ''' :param tdraw: trajectory raw data :param roadway: roadway class :return: ListRecord(), a preprocessed and integrated version of tdraw and roadway ''' df = tdraw.df vehdefs = {} states = [] frames = [] print("convert: Vehicle definition") for id, dfind in tdraw.car2start.items(): vehdefs[id] = Vehicle.VehicleDef(df.at[dfind, 'class'], df.at[dfind, 'length'] * const.METERS_PER_FOOT, df.at[dfind, 'width'] * const.METERS_PER_FOOT) state_ind = 0 print("convert: frames and states") prev_x = {} prev_y = {} for frame in tqdm(range(1, tdraw.nframes + 1)): # change from 1 to 0 frame_lo = state_ind + 1 # print("frame: {}".format(frame)) for id in ngsim_trajdata.carsinframe(tdraw, frame): # print("id: {}".format(id)) if id not in prev_x: prev_x[id] = 0 prev_y[id] = 0 dfind = ngsim_trajdata.car_df_index(tdraw, id, frame) assert dfind != -1 theta = math.atan2(df.at[dfind, 'global_y'] - prev_y[id], df.at[dfind, 'global_x'] - prev_x[id]) posG = VecSE2.VecSE2(df.at[dfind, 'global_x'] * const.METERS_PER_FOOT, df.at[dfind, 'global_y'] * const.METERS_PER_FOOT, theta) prev_x[id] = df.at[dfind, 'global_x'] prev_y[id] = df.at[dfind, 'global_y'] # print(df.at[dfind, 'global_heading']) speed = df.at[dfind, 'speed'] * const.METERS_PER_FOOT state_ind += 1 # print(state_ind) state = Vehicle.VehicleState() state.set(posG, roadway, speed) states.append(record.RecordState(state, id)) frame_hi = state_ind frames.append(record.RecordFrame(frame_lo, frame_hi)) return record.ListRecord(const.NGSIM_TIMESTEP, frames, states, vehdefs)
[docs]def get_corresponding_roadway(filename: str): ''' :param filename: the path of the saved roadway file :return: the roadway class object ''' retval = None if "i101" in filename: with open(os.path.join(const.DIR, "../data/ngsim_80.txt"), "r") as fp_80: retval = roadway.read_roadway(fp_80) fp_80.close() elif "i80" in filename: with open(os.path.join(const.DIR, "../data/ngsim_101.txt"), "r") as fp_101: retval = roadway.read_roadway(fp_101) fp_101.close() elif "holo" in filename: with open(os.path.join(const.DIR, "../data/ngsim_HOLO.txt"), "r") as fp_holo: retval = roadway.read_roadway(fp_holo) fp_holo.close() else: raise ValueError("no such roadway file, check your file name") return retval
[docs]def convert_raw_ngsim_to_trajdatas(): ''' convert the raw trajectory data and roadway to a integrated version and save to a file :return: no return ''' for filepath in const.NGSIM_TRAJDATA_PATHS: filename = os.path.split(filepath)[1] print("converting " + filename) roadway = get_corresponding_roadway(filename) print("finish loading roadway.") print("Start loading NGSIM trajectory data.") tdraw = load_ngsim_trajdata(filepath) print("finish loading NGSIM trajectory data.") print("Start converting.") # no problems until here trajdata = convert(tdraw, roadway) print("finish converting") outpath = os.path.join(const.DIR, "../data/trajdata_" + filename) print("save to {}".format(outpath)) with open(outpath, "w") as fp: trajdata.write(fp) fp.close() print("Finish saving the file")
[docs]def load_trajdata(filepath: str): ''' load trajdata from the file given the file path :param filepath: file path :return: ListRecord object ''' with open(filepath, "r") as fp: td = read_trajdata(fp) return td