Source code for src.Record.record

from src.Basic import Vehicle
from src.Record.frame import Frame, copyto
from src.Basic.Vehicle import read_def, read_state


[docs]class RecordFrame: def __init__(self, lo: int, hi: int): ''' :param lo: low index :param hi: high index ''' self.lo = lo # low index self.hi = hi # high index def __len__(self): ''' :return: the length of the record ''' return self.hi - self.lo + 1
[docs] def write(self, fp): ''' :param fp: the file pointer to write :return: write to file, no return ''' fp.write("%d %d" % (self.lo, self.hi))
[docs]def read_frame(fp): ''' :param fp: the frame pointer to read :return: a RecordFrame object ''' tokens = fp.readline().strip().split(' ') lo = int(tokens[0]) hi = int(tokens[1]) return RecordFrame(lo, hi)
[docs]class RecordState: ''' used to record the information of a state ''' def __init__(self, state: Vehicle.VehicleState, id: int): ''' :param state: Vehicle State :param id: id of the state ''' self.state = state # vehicle state object self.id = id # corresponding vehicle id
[docs]class ListRecord: def __init__(self, timestep: float, frames: list, states: list, defs: dict): ''' timestep::Float64 frames::List of record frame, indicating the index range states::list of record state indicating the vehicle defs::Dict{I, D} ''' self.timestep = timestep self.frames = frames self.states = states self.defs = defs
[docs] def write(self, fp): ''' :param fp: the frame pointer to write :return: write file, no return ''' fp.write("ListRecord{%s, %s, %s}(%d frames)\n" % ('NGSIM_TIMESTEP', 'Array{RecordFrame}', 'Array{RecordState{VehicleState, Int}}', len(self.frames))) fp.write("%.16e\n" % self.timestep) # defs fp.write(str(len(self.defs))) fp.write("\n") for id in self.defs: fp.write(str(id)) fp.write("\n") self.defs[id].write(fp) fp.write("\n") # ids & states fp.write(str(len(self.states))) fp.write("\n") for recstate in self.states: fp.write(str(recstate.id)) fp.write("\n") recstate.state.write(fp) fp.write("\n") # frames fp.write(str(len(self.frames))) fp.write("\n") for recframe in self.frames: recframe.write(fp) fp.write("\n")
[docs] def n_objects_in_frame(self, frame_index: int): ''' :param frame_index: the index of the frame :return: the length of the frame given the index ''' return len(self.frames[frame_index])
@property def nframes(self): ''' :return: the number of all the frames ''' return len(self.frames) @property def nstates(self): ''' :return: the number of all the states ''' return len(self.states) @property def nids(self): ''' :return: the number of all the vehicles ''' return len(self.defs.keys())
[docs]def read_trajdata(fp): ''' :param fp: the frame pointer to read :return: a ListRecord object ''' lines = fp.readline() # skip first line # lines = fp.readline() # skip second line timestep = float(fp.readline()) defs = dict() # read defs n = int(fp.readline()) for i in range(n): id = int(fp.readline()) # TODO: check if need parse /n defs[id] = read_def(fp) # read states n = int(fp.readline()) states = [None for i in range(n)] for i in range(n): id = int(fp.readline()) state = read_state(fp) states[i] = RecordState(state, id) # read frames n = int(fp.readline()) frames = [None for i in range(n)] for i in range(n): frames[i] = read_frame(fp) return ListRecord(timestep, frames, states, defs)
[docs]class SceneRecord: def __init__(self): ''' frames::List{Frame{Vehicle}} timestep::Float64 nframes::Int # number of active Frames ''' self.frames = [] self.timestep = 0 self.nframes = 0 def __getitem__(self, item): ''' :param item: the number of frames before the frame you want to retrieve :return: the frame you want ''' return self.frames[0-item]
[docs] def init(self, capacity: int, timestep: float, frame_capacity: int = 100): ''' Initializing the SceneRecord object :param capacity: the capacity of the object :param timestep: timestep indicator :param frame_capacity: the capacity of a single frame :return: ''' frames = [] for i in range(capacity): frame = Frame() frame.init(frame_capacity) frames.append(frame) self.frames = frames self.timestep = timestep self.nframes = 0
@property def capacity(self): ''' :return: capacity of the SceneRecord object ''' return len(self.frames)
[docs] def empty(self): ''' :return: empty the SceneRecord Stack ''' self.nframes = 0
[docs] def insert(self, frame: Frame, pastframe: int=0): ''' :param frame: the frame to be inserted :param pastframe: <=0, -n represent past n frame :return: no return ''' self.frames[0 - pastframe] = copyto(self.frames[0 - pastframe], frame)
[docs] def push_back_records(self): ''' push back frames from tail to head :return: no return ''' for i in range(min(self.nframes + 1, self.capacity) - 1, 0, -1): self.frames[i] = copyto(self.frames[i], self.frames[i - 1])
[docs] def update(self, frame: Frame): ''' update the first frame :param frame: the frame to be updated :return: no return ''' self.push_back_records() self.insert(frame, 0) self.nframes = min(self.nframes + 1, self.capacity)
[docs]def frame_inbounds(rec: ListRecord, frame_index: int): ''' check if the index is in bound :param rec: ListRecord :param frame_index: index of the frame to be checked :return: false if out of bound true otherwise ''' return 0 <= frame_index < rec.nframes
[docs]def pastframe_inbounds(rec: SceneRecord, pastframe: int): ''' check if the query pastframe is out of bound :param rec: SceneRecord :param pastframe: the query pastframe number :return: false if out of bound true otherwise ''' return 0 <= 0 - pastframe <= rec.nframes - 1
[docs]def get_elapsed_time_3(rec: SceneRecord, pastframe_farthest_back: int, pastframe_most_recent: int): return (pastframe_most_recent - pastframe_farthest_back)*rec.timestep
[docs]def get_def(rec: ListRecord, id: int): ''' get the vehicle definition given a query id :param rec: ListRecord object :param id: query id :return: the vehicle definition for the query id ''' return rec.defs[id]
[docs]def get_vehicle(rec: ListRecord, stateindex: int): ''' get vehicle state given a state index :param rec: ListRecord object :param stateindex: query state index :return: the vehicle state for the query state index ''' recstate = rec.states[stateindex] # print(recstate.id) return Vehicle.Vehicle(recstate.state, get_def(rec, recstate.id), recstate.id)
[docs]def get_scene(frame: Frame, rec: ListRecord, frame_index: int): ''' get the scene given a query frame index :param frame: the buffer for the frame :param rec: the ListRecord object :param frame_index: the frame index :return: the frame for the query index ''' frame.empty() frame_index -= 1 # different index for julia and python if frame_inbounds(rec, frame_index): recframe = rec.frames[frame_index] # print(recframe.lo, recframe.hi) for stateindex in range(recframe.lo - 1, recframe.hi): frame.push(get_vehicle(rec, stateindex)) return frame