robotengine.ho_robot
ho_robot 是 robotengine 控制 ho 机器人的节点。
ho_link 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。
如果配置了 url ho_link 节点会自动发送机器人的状态 HoState 到 url 指定的地址。
ho_link 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。
使用 ho_link.update() 函数可以向机器人发送数据。
挂载 ho_link 节点后,_process()的处理速度会显著受到影响,请酌情调整 engine 的运行频率。
1""" 2 3ho_robot 是 robotengine 控制 ho 机器人的节点。 4 5ho_link 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。 6 7如果配置了 url ho_link 节点会自动发送机器人的状态 HoState 到 url 指定的地址。 8 9ho_link 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。 10 11使用 ho_link.update() 函数可以向机器人发送数据。 12 13挂载 ho_link 节点后,_process()的处理速度会显著受到影响,请酌情调整 engine 的运行频率。 14""" 15 16from robotengine.node import Node 17from robotengine.serial_io import SerialIO, DeviceType, CheckSumType 18from robotengine.tools import hex2str, warning, error, info, near, vector_angle, vector_length, compute_vector_projection, find_closest_vectors 19from robotengine.signal import Signal 20from robotengine.timer import Timer 21from typing import List, Tuple 22from enum import Enum 23import requests 24import threading 25import time 26import random 27import multiprocessing 28import tkinter as tk 29from ttkbootstrap import ttk 30import ttkbootstrap as ttkb 31from fastapi import FastAPI, Request 32import uvicorn 33from urllib.parse import urlparse 34import copy 35import math 36import json 37import os 38 39class HoMode(Enum): 40 """ Ho 电机模态 """ 41 S = 0 42 """ 停止 """ 43 I = 1 44 """ 电流控制 """ 45 V = 2 46 """ 速度控制 """ 47 P = 3 48 """ 位置控制 """ 49 50class AlignState: 51 """ 帧和时间戳对齐的状态数据 """ 52 def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None: 53 """ 54 初始化对齐状态数据 55 56 :param id: 电机 id 57 :param i: 电流 58 :param v: 速度 59 :param p: 位置 60 :param frame: 当前帧 61 :param timestamp: 当前时间戳 62 """ 63 self.id = id 64 """ 电机 id """ 65 self.i: float = i 66 """ 电流 """ 67 self.v: float = v 68 """ 速度 """ 69 self.p: float = p 70 """ 位置 """ 71 self.frame = frame 72 """ 此状态数据对应的帧 """ 73 self.timestamp = timestamp 74 """ 此状态数据对应的时间戳 """ 75 76 def to_dict(self): 77 """ 转换为字典 """ 78 return { 79 "id": self.id, 80 "i": self.i, 81 "v": self.v, 82 "p": self.p, 83 "frame": self.frame, 84 "timestamp": self.timestamp 85 } 86 87 def __repr__(self): 88 return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})" 89 90class HoState: 91 """ Ho 机器人状态 """ 92 def __init__(self, states: List[AlignState], random_state=False) -> None: 93 """ 94 初始化 Ho 机器人状态 95 96 :param states: 帧和时间戳对齐的状态数据列表 97 :param random_state: 是否随机生成状态数据 98 """ 99 if not random_state: 100 self._states = states 101 else: 102 self._states = [] 103 for i in range(1, 9): 104 self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0)) 105 106 def get_state(self, id: int) -> AlignState: 107 """ 108 获取指定 id 的状态 109 """ 110 for state in self._states: 111 if state.id == id: 112 return state 113 return None 114 115 def get_states(self) -> List[AlignState]: 116 """ 117 获取所有状态 118 """ 119 return copy.deepcopy(self._states) 120 121 def to_dict(self): 122 """ 123 转换为字典 124 """ 125 return { 126 "states": [state.to_dict() for state in self._states] 127 } 128 129 def __repr__(self): 130 state_str = "" 131 for state in self._states: 132 state_str += str(state) 133 if state != self._states[-1]: 134 state_str += "\n" 135 return f"HoState(\n{state_str})" 136 137class HoLink(Node): 138 """ Ho 机器人链接节点 """ 139 def __init__(self, name="HoLink", buffer_capacity: int=1024, url=None, warn=True) -> None: 140 """ 141 初始化 Ho 机器人链接节点 142 143 :param name: 节点名称 144 :param buffer_capacity: 存储状态数据的缓冲区的容量 145 :param url: 数据发送的 url 146 :param read_mode: 串口读取模式 147 :param warn: 是否显示警告 148 """ 149 super().__init__(name) 150 self._data_length = 84 151 self._receive_data = None 152 self._url = url 153 self._warn = warn 154 155 if self._url: 156 self._shutdown = multiprocessing.Event() 157 self._pending_capacity = 256 158 self._pending_requests = multiprocessing.Queue() 159 self._http_process = multiprocessing.Process(target=self._http_request, daemon=True, name=self.name+"HttpProcess") 160 self._http_process.start() 161 162 self._buffer_capacity: int = buffer_capacity 163 """ 存储状态数据的缓冲区的容量 """ 164 self._state_buffer: List[HoState] = [] 165 """ 存储状态数据的缓冲区 """ 166 167 self.sio: SerialIO = SerialIO(name="HoSerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=warn, baudrate=1000000, timeout=1.0) 168 """ 串口节点 HoLink 会主动挂载一个已经配置好的串口节点 """ 169 self.add_child(self.sio) 170 171 self.receive: Signal = Signal(bytes) 172 """ 信号,当接收到数据时触发(无论是否通过校验和) """ 173 self.ho_state_update: Signal = Signal(HoState) 174 """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """ 175 176 def _ready(self) -> None: 177 pass 178 179 def get_ho_state(self) -> HoState: 180 """ 181 获取机器人当前最新的状态数据 182 """ 183 if len(self._state_buffer) == 0: 184 return None 185 return self._state_buffer[-1] 186 187 def get_ho_state_buffer(self) -> List[HoState]: 188 """ 189 获取机器人当前的状态数据缓冲区 190 """ 191 return copy.deepcopy(self._state_buffer) 192 193 def _add_pending_request(self, ho_state: HoState): 194 """ 195 向请求队列中添加请求 196 """ 197 self._pending_requests.put(ho_state) 198 if self._pending_requests.qsize() > self._pending_capacity: 199 if self._warn: 200 warning(f"{self.name} 向 {self._url} 发送请求时,请求队列已满,将丢弃最早的请求,可能会导致数据丢失") 201 self._pending_requests.get() 202 203 def _send_request(self, ho_state_dict: dict) -> None: 204 start_time = time.perf_counter() 205 try: 206 response = requests.post(self._url, json=ho_state_dict, timeout=0.1) 207 208 end_time = time.perf_counter() 209 latency = end_time - start_time 210 # print(f"Request latency: {round(latency * 1000, 2)} ms") 211 212 except requests.RequestException as e: 213 if self._warn: 214 warning(f"请求失败: {e}") 215 except Exception as e: 216 if self._warn: 217 warning(f"发生未知错误: {e}") 218 219 def _http_request(self): 220 info(f"{self.name} 已开启向服务地址 {self._url} 发送数据的功能") 221 while not self._shutdown.is_set(): 222 if not self._pending_requests.empty(): 223 ho_state = self._pending_requests.get() 224 self._send_request(ho_state.to_dict()) 225 226 def update(self, id: int, mode: HoMode, i: float, v: float, p: float) -> None: 227 """ 228 向机器人发送数据 229 """ 230 data = bytes([id]) + bytes([mode.value]) + self._encode(p, 100.0, 4) + \ 231 self._encode(v, 100.0, 4) + self._encode(i, 100.0, 2) 232 # print(f"发送数据: {hex2str(data)}") 233 self.sio.transmit(data) 234 235 def _process(self, delta) -> None: 236 self._receive_data = self.sio.receive(self._data_length) 237 if self._receive_data: 238 if self.sio.check_sum(self._receive_data): 239 states = [] 240 receive_data = self._receive_data[2:-2] 241 242 id = 1 243 for i in range(0, 80, 10): 244 _data = receive_data[i:i+10] 245 _p = self._decode(_data[0:4], 100.0, 4) 246 _v = self._decode(_data[4:8], 100.0, 4) 247 _i = self._decode(_data[8:10], 100.0, 2) 248 249 align_state = AlignState(id=id, i=_i, v=_v, p=_p, frame=self.engine.get_frame(), timestamp=self.engine.get_timestamp()) 250 states.append(align_state) 251 id += 1 252 253 ho_state = HoState(states) 254 self._state_buffer.append(ho_state) 255 256 if len(self._state_buffer) > self._buffer_capacity: 257 self._state_buffer.pop(0) 258 259 self.ho_state_update.emit(ho_state) 260 if self._url: 261 self._add_pending_request(ho_state) 262 else: 263 if self._warn: 264 warning(f"{self.name} 长度为 {len(self._receive_data)} 的数据 {hex2str(self._receive_data)} 校验和错误") 265 self.receive.emit(self._receive_data) 266 267 def _encode(self, value: float, scale_factor: float, byte_length: int) -> bytes: 268 max_value = (1 << (8 * byte_length - 1)) 269 max_scaled_value = max_value / scale_factor 270 271 if abs(value) >= max_scaled_value: 272 raise ValueError(f"要编码的值 {round(value, 2)} 超出范围 [-{max_scaled_value}, {max_scaled_value}]") 273 274 encoded_value = int(value * scale_factor) + max_value 275 276 max_value_for_length = (1 << (8 * byte_length)) - 1 277 if encoded_value > max_value_for_length: 278 raise ValueError(f"编码值 {encoded_value} 超出了 {byte_length} 字节的最大值 {max_value_for_length}") 279 280 byte_data = [] 281 for i in range(byte_length): 282 byte_data.insert(0, encoded_value & 0xFF) 283 encoded_value >>= 8 284 285 return bytes(byte_data) 286 287 def _decode(self, data: bytes, scale_factor: float, byte_length: int) -> float: 288 if len(data) != byte_length: 289 raise ValueError(f"数据长度 {len(data)} 与指定的字节长度 {byte_length} 不匹配") 290 max_value = (1 << (8 * byte_length - 1)) 291 292 decoded_value = 0 293 for i in range(byte_length): 294 decoded_value <<= 8 295 decoded_value |= data[i] 296 297 decoded_value -= max_value 298 299 return decoded_value / scale_factor 300 301 # def _on_engine_exit(self): 302 # if self._url: 303 # self._shutdown.set() 304 # self._http_process.join() 305 306 307class HoServer: 308 def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None: 309 """ 310 初始化 HoServer 实例。 311 312 :param url: 服务器的 URL。 313 :param capacity: 数据缓冲区的最大容量。 314 :param ui: 是否启用 UI 界面。 315 :param ui_frequency: UI 更新频率(Hz)。 316 """ 317 self._url = url 318 parsed_url = urlparse(url) 319 self._host = parsed_url.hostname 320 self._port = parsed_url.port 321 self._path = parsed_url.path 322 323 self._ui = ui 324 self._ui_frequency = ui_frequency 325 self._capacity = capacity 326 self._data_buffer = [] 327 """ 328 数据缓冲区 329 """ 330 331 self._data_queue = multiprocessing.Queue() 332 self._shutdown = multiprocessing.Event() 333 334 # 启动 FastAPI 应用进程 335 self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True) 336 337 def _update_data(self): 338 """ 339 从数据队列中读取数据并更新缓冲区。 340 """ 341 while not self._shutdown.is_set(): 342 if not self._data_queue.empty(): 343 ho_state = self._data_queue.get() 344 self._data_buffer.append(ho_state) 345 if len(self._data_buffer) > self._capacity: 346 self._data_buffer.pop(0) 347 348 def has_data(self): 349 """ 350 检查缓冲区中是否有数据。 351 352 :return: 如果缓冲区中有数据,则返回 True,否则返回 False。 353 """ 354 return len(self._data_buffer) > 0 355 356 def get_data(self) -> HoState: 357 """ 358 获取缓冲区中最新的数据。 359 360 :return: 缓冲区中最新的数据,如果缓冲区为空,则返回 None。 361 """ 362 if not self.has_data(): 363 return None 364 return self._data_buffer.pop(-1) 365 366 def get_data_buffer(self) -> List[HoState]: 367 """ 368 获取缓冲区。 369 370 注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失 371 372 :return: 缓冲区。 373 """ 374 return copy.deepcopy(self._data_buffer) 375 376 def length(self) -> int: 377 """ 378 获取缓冲区中的数据长度。 379 380 :return: 缓冲区中的数据长度。 381 """ 382 return len(self._data_buffer) 383 384 def _init_ui(self) -> None: 385 """ 386 初始化 UI。 387 """ 388 self.root = tk.Tk() 389 self.root.title("HoServer") 390 self.root.geometry("800x600") 391 392 def run(self) -> None: 393 """ 394 启动服务器并运行 UI 更新线程(如果启用 UI)。 395 """ 396 self._app_process.start() 397 398 # 数据更新线程 399 self._data_thread = threading.Thread(target=self._update_data, daemon=True) 400 self._data_thread.start() 401 402 if self._ui: 403 self._init_ui() 404 # UI 更新线程 405 self._ui_thread = threading.Thread(target=self._update_ui, daemon=True) 406 self._ui_thread.start() 407 408 self.root.mainloop() 409 410 def _run_app(self, path: str, host: str, port: int) -> None: 411 """ 412 启动 FastAPI 服务器并监听请求。 413 414 :param path: API 路径。 415 :param host: 服务器主机。 416 :param port: 服务器端口。 417 """ 418 app = FastAPI() 419 app.add_api_route(path, self._handle_data, methods=["POST"]) 420 421 uvicorn.run(app, host=host, port=port) 422 423 async def _handle_data(self, request: Request) -> dict: 424 """ 425 处理接收到的 POST 请求数据。 426 427 :param request: FastAPI 请求对象。 428 :return: 处理结果。 429 """ 430 json_data = await request.json() 431 states_data = json_data.get("states", []) 432 433 states = [] 434 for state_data in states_data: 435 state = AlignState( 436 id=state_data["id"], 437 i=state_data["i"], 438 v=state_data["v"], 439 p=state_data["p"], 440 frame=state_data["frame"], 441 timestamp=state_data["timestamp"] 442 ) 443 states.append(state) 444 445 ho_state = HoState(states=states) 446 self._data_queue.put(ho_state) 447 return {"message": "Data received"} 448 449 def _init_ui(self) -> None: 450 """ 451 初始化 UI 界面。 452 """ 453 self.root = ttkb.Window(themename="superhero", title="HoServer") 454 455 frame = ttk.Frame(self.root) 456 frame.pack(padx=10, pady=10) 457 458 columns = ['Id', 'Frame', 'Timestamp', 'i', 'v', 'p'] 459 self.entries = {} 460 461 # 创建表头 462 for col, column_name in enumerate(columns): 463 label = ttk.Label(frame, text=column_name, width=5) 464 label.grid(row=0, column=col, padx=5, pady=5) 465 466 # 创建数据输入框 467 for row in range(8): 468 id_label = ttk.Label(frame, text=f"{row + 1}", width=5) 469 id_label.grid(row=row + 1, column=0, padx=5, pady=5) 470 for col in range(5): 471 entry = ttk.Entry(frame, width=15, state='normal') 472 entry.grid(row=row + 1, column=col + 1, padx=5, pady=10) 473 self.entries[(row, col)] = entry 474 475 def _update_ui(self) -> None: 476 """ 477 根据数据缓冲区更新 UI 界面。 478 """ 479 def update() -> None: 480 if len(self._data_buffer) == 0: 481 return 482 ho_state = self._data_buffer[-1] 483 484 # 清空当前数据 485 for row in range(8): 486 for col in range(5): 487 self.entries[(row, col)].delete(0, tk.END) 488 489 # 更新数据 490 for row in range(8): 491 align_state = ho_state.get_state(row + 1) 492 self.entries[(row, 0)].insert(0, str(align_state.frame)) 493 self.entries[(row, 1)].insert(0, str(align_state.timestamp)) 494 self.entries[(row, 2)].insert(0, str(round(align_state.i, 2))) 495 self.entries[(row, 3)].insert(0, str(round(align_state.v, 2))) 496 self.entries[(row, 4)].insert(0, str(round(align_state.p, 2))) 497 498 time_interval = 1.0 / self._ui_frequency 499 while not self._shutdown.is_set(): 500 time.sleep(time_interval) 501 502 self.root.after(0, update) 503 504 505 def __del__(self) -> None: 506 """ 507 清理资源,停止线程和进程。 508 """ 509 self._shutdown.set() 510 self._app_process.join() 511 self._data_thread.join() 512 if self._ui: 513 self._ui_thread.join() 514 515 516class ManualState(Enum): 517 """ 手动状态枚举 """ 518 IDLE = 0 519 """ 空闲状态,所有电机停止运动 """ 520 RUNNING = 1 521 """ 运行状态 """ 522 RETURNING = 2 523 """ 返回状态 """ 524 ZEROING = 3 525 """ 设置零点状态 """ 526 527class HoManual(Node): 528 """ Ho 机器人的手动控制节点 """ 529 from robotengine import InputEvent 530 def __init__(self, link: HoLink, name="Manual", rotation_velocity: float = 360.0, running_scale: float=100.0, zeroing_scale: float=100.0, axis_threshold: float=0.1) -> None: 531 """ 532 初始化 HoManual 实例。 533 534 :param link: HoLink 实例。 535 :param name: 节点名称。 536 :param rotation_velocity: 旋转速度(度/秒)。 537 :param running_scale: 运行状态的缩放因子。 538 :param zeroing_scale: 设置零点状态的缩放因子。 539 :param axis_threshold: 轴的阈值。 540 """ 541 from robotengine import StateMachine 542 super().__init__(name) 543 self._debug = False 544 self._valid = True 545 546 self._link = link 547 self._link.ho_state_update.connect(self._on_ho_state_update) 548 549 self.state_machine = StateMachine(ManualState.IDLE, name="HoManualStateMachine") 550 self.add_child(self.state_machine) 551 552 self._zero_angles = { 553 4: 0.0, 554 5: 0.0, 555 6: 0.0, 556 7: 0.0, 557 } 558 self._zero_index = 4 559 560 self._is_tension = False 561 self._is_rotation = False 562 self._rotation_velocity = rotation_velocity 563 self._base_angles = [math.pi / 4, math.pi / 4 * 3, math.pi / 4 * 5, math.pi / 4 * 7] 564 565 self._running_scale = running_scale 566 self._zeroing_scale = zeroing_scale 567 self._axis_threshold = axis_threshold 568 569 self._before_returning = None 570 571 self.exit() 572 573 def _init(self): 574 _load_zero_angles = self._load_from_json("zero_angles.json") 575 if _load_zero_angles: 576 info("成功加载 zero_angles.json") 577 for i in range(4, 8): 578 self._zero_angles[i] = _load_zero_angles[str(i)] 579 info(f"{i}: {self._zero_angles[i]}") 580 581 def _input(self, event: InputEvent) -> None: 582 if not self._valid: 583 return 584 585 state = self.state_machine.current_state 586 587 if event.is_action_pressed("X"): 588 self.tension(not self._is_tension) 589 590 elif event.is_action_pressed("Y"): 591 self.rotation(not self._is_rotation, self._rotation_velocity) 592 593 if state == ManualState.ZEROING: 594 if event.is_action_pressed("LEFT_SHOULDER"): 595 self._change_index(-1) 596 597 elif event.is_action_pressed("RIGHT_SHOULDER"): 598 self._change_index(1) 599 600 elif event.is_action_pressed("A"): 601 if self._debug: 602 return 603 ho_state = self._link.get_ho_state() 604 if not ho_state: 605 return 606 for i in range(4, 8): 607 state = ho_state.get_state(i) 608 self._zero_angles[i] = state.p 609 self._save_to_json("zero_angles.json", self._zero_angles) 610 611 def _change_index(self, dir: int) -> None: 612 self.lock(self._zero_index) 613 self._zero_index += dir 614 if self._zero_index > 7: 615 self._zero_index = 4 616 elif self._zero_index < 4: 617 self._zero_index = 7 618 info(f" 当前电机: {self._zero_index}") 619 620 def _on_ho_state_update(self, ho_state: HoState): 621 if not self._valid: 622 return 623 624 state = self.state_machine.current_state 625 626 if state == ManualState.IDLE: 627 pass 628 629 elif state == ManualState.RUNNING: 630 x_value = self._threshold(self.input.get_action_strength("RIGHT_X")) 631 y_value = self._threshold(self.input.get_action_strength("LEFT_Y")) 632 self.turn(x_value, y_value, ho_state) 633 634 l_value = self._threshold(self.input.get_action_strength("TRIGGER_LEFT")) 635 r_value = self._threshold(self.input.get_action_strength("TRIGGER_RIGHT")) 636 self.move(l_value, r_value, ho_state) 637 638 elif state == ManualState.RETURNING: 639 for i in range(4, 8): 640 self._link.update(i, HoMode.P, 2.0, 100.0, self._zero_angles[i]) 641 642 elif state == ManualState.ZEROING: 643 direction = self.input.get_action_strength("LEFT_Y") 644 direction = self._threshold(direction) 645 velocity = direction * self._zeroing_scale 646 if not self._debug: 647 self._link.update(self._zero_index, HoMode.V, 2.0, velocity, 0.0) 648 649 def tick(self, state: ManualState, delta: float) -> None: 650 if state == ManualState.IDLE: 651 pass 652 653 elif state == ManualState.RUNNING: 654 pass 655 656 elif state == ManualState.RETURNING: 657 pass 658 659 elif state == ManualState.ZEROING: 660 pass 661 662 def _threshold(self, value: float) -> float: 663 if abs(value) < self._axis_threshold: 664 return 0.0 665 return value 666 667 def get_next_state(self, state: ManualState) -> ManualState: 668 if state == ManualState.IDLE: 669 if self.input.is_action_pressed("START"): 670 self.input.flush_action("START") 671 return ManualState.RUNNING 672 673 if self.input.is_action_pressed("B"): 674 self.input.flush_action("B") 675 return ManualState.RETURNING 676 677 elif self.input.is_action_pressed("RIGHT_STICK"): 678 self.input.flush_action("RIGHT_STICK") 679 return ManualState.ZEROING 680 681 elif state == ManualState.RUNNING: 682 if self.input.is_action_pressed("START"): 683 self.input.flush_action("START") 684 return ManualState.IDLE 685 686 if self.input.is_action_pressed("B"): 687 self.input.flush_action("B") 688 return ManualState.RETURNING 689 690 elif state == ManualState.RETURNING: 691 if self.input.is_action_pressed("B"): 692 self.input.flush_action("B") 693 return self._before_returning 694 695 elif state == ManualState.ZEROING: 696 if self.input.is_action_pressed("RIGHT_STICK"): 697 self.input.flush_action("RIGHT_STICK") 698 return ManualState.IDLE 699 700 return self.state_machine.KEEP_CURRENT 701 702 def transition_state(self, from_state: ManualState, to_state: ManualState) -> None: 703 print("") 704 info(f"{from_state if from_state is not None else 'START'} -> {to_state}") 705 info(f"TENSION: {self._is_tension}, ROTATION: {self._is_rotation}") 706 707 if from_state == ManualState.IDLE: 708 pass 709 710 elif from_state == ManualState.RUNNING: 711 pass 712 713 elif from_state == ManualState.RETURNING: 714 pass 715 716 elif from_state == ManualState.ZEROING: 717 pass 718 719 info(" Y: 开关旋转") 720 info(" X: 开关张紧") 721 if to_state == ManualState.IDLE: 722 for i in range(1, 9): 723 if i == 2 or i == 3: 724 continue 725 self.stop(i) 726 info(" START: 进入 RUNNING 状态") 727 info(" B: 进入 RETURNING 状态") 728 info(" RIGHT_STICK: 进入 ZEROING 状态") 729 730 elif to_state == ManualState.RUNNING: 731 for i in range(1, 9): 732 if i == 2 or i == 3: 733 continue 734 self.lock(i) 735 info(" START: 返回 IDLE 状态") 736 info(" B: 进入 RETURNING 状态") 737 738 elif to_state == ManualState.RETURNING: 739 self._before_returning = from_state 740 info(" B: 返回之前的状态") 741 742 elif to_state == ManualState.ZEROING: 743 for i in range(1, 9): 744 if i == 2 or i == 3: 745 continue 746 self.lock(i) 747 info(" RIGHT_STICK: 返回 IDLE 状态") 748 info(" LEFT_SHOULDER: 切换到上一个电机") 749 info(" RIGHT_SHOULDER: 切换到下一个电机") 750 info(" A: 保存当前位置为零点") 751 info(f" 当前电机: {self._zero_index}") 752 753 def lock(self, id: int) -> None: 754 """ 755 锁定指定的电机。 756 757 :param id: 电机编号 758 """ 759 if self._debug: 760 info(f"{self.name} 锁定电机 {id}") 761 return 762 self._link.update(id, HoMode.V, 2.0, 0.0, 0.0) 763 764 def lock_all(self) -> None: 765 """ 766 锁定所有的电机。 767 """ 768 for i in range(1, 9): 769 self.lock(i) 770 771 def stop(self, id: int) -> None: 772 """ 773 停止指定的电机。 774 775 :param id: 电机编号 776 """ 777 if self._debug: 778 info(f"{self.name} 停止电机 {id}") 779 return 780 self._link.update(id, HoMode.S, 0.0, 0.0, 0.0) 781 782 def stop_all(self) -> None: 783 """ 784 停止所有的电机。 785 """ 786 for i in range(1, 9): 787 self.stop(i) 788 789 def tension(self, on: bool, i: float=0.8) -> None: 790 """ 791 驱动牵引电机,张紧导管 792 793 :param on: 是否开启牵引 794 :param i: 牵引电流(A) 795 """ 796 self._is_tension = on 797 if on: 798 self._link.update(2, HoMode.V, i, -360.0, 0.0) 799 self._link.update(3, HoMode.V, i, 360.0, 0.0) 800 else: 801 self.stop(2) 802 self.stop(3) 803 804 def rotation(self, on: bool, velocity: float = 360.0) -> None: 805 """ 806 驱动旋转电机,旋转换能器 807 808 :param on: 是否开启旋转 809 :param velocity: 旋转速度(度/秒) 810 """ 811 self._is_rotation = on 812 if on: 813 self._link.update(8, HoMode.V, 2.0, velocity, 0.0) 814 else: 815 self.stop(8) 816 817 def turn(self, x_value: float, y_value: float, ho_state: HoState) -> None: 818 """ 819 驱动转向电机,转向导管 820 821 :param x_value: 横向控制值 822 :param y_value: 纵向控制值 823 :param ho_state: Ho 机器人状态 824 """ 825 if x_value == 0 and y_value == 0: 826 for i in range(4, 8): 827 self._link.update(i, HoMode.V, 2.0, 0.0, 0.0) 828 else: 829 projection = compute_vector_projection(x_value, y_value, self._base_angles) 830 control_values = [v * self._running_scale for v in projection] 831 832 for i in range(4, 8): 833 if control_values[i-4] > 0: 834 a_id = i 835 b_id = (i + 2) % 4 + 4 836 a_state = ho_state.get_state(a_id) 837 b_state = ho_state.get_state(b_id) 838 a_near = near(a_state.p, self._zero_angles[a_id]) 839 b_near = near(b_state.p, self._zero_angles[b_id]) 840 841 if a_near and not b_near: 842 self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id]) 843 elif (not a_near and b_near) or (a_near and b_near): 844 self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0) 845 elif not a_near and not b_near: 846 self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0) 847 self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id]) 848 849 def move(self, l_value: float, r_value: float, ho_state: HoState) -> None: 850 """ 851 驱动移动电机,移动导管 852 853 :param l_value: 左侧移动控制值 854 :param r_value: 右侧移动控制值 855 :param ho_state: Ho 机器人状态 856 """ 857 if l_value != 0 and r_value != 0: 858 self._link.update(1, HoMode.V, 2.0, 0.0, 0.0) 859 return 860 861 if l_value != 0: 862 self._link.update(1, HoMode.V, 2.0, -l_value * self._running_scale, 0.0) 863 return 864 865 if r_value != 0: 866 self._link.update(1, HoMode.V, 2.0, r_value * self._running_scale, 0.0) 867 return 868 869 def is_running(self) -> bool: 870 """ 871 判断当前节点是否处于运行状态。 872 873 :return: 如果当前节点处于运行状态,则返回 True,否则返回 False。 874 """ 875 return self._valid 876 877 def enter(self) -> None: 878 """ 879 进入节点。 880 """ 881 self.state_machine.freeze = False 882 self.state_machine.first_tick = True 883 self.state_machine.current_state = ManualState.IDLE 884 self._is_rotation = False 885 self._is_tension = False 886 self._valid = True 887 888 def exit(self) -> None: 889 """ 890 退出节点。 891 """ 892 self.state_machine.freeze = True 893 self.state_machine.first_tick = True 894 self.state_machine.current_state = ManualState.IDLE 895 self._valid = False 896 self._is_rotation = False 897 self._is_tension = False 898 self.stop_all() 899 900 def _save_to_json(self, file_name, data): 901 with open(file_name, 'w') as f: 902 json.dump(data, f) 903 info(f" {self.name} 保存 {file_name} 成功") 904 905 def _load_from_json(self, file_name): 906 if not os.path.exists(file_name): 907 warning(f"{file_name} 不存在,无法读取 zero_angles 将使用 0.0 作为初始值") 908 return None 909 with open(file_name, 'r') as f: 910 return json.load(f)
class
HoMode(enum.Enum):
40class HoMode(Enum): 41 """ Ho 电机模态 """ 42 S = 0 43 """ 停止 """ 44 I = 1 45 """ 电流控制 """ 46 V = 2 47 """ 速度控制 """ 48 P = 3 49 """ 位置控制 """
Ho 电机模态
Inherited Members
- enum.Enum
- name
- value
class
AlignState:
51class AlignState: 52 """ 帧和时间戳对齐的状态数据 """ 53 def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None: 54 """ 55 初始化对齐状态数据 56 57 :param id: 电机 id 58 :param i: 电流 59 :param v: 速度 60 :param p: 位置 61 :param frame: 当前帧 62 :param timestamp: 当前时间戳 63 """ 64 self.id = id 65 """ 电机 id """ 66 self.i: float = i 67 """ 电流 """ 68 self.v: float = v 69 """ 速度 """ 70 self.p: float = p 71 """ 位置 """ 72 self.frame = frame 73 """ 此状态数据对应的帧 """ 74 self.timestamp = timestamp 75 """ 此状态数据对应的时间戳 """ 76 77 def to_dict(self): 78 """ 转换为字典 """ 79 return { 80 "id": self.id, 81 "i": self.i, 82 "v": self.v, 83 "p": self.p, 84 "frame": self.frame, 85 "timestamp": self.timestamp 86 } 87 88 def __repr__(self): 89 return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})"
帧和时间戳对齐的状态数据
AlignState(id: int, i: float, v: float, p: float, frame: int, timestamp: float)
53 def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None: 54 """ 55 初始化对齐状态数据 56 57 :param id: 电机 id 58 :param i: 电流 59 :param v: 速度 60 :param p: 位置 61 :param frame: 当前帧 62 :param timestamp: 当前时间戳 63 """ 64 self.id = id 65 """ 电机 id """ 66 self.i: float = i 67 """ 电流 """ 68 self.v: float = v 69 """ 速度 """ 70 self.p: float = p 71 """ 位置 """ 72 self.frame = frame 73 """ 此状态数据对应的帧 """ 74 self.timestamp = timestamp 75 """ 此状态数据对应的时间戳 """
初始化对齐状态数据
:param id: 电机 id
:param i: 电流
:param v: 速度
:param p: 位置
:param frame: 当前帧
:param timestamp: 当前时间戳
class
HoState:
91class HoState: 92 """ Ho 机器人状态 """ 93 def __init__(self, states: List[AlignState], random_state=False) -> None: 94 """ 95 初始化 Ho 机器人状态 96 97 :param states: 帧和时间戳对齐的状态数据列表 98 :param random_state: 是否随机生成状态数据 99 """ 100 if not random_state: 101 self._states = states 102 else: 103 self._states = [] 104 for i in range(1, 9): 105 self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0)) 106 107 def get_state(self, id: int) -> AlignState: 108 """ 109 获取指定 id 的状态 110 """ 111 for state in self._states: 112 if state.id == id: 113 return state 114 return None 115 116 def get_states(self) -> List[AlignState]: 117 """ 118 获取所有状态 119 """ 120 return copy.deepcopy(self._states) 121 122 def to_dict(self): 123 """ 124 转换为字典 125 """ 126 return { 127 "states": [state.to_dict() for state in self._states] 128 } 129 130 def __repr__(self): 131 state_str = "" 132 for state in self._states: 133 state_str += str(state) 134 if state != self._states[-1]: 135 state_str += "\n" 136 return f"HoState(\n{state_str})"
Ho 机器人状态
HoState(states: List[AlignState], random_state=False)
93 def __init__(self, states: List[AlignState], random_state=False) -> None: 94 """ 95 初始化 Ho 机器人状态 96 97 :param states: 帧和时间戳对齐的状态数据列表 98 :param random_state: 是否随机生成状态数据 99 """ 100 if not random_state: 101 self._states = states 102 else: 103 self._states = [] 104 for i in range(1, 9): 105 self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0))
初始化 Ho 机器人状态
:param states: 帧和时间戳对齐的状态数据列表
:param random_state: 是否随机生成状态数据
107 def get_state(self, id: int) -> AlignState: 108 """ 109 获取指定 id 的状态 110 """ 111 for state in self._states: 112 if state.id == id: 113 return state 114 return None
获取指定 id 的状态
138class HoLink(Node): 139 """ Ho 机器人链接节点 """ 140 def __init__(self, name="HoLink", buffer_capacity: int=1024, url=None, warn=True) -> None: 141 """ 142 初始化 Ho 机器人链接节点 143 144 :param name: 节点名称 145 :param buffer_capacity: 存储状态数据的缓冲区的容量 146 :param url: 数据发送的 url 147 :param read_mode: 串口读取模式 148 :param warn: 是否显示警告 149 """ 150 super().__init__(name) 151 self._data_length = 84 152 self._receive_data = None 153 self._url = url 154 self._warn = warn 155 156 if self._url: 157 self._shutdown = multiprocessing.Event() 158 self._pending_capacity = 256 159 self._pending_requests = multiprocessing.Queue() 160 self._http_process = multiprocessing.Process(target=self._http_request, daemon=True, name=self.name+"HttpProcess") 161 self._http_process.start() 162 163 self._buffer_capacity: int = buffer_capacity 164 """ 存储状态数据的缓冲区的容量 """ 165 self._state_buffer: List[HoState] = [] 166 """ 存储状态数据的缓冲区 """ 167 168 self.sio: SerialIO = SerialIO(name="HoSerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=warn, baudrate=1000000, timeout=1.0) 169 """ 串口节点 HoLink 会主动挂载一个已经配置好的串口节点 """ 170 self.add_child(self.sio) 171 172 self.receive: Signal = Signal(bytes) 173 """ 信号,当接收到数据时触发(无论是否通过校验和) """ 174 self.ho_state_update: Signal = Signal(HoState) 175 """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """ 176 177 def _ready(self) -> None: 178 pass 179 180 def get_ho_state(self) -> HoState: 181 """ 182 获取机器人当前最新的状态数据 183 """ 184 if len(self._state_buffer) == 0: 185 return None 186 return self._state_buffer[-1] 187 188 def get_ho_state_buffer(self) -> List[HoState]: 189 """ 190 获取机器人当前的状态数据缓冲区 191 """ 192 return copy.deepcopy(self._state_buffer) 193 194 def _add_pending_request(self, ho_state: HoState): 195 """ 196 向请求队列中添加请求 197 """ 198 self._pending_requests.put(ho_state) 199 if self._pending_requests.qsize() > self._pending_capacity: 200 if self._warn: 201 warning(f"{self.name} 向 {self._url} 发送请求时,请求队列已满,将丢弃最早的请求,可能会导致数据丢失") 202 self._pending_requests.get() 203 204 def _send_request(self, ho_state_dict: dict) -> None: 205 start_time = time.perf_counter() 206 try: 207 response = requests.post(self._url, json=ho_state_dict, timeout=0.1) 208 209 end_time = time.perf_counter() 210 latency = end_time - start_time 211 # print(f"Request latency: {round(latency * 1000, 2)} ms") 212 213 except requests.RequestException as e: 214 if self._warn: 215 warning(f"请求失败: {e}") 216 except Exception as e: 217 if self._warn: 218 warning(f"发生未知错误: {e}") 219 220 def _http_request(self): 221 info(f"{self.name} 已开启向服务地址 {self._url} 发送数据的功能") 222 while not self._shutdown.is_set(): 223 if not self._pending_requests.empty(): 224 ho_state = self._pending_requests.get() 225 self._send_request(ho_state.to_dict()) 226 227 def update(self, id: int, mode: HoMode, i: float, v: float, p: float) -> None: 228 """ 229 向机器人发送数据 230 """ 231 data = bytes([id]) + bytes([mode.value]) + self._encode(p, 100.0, 4) + \ 232 self._encode(v, 100.0, 4) + self._encode(i, 100.0, 2) 233 # print(f"发送数据: {hex2str(data)}") 234 self.sio.transmit(data) 235 236 def _process(self, delta) -> None: 237 self._receive_data = self.sio.receive(self._data_length) 238 if self._receive_data: 239 if self.sio.check_sum(self._receive_data): 240 states = [] 241 receive_data = self._receive_data[2:-2] 242 243 id = 1 244 for i in range(0, 80, 10): 245 _data = receive_data[i:i+10] 246 _p = self._decode(_data[0:4], 100.0, 4) 247 _v = self._decode(_data[4:8], 100.0, 4) 248 _i = self._decode(_data[8:10], 100.0, 2) 249 250 align_state = AlignState(id=id, i=_i, v=_v, p=_p, frame=self.engine.get_frame(), timestamp=self.engine.get_timestamp()) 251 states.append(align_state) 252 id += 1 253 254 ho_state = HoState(states) 255 self._state_buffer.append(ho_state) 256 257 if len(self._state_buffer) > self._buffer_capacity: 258 self._state_buffer.pop(0) 259 260 self.ho_state_update.emit(ho_state) 261 if self._url: 262 self._add_pending_request(ho_state) 263 else: 264 if self._warn: 265 warning(f"{self.name} 长度为 {len(self._receive_data)} 的数据 {hex2str(self._receive_data)} 校验和错误") 266 self.receive.emit(self._receive_data) 267 268 def _encode(self, value: float, scale_factor: float, byte_length: int) -> bytes: 269 max_value = (1 << (8 * byte_length - 1)) 270 max_scaled_value = max_value / scale_factor 271 272 if abs(value) >= max_scaled_value: 273 raise ValueError(f"要编码的值 {round(value, 2)} 超出范围 [-{max_scaled_value}, {max_scaled_value}]") 274 275 encoded_value = int(value * scale_factor) + max_value 276 277 max_value_for_length = (1 << (8 * byte_length)) - 1 278 if encoded_value > max_value_for_length: 279 raise ValueError(f"编码值 {encoded_value} 超出了 {byte_length} 字节的最大值 {max_value_for_length}") 280 281 byte_data = [] 282 for i in range(byte_length): 283 byte_data.insert(0, encoded_value & 0xFF) 284 encoded_value >>= 8 285 286 return bytes(byte_data) 287 288 def _decode(self, data: bytes, scale_factor: float, byte_length: int) -> float: 289 if len(data) != byte_length: 290 raise ValueError(f"数据长度 {len(data)} 与指定的字节长度 {byte_length} 不匹配") 291 max_value = (1 << (8 * byte_length - 1)) 292 293 decoded_value = 0 294 for i in range(byte_length): 295 decoded_value <<= 8 296 decoded_value |= data[i] 297 298 decoded_value -= max_value 299 300 return decoded_value / scale_factor 301 302 # def _on_engine_exit(self): 303 # if self._url: 304 # self._shutdown.set() 305 # self._http_process.join()
Ho 机器人链接节点
HoLink(name='HoLink', buffer_capacity: int = 1024, url=None, warn=True)
140 def __init__(self, name="HoLink", buffer_capacity: int=1024, url=None, warn=True) -> None: 141 """ 142 初始化 Ho 机器人链接节点 143 144 :param name: 节点名称 145 :param buffer_capacity: 存储状态数据的缓冲区的容量 146 :param url: 数据发送的 url 147 :param read_mode: 串口读取模式 148 :param warn: 是否显示警告 149 """ 150 super().__init__(name) 151 self._data_length = 84 152 self._receive_data = None 153 self._url = url 154 self._warn = warn 155 156 if self._url: 157 self._shutdown = multiprocessing.Event() 158 self._pending_capacity = 256 159 self._pending_requests = multiprocessing.Queue() 160 self._http_process = multiprocessing.Process(target=self._http_request, daemon=True, name=self.name+"HttpProcess") 161 self._http_process.start() 162 163 self._buffer_capacity: int = buffer_capacity 164 """ 存储状态数据的缓冲区的容量 """ 165 self._state_buffer: List[HoState] = [] 166 """ 存储状态数据的缓冲区 """ 167 168 self.sio: SerialIO = SerialIO(name="HoSerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=warn, baudrate=1000000, timeout=1.0) 169 """ 串口节点 HoLink 会主动挂载一个已经配置好的串口节点 """ 170 self.add_child(self.sio) 171 172 self.receive: Signal = Signal(bytes) 173 """ 信号,当接收到数据时触发(无论是否通过校验和) """ 174 self.ho_state_update: Signal = Signal(HoState) 175 """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """
初始化 Ho 机器人链接节点
:param name: 节点名称
:param buffer_capacity: 存储状态数据的缓冲区的容量
:param url: 数据发送的 url
:param read_mode: 串口读取模式
:param warn: 是否显示警告
180 def get_ho_state(self) -> HoState: 181 """ 182 获取机器人当前最新的状态数据 183 """ 184 if len(self._state_buffer) == 0: 185 return None 186 return self._state_buffer[-1]
获取机器人当前最新的状态数据
188 def get_ho_state_buffer(self) -> List[HoState]: 189 """ 190 获取机器人当前的状态数据缓冲区 191 """ 192 return copy.deepcopy(self._state_buffer)
获取机器人当前的状态数据缓冲区
227 def update(self, id: int, mode: HoMode, i: float, v: float, p: float) -> None: 228 """ 229 向机器人发送数据 230 """ 231 data = bytes([id]) + bytes([mode.value]) + self._encode(p, 100.0, 4) + \ 232 self._encode(v, 100.0, 4) + self._encode(i, 100.0, 2) 233 # print(f"发送数据: {hex2str(data)}") 234 self.sio.transmit(data)
向机器人发送数据
class
HoServer:
308class HoServer: 309 def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None: 310 """ 311 初始化 HoServer 实例。 312 313 :param url: 服务器的 URL。 314 :param capacity: 数据缓冲区的最大容量。 315 :param ui: 是否启用 UI 界面。 316 :param ui_frequency: UI 更新频率(Hz)。 317 """ 318 self._url = url 319 parsed_url = urlparse(url) 320 self._host = parsed_url.hostname 321 self._port = parsed_url.port 322 self._path = parsed_url.path 323 324 self._ui = ui 325 self._ui_frequency = ui_frequency 326 self._capacity = capacity 327 self._data_buffer = [] 328 """ 329 数据缓冲区 330 """ 331 332 self._data_queue = multiprocessing.Queue() 333 self._shutdown = multiprocessing.Event() 334 335 # 启动 FastAPI 应用进程 336 self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True) 337 338 def _update_data(self): 339 """ 340 从数据队列中读取数据并更新缓冲区。 341 """ 342 while not self._shutdown.is_set(): 343 if not self._data_queue.empty(): 344 ho_state = self._data_queue.get() 345 self._data_buffer.append(ho_state) 346 if len(self._data_buffer) > self._capacity: 347 self._data_buffer.pop(0) 348 349 def has_data(self): 350 """ 351 检查缓冲区中是否有数据。 352 353 :return: 如果缓冲区中有数据,则返回 True,否则返回 False。 354 """ 355 return len(self._data_buffer) > 0 356 357 def get_data(self) -> HoState: 358 """ 359 获取缓冲区中最新的数据。 360 361 :return: 缓冲区中最新的数据,如果缓冲区为空,则返回 None。 362 """ 363 if not self.has_data(): 364 return None 365 return self._data_buffer.pop(-1) 366 367 def get_data_buffer(self) -> List[HoState]: 368 """ 369 获取缓冲区。 370 371 注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失 372 373 :return: 缓冲区。 374 """ 375 return copy.deepcopy(self._data_buffer) 376 377 def length(self) -> int: 378 """ 379 获取缓冲区中的数据长度。 380 381 :return: 缓冲区中的数据长度。 382 """ 383 return len(self._data_buffer) 384 385 def _init_ui(self) -> None: 386 """ 387 初始化 UI。 388 """ 389 self.root = tk.Tk() 390 self.root.title("HoServer") 391 self.root.geometry("800x600") 392 393 def run(self) -> None: 394 """ 395 启动服务器并运行 UI 更新线程(如果启用 UI)。 396 """ 397 self._app_process.start() 398 399 # 数据更新线程 400 self._data_thread = threading.Thread(target=self._update_data, daemon=True) 401 self._data_thread.start() 402 403 if self._ui: 404 self._init_ui() 405 # UI 更新线程 406 self._ui_thread = threading.Thread(target=self._update_ui, daemon=True) 407 self._ui_thread.start() 408 409 self.root.mainloop() 410 411 def _run_app(self, path: str, host: str, port: int) -> None: 412 """ 413 启动 FastAPI 服务器并监听请求。 414 415 :param path: API 路径。 416 :param host: 服务器主机。 417 :param port: 服务器端口。 418 """ 419 app = FastAPI() 420 app.add_api_route(path, self._handle_data, methods=["POST"]) 421 422 uvicorn.run(app, host=host, port=port) 423 424 async def _handle_data(self, request: Request) -> dict: 425 """ 426 处理接收到的 POST 请求数据。 427 428 :param request: FastAPI 请求对象。 429 :return: 处理结果。 430 """ 431 json_data = await request.json() 432 states_data = json_data.get("states", []) 433 434 states = [] 435 for state_data in states_data: 436 state = AlignState( 437 id=state_data["id"], 438 i=state_data["i"], 439 v=state_data["v"], 440 p=state_data["p"], 441 frame=state_data["frame"], 442 timestamp=state_data["timestamp"] 443 ) 444 states.append(state) 445 446 ho_state = HoState(states=states) 447 self._data_queue.put(ho_state) 448 return {"message": "Data received"} 449 450 def _init_ui(self) -> None: 451 """ 452 初始化 UI 界面。 453 """ 454 self.root = ttkb.Window(themename="superhero", title="HoServer") 455 456 frame = ttk.Frame(self.root) 457 frame.pack(padx=10, pady=10) 458 459 columns = ['Id', 'Frame', 'Timestamp', 'i', 'v', 'p'] 460 self.entries = {} 461 462 # 创建表头 463 for col, column_name in enumerate(columns): 464 label = ttk.Label(frame, text=column_name, width=5) 465 label.grid(row=0, column=col, padx=5, pady=5) 466 467 # 创建数据输入框 468 for row in range(8): 469 id_label = ttk.Label(frame, text=f"{row + 1}", width=5) 470 id_label.grid(row=row + 1, column=0, padx=5, pady=5) 471 for col in range(5): 472 entry = ttk.Entry(frame, width=15, state='normal') 473 entry.grid(row=row + 1, column=col + 1, padx=5, pady=10) 474 self.entries[(row, col)] = entry 475 476 def _update_ui(self) -> None: 477 """ 478 根据数据缓冲区更新 UI 界面。 479 """ 480 def update() -> None: 481 if len(self._data_buffer) == 0: 482 return 483 ho_state = self._data_buffer[-1] 484 485 # 清空当前数据 486 for row in range(8): 487 for col in range(5): 488 self.entries[(row, col)].delete(0, tk.END) 489 490 # 更新数据 491 for row in range(8): 492 align_state = ho_state.get_state(row + 1) 493 self.entries[(row, 0)].insert(0, str(align_state.frame)) 494 self.entries[(row, 1)].insert(0, str(align_state.timestamp)) 495 self.entries[(row, 2)].insert(0, str(round(align_state.i, 2))) 496 self.entries[(row, 3)].insert(0, str(round(align_state.v, 2))) 497 self.entries[(row, 4)].insert(0, str(round(align_state.p, 2))) 498 499 time_interval = 1.0 / self._ui_frequency 500 while not self._shutdown.is_set(): 501 time.sleep(time_interval) 502 503 self.root.after(0, update) 504 505 506 def __del__(self) -> None: 507 """ 508 清理资源,停止线程和进程。 509 """ 510 self._shutdown.set() 511 self._app_process.join() 512 self._data_thread.join() 513 if self._ui: 514 self._ui_thread.join()
HoServer(url: str, capacity=1024, ui: bool = True, ui_frequency: float = 30.0)
309 def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None: 310 """ 311 初始化 HoServer 实例。 312 313 :param url: 服务器的 URL。 314 :param capacity: 数据缓冲区的最大容量。 315 :param ui: 是否启用 UI 界面。 316 :param ui_frequency: UI 更新频率(Hz)。 317 """ 318 self._url = url 319 parsed_url = urlparse(url) 320 self._host = parsed_url.hostname 321 self._port = parsed_url.port 322 self._path = parsed_url.path 323 324 self._ui = ui 325 self._ui_frequency = ui_frequency 326 self._capacity = capacity 327 self._data_buffer = [] 328 """ 329 数据缓冲区 330 """ 331 332 self._data_queue = multiprocessing.Queue() 333 self._shutdown = multiprocessing.Event() 334 335 # 启动 FastAPI 应用进程 336 self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True)
初始化 HoServer 实例。
:param url: 服务器的 URL。
:param capacity: 数据缓冲区的最大容量。
:param ui: 是否启用 UI 界面。
:param ui_frequency: UI 更新频率(Hz)。
def
has_data(self):
349 def has_data(self): 350 """ 351 检查缓冲区中是否有数据。 352 353 :return: 如果缓冲区中有数据,则返回 True,否则返回 False。 354 """ 355 return len(self._data_buffer) > 0
检查缓冲区中是否有数据。
:return: 如果缓冲区中有数据,则返回 True,否则返回 False。
357 def get_data(self) -> HoState: 358 """ 359 获取缓冲区中最新的数据。 360 361 :return: 缓冲区中最新的数据,如果缓冲区为空,则返回 None。 362 """ 363 if not self.has_data(): 364 return None 365 return self._data_buffer.pop(-1)
获取缓冲区中最新的数据。
:return: 缓冲区中最新的数据,如果缓冲区为空,则返回 None。
367 def get_data_buffer(self) -> List[HoState]: 368 """ 369 获取缓冲区。 370 371 注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失 372 373 :return: 缓冲区。 374 """ 375 return copy.deepcopy(self._data_buffer)
获取缓冲区。
注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失
:return: 缓冲区。
def
length(self) -> int:
377 def length(self) -> int: 378 """ 379 获取缓冲区中的数据长度。 380 381 :return: 缓冲区中的数据长度。 382 """ 383 return len(self._data_buffer)
获取缓冲区中的数据长度。
:return: 缓冲区中的数据长度。
def
run(self) -> None:
393 def run(self) -> None: 394 """ 395 启动服务器并运行 UI 更新线程(如果启用 UI)。 396 """ 397 self._app_process.start() 398 399 # 数据更新线程 400 self._data_thread = threading.Thread(target=self._update_data, daemon=True) 401 self._data_thread.start() 402 403 if self._ui: 404 self._init_ui() 405 # UI 更新线程 406 self._ui_thread = threading.Thread(target=self._update_ui, daemon=True) 407 self._ui_thread.start() 408 409 self.root.mainloop()
启动服务器并运行 UI 更新线程(如果启用 UI)。
class
ManualState(enum.Enum):
517class ManualState(Enum): 518 """ 手动状态枚举 """ 519 IDLE = 0 520 """ 空闲状态,所有电机停止运动 """ 521 RUNNING = 1 522 """ 运行状态 """ 523 RETURNING = 2 524 """ 返回状态 """ 525 ZEROING = 3 526 """ 设置零点状态 """
手动状态枚举
Inherited Members
- enum.Enum
- name
- value
528class HoManual(Node): 529 """ Ho 机器人的手动控制节点 """ 530 from robotengine import InputEvent 531 def __init__(self, link: HoLink, name="Manual", rotation_velocity: float = 360.0, running_scale: float=100.0, zeroing_scale: float=100.0, axis_threshold: float=0.1) -> None: 532 """ 533 初始化 HoManual 实例。 534 535 :param link: HoLink 实例。 536 :param name: 节点名称。 537 :param rotation_velocity: 旋转速度(度/秒)。 538 :param running_scale: 运行状态的缩放因子。 539 :param zeroing_scale: 设置零点状态的缩放因子。 540 :param axis_threshold: 轴的阈值。 541 """ 542 from robotengine import StateMachine 543 super().__init__(name) 544 self._debug = False 545 self._valid = True 546 547 self._link = link 548 self._link.ho_state_update.connect(self._on_ho_state_update) 549 550 self.state_machine = StateMachine(ManualState.IDLE, name="HoManualStateMachine") 551 self.add_child(self.state_machine) 552 553 self._zero_angles = { 554 4: 0.0, 555 5: 0.0, 556 6: 0.0, 557 7: 0.0, 558 } 559 self._zero_index = 4 560 561 self._is_tension = False 562 self._is_rotation = False 563 self._rotation_velocity = rotation_velocity 564 self._base_angles = [math.pi / 4, math.pi / 4 * 3, math.pi / 4 * 5, math.pi / 4 * 7] 565 566 self._running_scale = running_scale 567 self._zeroing_scale = zeroing_scale 568 self._axis_threshold = axis_threshold 569 570 self._before_returning = None 571 572 self.exit() 573 574 def _init(self): 575 _load_zero_angles = self._load_from_json("zero_angles.json") 576 if _load_zero_angles: 577 info("成功加载 zero_angles.json") 578 for i in range(4, 8): 579 self._zero_angles[i] = _load_zero_angles[str(i)] 580 info(f"{i}: {self._zero_angles[i]}") 581 582 def _input(self, event: InputEvent) -> None: 583 if not self._valid: 584 return 585 586 state = self.state_machine.current_state 587 588 if event.is_action_pressed("X"): 589 self.tension(not self._is_tension) 590 591 elif event.is_action_pressed("Y"): 592 self.rotation(not self._is_rotation, self._rotation_velocity) 593 594 if state == ManualState.ZEROING: 595 if event.is_action_pressed("LEFT_SHOULDER"): 596 self._change_index(-1) 597 598 elif event.is_action_pressed("RIGHT_SHOULDER"): 599 self._change_index(1) 600 601 elif event.is_action_pressed("A"): 602 if self._debug: 603 return 604 ho_state = self._link.get_ho_state() 605 if not ho_state: 606 return 607 for i in range(4, 8): 608 state = ho_state.get_state(i) 609 self._zero_angles[i] = state.p 610 self._save_to_json("zero_angles.json", self._zero_angles) 611 612 def _change_index(self, dir: int) -> None: 613 self.lock(self._zero_index) 614 self._zero_index += dir 615 if self._zero_index > 7: 616 self._zero_index = 4 617 elif self._zero_index < 4: 618 self._zero_index = 7 619 info(f" 当前电机: {self._zero_index}") 620 621 def _on_ho_state_update(self, ho_state: HoState): 622 if not self._valid: 623 return 624 625 state = self.state_machine.current_state 626 627 if state == ManualState.IDLE: 628 pass 629 630 elif state == ManualState.RUNNING: 631 x_value = self._threshold(self.input.get_action_strength("RIGHT_X")) 632 y_value = self._threshold(self.input.get_action_strength("LEFT_Y")) 633 self.turn(x_value, y_value, ho_state) 634 635 l_value = self._threshold(self.input.get_action_strength("TRIGGER_LEFT")) 636 r_value = self._threshold(self.input.get_action_strength("TRIGGER_RIGHT")) 637 self.move(l_value, r_value, ho_state) 638 639 elif state == ManualState.RETURNING: 640 for i in range(4, 8): 641 self._link.update(i, HoMode.P, 2.0, 100.0, self._zero_angles[i]) 642 643 elif state == ManualState.ZEROING: 644 direction = self.input.get_action_strength("LEFT_Y") 645 direction = self._threshold(direction) 646 velocity = direction * self._zeroing_scale 647 if not self._debug: 648 self._link.update(self._zero_index, HoMode.V, 2.0, velocity, 0.0) 649 650 def tick(self, state: ManualState, delta: float) -> None: 651 if state == ManualState.IDLE: 652 pass 653 654 elif state == ManualState.RUNNING: 655 pass 656 657 elif state == ManualState.RETURNING: 658 pass 659 660 elif state == ManualState.ZEROING: 661 pass 662 663 def _threshold(self, value: float) -> float: 664 if abs(value) < self._axis_threshold: 665 return 0.0 666 return value 667 668 def get_next_state(self, state: ManualState) -> ManualState: 669 if state == ManualState.IDLE: 670 if self.input.is_action_pressed("START"): 671 self.input.flush_action("START") 672 return ManualState.RUNNING 673 674 if self.input.is_action_pressed("B"): 675 self.input.flush_action("B") 676 return ManualState.RETURNING 677 678 elif self.input.is_action_pressed("RIGHT_STICK"): 679 self.input.flush_action("RIGHT_STICK") 680 return ManualState.ZEROING 681 682 elif state == ManualState.RUNNING: 683 if self.input.is_action_pressed("START"): 684 self.input.flush_action("START") 685 return ManualState.IDLE 686 687 if self.input.is_action_pressed("B"): 688 self.input.flush_action("B") 689 return ManualState.RETURNING 690 691 elif state == ManualState.RETURNING: 692 if self.input.is_action_pressed("B"): 693 self.input.flush_action("B") 694 return self._before_returning 695 696 elif state == ManualState.ZEROING: 697 if self.input.is_action_pressed("RIGHT_STICK"): 698 self.input.flush_action("RIGHT_STICK") 699 return ManualState.IDLE 700 701 return self.state_machine.KEEP_CURRENT 702 703 def transition_state(self, from_state: ManualState, to_state: ManualState) -> None: 704 print("") 705 info(f"{from_state if from_state is not None else 'START'} -> {to_state}") 706 info(f"TENSION: {self._is_tension}, ROTATION: {self._is_rotation}") 707 708 if from_state == ManualState.IDLE: 709 pass 710 711 elif from_state == ManualState.RUNNING: 712 pass 713 714 elif from_state == ManualState.RETURNING: 715 pass 716 717 elif from_state == ManualState.ZEROING: 718 pass 719 720 info(" Y: 开关旋转") 721 info(" X: 开关张紧") 722 if to_state == ManualState.IDLE: 723 for i in range(1, 9): 724 if i == 2 or i == 3: 725 continue 726 self.stop(i) 727 info(" START: 进入 RUNNING 状态") 728 info(" B: 进入 RETURNING 状态") 729 info(" RIGHT_STICK: 进入 ZEROING 状态") 730 731 elif to_state == ManualState.RUNNING: 732 for i in range(1, 9): 733 if i == 2 or i == 3: 734 continue 735 self.lock(i) 736 info(" START: 返回 IDLE 状态") 737 info(" B: 进入 RETURNING 状态") 738 739 elif to_state == ManualState.RETURNING: 740 self._before_returning = from_state 741 info(" B: 返回之前的状态") 742 743 elif to_state == ManualState.ZEROING: 744 for i in range(1, 9): 745 if i == 2 or i == 3: 746 continue 747 self.lock(i) 748 info(" RIGHT_STICK: 返回 IDLE 状态") 749 info(" LEFT_SHOULDER: 切换到上一个电机") 750 info(" RIGHT_SHOULDER: 切换到下一个电机") 751 info(" A: 保存当前位置为零点") 752 info(f" 当前电机: {self._zero_index}") 753 754 def lock(self, id: int) -> None: 755 """ 756 锁定指定的电机。 757 758 :param id: 电机编号 759 """ 760 if self._debug: 761 info(f"{self.name} 锁定电机 {id}") 762 return 763 self._link.update(id, HoMode.V, 2.0, 0.0, 0.0) 764 765 def lock_all(self) -> None: 766 """ 767 锁定所有的电机。 768 """ 769 for i in range(1, 9): 770 self.lock(i) 771 772 def stop(self, id: int) -> None: 773 """ 774 停止指定的电机。 775 776 :param id: 电机编号 777 """ 778 if self._debug: 779 info(f"{self.name} 停止电机 {id}") 780 return 781 self._link.update(id, HoMode.S, 0.0, 0.0, 0.0) 782 783 def stop_all(self) -> None: 784 """ 785 停止所有的电机。 786 """ 787 for i in range(1, 9): 788 self.stop(i) 789 790 def tension(self, on: bool, i: float=0.8) -> None: 791 """ 792 驱动牵引电机,张紧导管 793 794 :param on: 是否开启牵引 795 :param i: 牵引电流(A) 796 """ 797 self._is_tension = on 798 if on: 799 self._link.update(2, HoMode.V, i, -360.0, 0.0) 800 self._link.update(3, HoMode.V, i, 360.0, 0.0) 801 else: 802 self.stop(2) 803 self.stop(3) 804 805 def rotation(self, on: bool, velocity: float = 360.0) -> None: 806 """ 807 驱动旋转电机,旋转换能器 808 809 :param on: 是否开启旋转 810 :param velocity: 旋转速度(度/秒) 811 """ 812 self._is_rotation = on 813 if on: 814 self._link.update(8, HoMode.V, 2.0, velocity, 0.0) 815 else: 816 self.stop(8) 817 818 def turn(self, x_value: float, y_value: float, ho_state: HoState) -> None: 819 """ 820 驱动转向电机,转向导管 821 822 :param x_value: 横向控制值 823 :param y_value: 纵向控制值 824 :param ho_state: Ho 机器人状态 825 """ 826 if x_value == 0 and y_value == 0: 827 for i in range(4, 8): 828 self._link.update(i, HoMode.V, 2.0, 0.0, 0.0) 829 else: 830 projection = compute_vector_projection(x_value, y_value, self._base_angles) 831 control_values = [v * self._running_scale for v in projection] 832 833 for i in range(4, 8): 834 if control_values[i-4] > 0: 835 a_id = i 836 b_id = (i + 2) % 4 + 4 837 a_state = ho_state.get_state(a_id) 838 b_state = ho_state.get_state(b_id) 839 a_near = near(a_state.p, self._zero_angles[a_id]) 840 b_near = near(b_state.p, self._zero_angles[b_id]) 841 842 if a_near and not b_near: 843 self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id]) 844 elif (not a_near and b_near) or (a_near and b_near): 845 self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0) 846 elif not a_near and not b_near: 847 self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0) 848 self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id]) 849 850 def move(self, l_value: float, r_value: float, ho_state: HoState) -> None: 851 """ 852 驱动移动电机,移动导管 853 854 :param l_value: 左侧移动控制值 855 :param r_value: 右侧移动控制值 856 :param ho_state: Ho 机器人状态 857 """ 858 if l_value != 0 and r_value != 0: 859 self._link.update(1, HoMode.V, 2.0, 0.0, 0.0) 860 return 861 862 if l_value != 0: 863 self._link.update(1, HoMode.V, 2.0, -l_value * self._running_scale, 0.0) 864 return 865 866 if r_value != 0: 867 self._link.update(1, HoMode.V, 2.0, r_value * self._running_scale, 0.0) 868 return 869 870 def is_running(self) -> bool: 871 """ 872 判断当前节点是否处于运行状态。 873 874 :return: 如果当前节点处于运行状态,则返回 True,否则返回 False。 875 """ 876 return self._valid 877 878 def enter(self) -> None: 879 """ 880 进入节点。 881 """ 882 self.state_machine.freeze = False 883 self.state_machine.first_tick = True 884 self.state_machine.current_state = ManualState.IDLE 885 self._is_rotation = False 886 self._is_tension = False 887 self._valid = True 888 889 def exit(self) -> None: 890 """ 891 退出节点。 892 """ 893 self.state_machine.freeze = True 894 self.state_machine.first_tick = True 895 self.state_machine.current_state = ManualState.IDLE 896 self._valid = False 897 self._is_rotation = False 898 self._is_tension = False 899 self.stop_all() 900 901 def _save_to_json(self, file_name, data): 902 with open(file_name, 'w') as f: 903 json.dump(data, f) 904 info(f" {self.name} 保存 {file_name} 成功") 905 906 def _load_from_json(self, file_name): 907 if not os.path.exists(file_name): 908 warning(f"{file_name} 不存在,无法读取 zero_angles 将使用 0.0 作为初始值") 909 return None 910 with open(file_name, 'r') as f: 911 return json.load(f)
Ho 机器人的手动控制节点
HoManual( link: HoLink, name='Manual', rotation_velocity: float = 360.0, running_scale: float = 100.0, zeroing_scale: float = 100.0, axis_threshold: float = 0.1)
531 def __init__(self, link: HoLink, name="Manual", rotation_velocity: float = 360.0, running_scale: float=100.0, zeroing_scale: float=100.0, axis_threshold: float=0.1) -> None: 532 """ 533 初始化 HoManual 实例。 534 535 :param link: HoLink 实例。 536 :param name: 节点名称。 537 :param rotation_velocity: 旋转速度(度/秒)。 538 :param running_scale: 运行状态的缩放因子。 539 :param zeroing_scale: 设置零点状态的缩放因子。 540 :param axis_threshold: 轴的阈值。 541 """ 542 from robotengine import StateMachine 543 super().__init__(name) 544 self._debug = False 545 self._valid = True 546 547 self._link = link 548 self._link.ho_state_update.connect(self._on_ho_state_update) 549 550 self.state_machine = StateMachine(ManualState.IDLE, name="HoManualStateMachine") 551 self.add_child(self.state_machine) 552 553 self._zero_angles = { 554 4: 0.0, 555 5: 0.0, 556 6: 0.0, 557 7: 0.0, 558 } 559 self._zero_index = 4 560 561 self._is_tension = False 562 self._is_rotation = False 563 self._rotation_velocity = rotation_velocity 564 self._base_angles = [math.pi / 4, math.pi / 4 * 3, math.pi / 4 * 5, math.pi / 4 * 7] 565 566 self._running_scale = running_scale 567 self._zeroing_scale = zeroing_scale 568 self._axis_threshold = axis_threshold 569 570 self._before_returning = None 571 572 self.exit()
初始化 HoManual 实例。
:param link: HoLink 实例。
:param name: 节点名称。
:param rotation_velocity: 旋转速度(度/秒)。
:param running_scale: 运行状态的缩放因子。
:param zeroing_scale: 设置零点状态的缩放因子。
:param axis_threshold: 轴的阈值。
668 def get_next_state(self, state: ManualState) -> ManualState: 669 if state == ManualState.IDLE: 670 if self.input.is_action_pressed("START"): 671 self.input.flush_action("START") 672 return ManualState.RUNNING 673 674 if self.input.is_action_pressed("B"): 675 self.input.flush_action("B") 676 return ManualState.RETURNING 677 678 elif self.input.is_action_pressed("RIGHT_STICK"): 679 self.input.flush_action("RIGHT_STICK") 680 return ManualState.ZEROING 681 682 elif state == ManualState.RUNNING: 683 if self.input.is_action_pressed("START"): 684 self.input.flush_action("START") 685 return ManualState.IDLE 686 687 if self.input.is_action_pressed("B"): 688 self.input.flush_action("B") 689 return ManualState.RETURNING 690 691 elif state == ManualState.RETURNING: 692 if self.input.is_action_pressed("B"): 693 self.input.flush_action("B") 694 return self._before_returning 695 696 elif state == ManualState.ZEROING: 697 if self.input.is_action_pressed("RIGHT_STICK"): 698 self.input.flush_action("RIGHT_STICK") 699 return ManualState.IDLE 700 701 return self.state_machine.KEEP_CURRENT
703 def transition_state(self, from_state: ManualState, to_state: ManualState) -> None: 704 print("") 705 info(f"{from_state if from_state is not None else 'START'} -> {to_state}") 706 info(f"TENSION: {self._is_tension}, ROTATION: {self._is_rotation}") 707 708 if from_state == ManualState.IDLE: 709 pass 710 711 elif from_state == ManualState.RUNNING: 712 pass 713 714 elif from_state == ManualState.RETURNING: 715 pass 716 717 elif from_state == ManualState.ZEROING: 718 pass 719 720 info(" Y: 开关旋转") 721 info(" X: 开关张紧") 722 if to_state == ManualState.IDLE: 723 for i in range(1, 9): 724 if i == 2 or i == 3: 725 continue 726 self.stop(i) 727 info(" START: 进入 RUNNING 状态") 728 info(" B: 进入 RETURNING 状态") 729 info(" RIGHT_STICK: 进入 ZEROING 状态") 730 731 elif to_state == ManualState.RUNNING: 732 for i in range(1, 9): 733 if i == 2 or i == 3: 734 continue 735 self.lock(i) 736 info(" START: 返回 IDLE 状态") 737 info(" B: 进入 RETURNING 状态") 738 739 elif to_state == ManualState.RETURNING: 740 self._before_returning = from_state 741 info(" B: 返回之前的状态") 742 743 elif to_state == ManualState.ZEROING: 744 for i in range(1, 9): 745 if i == 2 or i == 3: 746 continue 747 self.lock(i) 748 info(" RIGHT_STICK: 返回 IDLE 状态") 749 info(" LEFT_SHOULDER: 切换到上一个电机") 750 info(" RIGHT_SHOULDER: 切换到下一个电机") 751 info(" A: 保存当前位置为零点") 752 info(f" 当前电机: {self._zero_index}")
def
lock(self, id: int) -> None:
754 def lock(self, id: int) -> None: 755 """ 756 锁定指定的电机。 757 758 :param id: 电机编号 759 """ 760 if self._debug: 761 info(f"{self.name} 锁定电机 {id}") 762 return 763 self._link.update(id, HoMode.V, 2.0, 0.0, 0.0)
锁定指定的电机。
:param id: 电机编号
def
lock_all(self) -> None:
765 def lock_all(self) -> None: 766 """ 767 锁定所有的电机。 768 """ 769 for i in range(1, 9): 770 self.lock(i)
锁定所有的电机。
def
stop(self, id: int) -> None:
772 def stop(self, id: int) -> None: 773 """ 774 停止指定的电机。 775 776 :param id: 电机编号 777 """ 778 if self._debug: 779 info(f"{self.name} 停止电机 {id}") 780 return 781 self._link.update(id, HoMode.S, 0.0, 0.0, 0.0)
停止指定的电机。
:param id: 电机编号
def
stop_all(self) -> None:
783 def stop_all(self) -> None: 784 """ 785 停止所有的电机。 786 """ 787 for i in range(1, 9): 788 self.stop(i)
停止所有的电机。
def
tension(self, on: bool, i: float = 0.8) -> None:
790 def tension(self, on: bool, i: float=0.8) -> None: 791 """ 792 驱动牵引电机,张紧导管 793 794 :param on: 是否开启牵引 795 :param i: 牵引电流(A) 796 """ 797 self._is_tension = on 798 if on: 799 self._link.update(2, HoMode.V, i, -360.0, 0.0) 800 self._link.update(3, HoMode.V, i, 360.0, 0.0) 801 else: 802 self.stop(2) 803 self.stop(3)
驱动牵引电机,张紧导管
:param on: 是否开启牵引
:param i: 牵引电流(A)
def
rotation(self, on: bool, velocity: float = 360.0) -> None:
805 def rotation(self, on: bool, velocity: float = 360.0) -> None: 806 """ 807 驱动旋转电机,旋转换能器 808 809 :param on: 是否开启旋转 810 :param velocity: 旋转速度(度/秒) 811 """ 812 self._is_rotation = on 813 if on: 814 self._link.update(8, HoMode.V, 2.0, velocity, 0.0) 815 else: 816 self.stop(8)
驱动旋转电机,旋转换能器
:param on: 是否开启旋转
:param velocity: 旋转速度(度/秒)
818 def turn(self, x_value: float, y_value: float, ho_state: HoState) -> None: 819 """ 820 驱动转向电机,转向导管 821 822 :param x_value: 横向控制值 823 :param y_value: 纵向控制值 824 :param ho_state: Ho 机器人状态 825 """ 826 if x_value == 0 and y_value == 0: 827 for i in range(4, 8): 828 self._link.update(i, HoMode.V, 2.0, 0.0, 0.0) 829 else: 830 projection = compute_vector_projection(x_value, y_value, self._base_angles) 831 control_values = [v * self._running_scale for v in projection] 832 833 for i in range(4, 8): 834 if control_values[i-4] > 0: 835 a_id = i 836 b_id = (i + 2) % 4 + 4 837 a_state = ho_state.get_state(a_id) 838 b_state = ho_state.get_state(b_id) 839 a_near = near(a_state.p, self._zero_angles[a_id]) 840 b_near = near(b_state.p, self._zero_angles[b_id]) 841 842 if a_near and not b_near: 843 self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id]) 844 elif (not a_near and b_near) or (a_near and b_near): 845 self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0) 846 elif not a_near and not b_near: 847 self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0) 848 self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])
驱动转向电机,转向导管
:param x_value: 横向控制值
:param y_value: 纵向控制值
:param ho_state: Ho 机器人状态
850 def move(self, l_value: float, r_value: float, ho_state: HoState) -> None: 851 """ 852 驱动移动电机,移动导管 853 854 :param l_value: 左侧移动控制值 855 :param r_value: 右侧移动控制值 856 :param ho_state: Ho 机器人状态 857 """ 858 if l_value != 0 and r_value != 0: 859 self._link.update(1, HoMode.V, 2.0, 0.0, 0.0) 860 return 861 862 if l_value != 0: 863 self._link.update(1, HoMode.V, 2.0, -l_value * self._running_scale, 0.0) 864 return 865 866 if r_value != 0: 867 self._link.update(1, HoMode.V, 2.0, r_value * self._running_scale, 0.0) 868 return
驱动移动电机,移动导管
:param l_value: 左侧移动控制值
:param r_value: 右侧移动控制值
:param ho_state: Ho 机器人状态
def
is_running(self) -> bool:
870 def is_running(self) -> bool: 871 """ 872 判断当前节点是否处于运行状态。 873 874 :return: 如果当前节点处于运行状态,则返回 True,否则返回 False。 875 """ 876 return self._valid
判断当前节点是否处于运行状态。
:return: 如果当前节点处于运行状态,则返回 True,否则返回 False。
def
enter(self) -> None:
878 def enter(self) -> None: 879 """ 880 进入节点。 881 """ 882 self.state_machine.freeze = False 883 self.state_machine.first_tick = True 884 self.state_machine.current_state = ManualState.IDLE 885 self._is_rotation = False 886 self._is_tension = False 887 self._valid = True
进入节点。
def
exit(self) -> None:
889 def exit(self) -> None: 890 """ 891 退出节点。 892 """ 893 self.state_machine.freeze = True 894 self.state_machine.first_tick = True 895 self.state_machine.current_state = ManualState.IDLE 896 self._valid = False 897 self._is_rotation = False 898 self._is_tension = False 899 self.stop_all()
退出节点。
class
HoManual.InputEvent:
139class InputEvent: 140 """ 输入事件基类 """ 141 def __init__(self): 142 pass 143 144 def get_action_strength(self, action: str) -> float: 145 """ 返回某个动作的强度 """ 146 pass 147 148 def is_action_pressed(self, action: str) -> bool: 149 """ 检查某个动作是否被按下 """ 150 pass 151 152 def is_action_released(self, action: str) -> bool: 153 """ 检查某个动作是否被释放 """ 154 pass
输入事件基类