Source code for OpenRacer.Recorder

import datetime
import os
import sqlite3


[docs]class Recorder: createInputTable = """ CREATE TABLE if not exists step( timestamp DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, all_wheels_on_track BOOLEAN CHECK(all_wheels_on_track IN(0, 1)), x float, y float, closest_waypoint1 int, closest_waypoint2 int, distance_from_center float, is_crashed BOOLEAN CHECK(is_crashed IN(0, 1)), is_left_of_center BOOLEAN CHECK(is_left_of_center IN(0, 1)), is_reversed BOOLEAN CHECK(is_reversed IN(0, 1)), progress float, speed float, steering_angle float, steps int, track_length float, track_width float, actionX float, actionY float, reward float, agentId int, session int);""" createDetailsTable=""" CREATE TABLE if not exists details( sessionCount int, agentCount int, trackName string, sessionTime int ); """ createRaceTable=""" CREATE TABLE if not exists race( timestamp DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, all_wheels_on_track BOOLEAN CHECK(all_wheels_on_track IN(0, 1)), x float, y float, closest_waypoint1 int, closest_waypoint2 int, distance_from_center float, is_crashed BOOLEAN CHECK(is_crashed IN(0, 1)), is_left_of_center BOOLEAN CHECK(is_left_of_center IN(0, 1)), is_reversed BOOLEAN CHECK(is_reversed IN(0, 1)), progress float, speed float, steering_angle float, steps int, track_length float, track_width float, actionX float, actionY float, reward float, agentId int, lap int); """ def __init__(self, modelName:str): """setup recorder for a mode and start connection for same. Args: modelName (str): Name of model, this will be used for directory name and filename """ debug = True USE_TEST_RECODER = os.getenv("USE_TEST_RECORDER") if not USE_TEST_RECODER or USE_TEST_RECODER == "False": debug = False if debug: dbName = "test.db" else: dbName = f"{modelName}.db" dirPath = os.path.join(os.getcwd(), modelName, "db") dbPath = os.path.join(dirPath, dbName) os.makedirs(dirPath, exist_ok=True) print(f"DB path for model: {modelName} is {dbPath}") self.con = sqlite3.connect(dbPath, check_same_thread=False, isolation_level=None) self.con.execute('pragma journal_mode=wal') print(f"Initialized a recorder: {dbName}") self.cur = self.con.cursor() self.ReadCur = self.con.cursor() if debug: return print("Creating table") self.cur.execute(self.createInputTable) self.cur.execute(self.createDetailsTable) self.cur.execute(self.createRaceTable)
[docs] def recordStep(self, formatedInput, action, reward, session): data = [] for agentId in range(len(formatedInput)): values = [datetime.datetime.now()] for i in formatedInput[agentId].values(): d = i if type(d) == bool: d = 1 if d else 0 elif type(d) == list: values.append(d[0]) d = d[1] values.append(d) values.append(action[agentId][0]) values.append(action[agentId][1]) values.append(reward[agentId]) values.append(agentId) values.append(session) data.append(values) self.cur.executemany("INSERT INTO step VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", data) self.con.commit()
[docs] def recordRaceStep(self, formatedInput, action, reward, session): data = [] for agentId in range(len(formatedInput)): values = [datetime.datetime.now()] for i in formatedInput[agentId].values(): d = i if type(d) == bool: d = 1 if d else 0 elif type(d) == list: values.append(d[0]) d = d[1] values.append(d) values.append(action[agentId][0]) values.append(action[agentId][1]) values.append(reward[agentId]) values.append(agentId) values.append(session) data.append(values) self.cur.executemany("INSERT INTO race VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", data) self.con.commit()
[docs] def details(self, sessionCount, agentCount, trackName, sessionTime): self.cur.execute("INSERT INTO details VALUES(?,?,?,?)", [sessionCount, agentCount, trackName, sessionTime]) self.con.commit()
[docs] def getRecords(self, agentId:int, session:int): res = self.ReadCur.execute("SELECT * FROM step WHERE agentId=? and session=?", [agentId, session]) return res.fetchall()
[docs] def getSessionRun(self, session:int): res = self.ReadCur.execute("SELECT * FROM step WHERE session=?", [session]) return res.fetchall()
[docs] def getAgentRun(self, agentId:int): res = self.ReadCur("SELECT * FROM step WHERE agentId=?", [agentId]) return res.fetchall()
[docs] def getProgress(self): res = self.ReadCur.execute("SELECT max(progress), session FROM step GROUP BY session") return res.fetchall()
[docs] def getDetailsOf(self,attribute:str, agent:int, session:int): readCur = self.con.cursor() if attribute == "progress": res = readCur.execute("SELECT progress, timestamp from step where session=? and agentId=?", [session, agent]) elif attribute == "reward": res = readCur.execute("SELECT reward, timestamp from step where session=? and agentId=?", [session, agent]) elif attribute == "speed": res = readCur.execute("SELECT speed, timestamp from step where session=? and agentId=?", [session, agent]) return res.fetchall()
[docs] def runDetails(self): readCur = self.con.cursor() res = readCur.execute("SELECT * FROM details order by rowid desc LIMIT 1") return res.fetchone()
# TODO: Seperate tables based on data to make it more space efficient
[docs] def raceDetails(self): readCur = self.con.cursor() res = readCur.execute("SELECT progress, reward, speed, lap, timestamp from race") return res.fetchall()