from src.Basic import Vehicle
from src.Record.frame import Frame, copyto
from src.Basic.Vehicle import read_def, read_state
[docs]class RecordFrame:
def __init__(self, lo: int, hi: int):
'''
:param lo: low index
:param hi: high index
'''
self.lo = lo # low index
self.hi = hi # high index
def __len__(self):
'''
:return: the length of the record
'''
return self.hi - self.lo + 1
[docs] def write(self, fp):
'''
:param fp: the file pointer to write
:return: write to file, no return
'''
fp.write("%d %d" % (self.lo, self.hi))
[docs]def read_frame(fp):
'''
:param fp: the frame pointer to read
:return: a RecordFrame object
'''
tokens = fp.readline().strip().split(' ')
lo = int(tokens[0])
hi = int(tokens[1])
return RecordFrame(lo, hi)
[docs]class RecordState:
'''
used to record the information of a state
'''
def __init__(self, state: Vehicle.VehicleState, id: int):
'''
:param state: Vehicle State
:param id: id of the state
'''
self.state = state # vehicle state object
self.id = id # corresponding vehicle id
[docs]class ListRecord:
def __init__(self, timestep: float, frames: list, states: list, defs: dict):
'''
timestep::Float64
frames::List of record frame, indicating the index range
states::list of record state indicating the vehicle
defs::Dict{I, D}
'''
self.timestep = timestep
self.frames = frames
self.states = states
self.defs = defs
[docs] def write(self, fp):
'''
:param fp: the frame pointer to write
:return: write file, no return
'''
fp.write("ListRecord{%s, %s, %s}(%d frames)\n" % ('NGSIM_TIMESTEP', 'Array{RecordFrame}', 'Array{RecordState{VehicleState, Int}}', len(self.frames)))
fp.write("%.16e\n" % self.timestep)
# defs
fp.write(str(len(self.defs)))
fp.write("\n")
for id in self.defs:
fp.write(str(id))
fp.write("\n")
self.defs[id].write(fp)
fp.write("\n")
# ids & states
fp.write(str(len(self.states)))
fp.write("\n")
for recstate in self.states:
fp.write(str(recstate.id))
fp.write("\n")
recstate.state.write(fp)
fp.write("\n")
# frames
fp.write(str(len(self.frames)))
fp.write("\n")
for recframe in self.frames:
recframe.write(fp)
fp.write("\n")
[docs] def n_objects_in_frame(self, frame_index: int):
'''
:param frame_index: the index of the frame
:return: the length of the frame given the index
'''
return len(self.frames[frame_index])
@property
def nframes(self):
'''
:return: the number of all the frames
'''
return len(self.frames)
@property
def nstates(self):
'''
:return: the number of all the states
'''
return len(self.states)
@property
def nids(self):
'''
:return: the number of all the vehicles
'''
return len(self.defs.keys())
[docs]def read_trajdata(fp):
'''
:param fp: the frame pointer to read
:return: a ListRecord object
'''
lines = fp.readline() # skip first line
# lines = fp.readline() # skip second line
timestep = float(fp.readline())
defs = dict()
# read defs
n = int(fp.readline())
for i in range(n):
id = int(fp.readline())
# TODO: check if need parse /n
defs[id] = read_def(fp)
# read states
n = int(fp.readline())
states = [None for i in range(n)]
for i in range(n):
id = int(fp.readline())
state = read_state(fp)
states[i] = RecordState(state, id)
# read frames
n = int(fp.readline())
frames = [None for i in range(n)]
for i in range(n):
frames[i] = read_frame(fp)
return ListRecord(timestep, frames, states, defs)
[docs]class SceneRecord:
def __init__(self):
'''
frames::List{Frame{Vehicle}}
timestep::Float64
nframes::Int # number of active Frames
'''
self.frames = []
self.timestep = 0
self.nframes = 0
def __getitem__(self, item):
'''
:param item: the number of frames before the frame you want to retrieve
:return: the frame you want
'''
return self.frames[0-item]
[docs] def init(self, capacity: int, timestep: float, frame_capacity: int = 100):
'''
Initializing the SceneRecord object
:param capacity: the capacity of the object
:param timestep: timestep indicator
:param frame_capacity: the capacity of a single frame
:return:
'''
frames = []
for i in range(capacity):
frame = Frame()
frame.init(frame_capacity)
frames.append(frame)
self.frames = frames
self.timestep = timestep
self.nframes = 0
@property
def capacity(self):
'''
:return: capacity of the SceneRecord object
'''
return len(self.frames)
[docs] def empty(self):
'''
:return: empty the SceneRecord Stack
'''
self.nframes = 0
[docs] def insert(self, frame: Frame, pastframe: int=0):
'''
:param frame: the frame to be inserted
:param pastframe: <=0, -n represent past n frame
:return: no return
'''
self.frames[0 - pastframe] = copyto(self.frames[0 - pastframe], frame)
[docs] def push_back_records(self):
'''
push back frames from tail to head
:return: no return
'''
for i in range(min(self.nframes + 1, self.capacity) - 1, 0, -1):
self.frames[i] = copyto(self.frames[i], self.frames[i - 1])
[docs] def update(self, frame: Frame):
'''
update the first frame
:param frame: the frame to be updated
:return: no return
'''
self.push_back_records()
self.insert(frame, 0)
self.nframes = min(self.nframes + 1, self.capacity)
[docs]def frame_inbounds(rec: ListRecord, frame_index: int):
'''
check if the index is in bound
:param rec: ListRecord
:param frame_index: index of the frame to be checked
:return: false if out of bound true otherwise
'''
return 0 <= frame_index < rec.nframes
[docs]def pastframe_inbounds(rec: SceneRecord, pastframe: int):
'''
check if the query pastframe is out of bound
:param rec: SceneRecord
:param pastframe: the query pastframe number
:return: false if out of bound true otherwise
'''
return 0 <= 0 - pastframe <= rec.nframes - 1
[docs]def get_elapsed_time_3(rec: SceneRecord, pastframe_farthest_back: int, pastframe_most_recent: int):
return (pastframe_most_recent - pastframe_farthest_back)*rec.timestep
[docs]def get_def(rec: ListRecord, id: int):
'''
get the vehicle definition given a query id
:param rec: ListRecord object
:param id: query id
:return: the vehicle definition for the query id
'''
return rec.defs[id]
[docs]def get_vehicle(rec: ListRecord, stateindex: int):
'''
get vehicle state given a state index
:param rec: ListRecord object
:param stateindex: query state index
:return: the vehicle state for the query state index
'''
recstate = rec.states[stateindex]
# print(recstate.id)
return Vehicle.Vehicle(recstate.state, get_def(rec, recstate.id), recstate.id)
[docs]def get_scene(frame: Frame, rec: ListRecord, frame_index: int):
'''
get the scene given a query frame index
:param frame: the buffer for the frame
:param rec: the ListRecord object
:param frame_index: the frame index
:return: the frame for the query index
'''
frame.empty()
frame_index -= 1 # different index for julia and python
if frame_inbounds(rec, frame_index):
recframe = rec.frames[frame_index]
# print(recframe.lo, recframe.hi)
for stateindex in range(recframe.lo - 1, recframe.hi):
frame.push(get_vehicle(rec, stateindex))
return frame