import json
import ast
from typing import List
from fastapi import APIRouter, WebSocket
from fastapi.responses import FileResponse
import os
import numpy as np
from OpenRacer.Constants import COMMAND, ACK
from OpenRacer.Model import ModelInterface
[docs]class Routes:
def __init__(self, model:ModelInterface):
self.model = model
self.recorder = model.recorder
self.dashboard = APIRouter(prefix="", tags=["UI"])
self.dashboard.add_route("/", self.ui, methods=["GET"])
self.communicationRoutes = APIRouter(prefix="/api/v1", tags=["Communication"])
self.communicationRoutes.add_api_route("/", self.hello, methods=["GET"])
self.communicationRoutes.add_api_websocket_route("/ws", self.websocket_endpoint)
# TODO: make a new router for data retrival
self.communicationRoutes.add_api_route("/getRecords/{agentId}/{session}", self.getRecords, methods=["GET"])
self.communicationRoutes.add_api_route("/getSessionRun/{session}", self.getSessionRun, methods=["GET"])
self.communicationRoutes.add_api_route("/getAgentRun/{agentId}", self.getAgentRun, methods=["GET"])
self.communicationRoutes.add_api_route("/getProgressChartData", self.getProgressChartData, methods=["GET"])
self.communicationRoutes.add_api_route("/getChartData/{attribute}/{agentId}/{session}", self.getChartData, methods=["GET"])
self.communicationRoutes.add_api_route("/getRunDetails", self.getRunDetails, methods=["GET"])
self.communicationRoutes.add_api_route("/getRaceDetails", self.getRaceDetails, methods=["GET"])
[docs] def ui(self, _):
""" Sends build index.html from react. """
return FileResponse(os.path.join(os.path.dirname(__file__), "frontend", "index.html"))
[docs] def hello(self):
"""Respond with hello. could be use for testing"""
return {"data": "hello"}
[docs] async def websocket_endpoint(self, websocket: WebSocket):
"""This is used for communicating with Unity APP using websockets.
Args:
websocket (WebSocket): Websocket client to recieve and send messages
"""
await websocket.accept()
while True:
signal = await websocket.receive_text()
res = await self.checkCommand(signal)
await websocket.send_text(json.dumps(res))
[docs] def getCommand(self, signal:str) -> List[str]:
"""Seperate the signal into command and value part
Args:
signal (str): It should always be in format of command~value.
Returns:
[command:str, value:ste]: returns the command and value.
"""
assert(len(signal.split("~")) == 2)
return signal.split("~")
[docs] async def checkCommand(self, signal:str):
"""Check message recieved from unity and process to send response
Args:
signal (str): it will be string receivied from unity. Structure will be "command~value".
Returns:
str|dict|list: based on request different types of responses are generated
"""
command, value = self.getCommand(signal)
if command == COMMAND.Track:
track_name = value
filePath = os.path.join(os.getcwd(), f"{track_name}.npy")
_track = None
if not os.path.isfile(filePath):
print(f"file not find at: {filePath}")
_track = np.load(os.path.join(os.path.dirname(__file__), "albert.npy"))
else:
_track = np.load(filePath)
trackVert = [{"x":point[0], "y": 0, "z":point[1]} for point in _track[:-1]]
track = trackVert
return {"track": trackVert}
elif command == COMMAND.TrackAck:
track_coords_string = value
track = ast.literal_eval(track_coords_string)
self.model.setTrack(list(track))
return ACK
elif command == COMMAND.Epoch:
print(f"Completed Epoch{value}")
self.model.sessionEnd(int(value))
return ACK
elif command == COMMAND.Eval:
output = self.model.eval(value, isTraining=True)
return output
elif command == COMMAND.End:
print("Training Ended")
return ACK
elif command == COMMAND.Details:
print(value)
details = json.loads(value)
self.recorder.details(details["epoch"], details["batchSize"], details["trackName"], details["sessionTime"])
return ACK
elif command == COMMAND.Test:
output = self.model.eval(value, isTraining=False)
return output
elif command == COMMAND.Lap:
return ACK
[docs] def getRecords(self, agentId:int, session:int):
return self.recorder.getRecords(agentId, session)
[docs] def getSessionRun(self, session:int):
return self.recorder.getSessionRun(session)
[docs] def getAgentRun(self, agentId:int):
return self.recorder.getAgentRun(agentId)
[docs] def getProgressChartData(self):
return self.recorder.getProgress()
[docs] def getChartData(self, attribute:str, agentId:int, session:int):
return self.recorder.getDetailsOf(attribute, agentId, session)
[docs] def getRunDetails(self):
return self.recorder.runDetails()
[docs] def getRaceDetails(self):
return self.recorder.raceDetails()