最佳实践
错误处理
所有 SDK 操作错误统一抛出 WujiException:
from wuji_sdk import SdkManager, WujiException
try:
manager = SdkManager.instance()
glove = manager.auto_connect(device_name="glove")
sub = glove.tactile().subscribe()
frame = sub.recv()
except WujiException as e:
error_msg = str(e)
if "DeviceNotFound" in error_msg:
print("未找到设备,请检查连接")
elif "Disconnected" in error_msg:
print("设备已断开")
elif "Connection timeout" in error_msg:
print("连接超时,请检查网络和 IP 设置")
elif "Operation timeout" in error_msg:
print("操作超时,设备无响应,正在重试...")
else:
print(f"错误: {error_msg}")常见错误类型:
| 错误 | 说明 |
|---|---|
DeviceNotFound | 未找到指定设备 |
DeviceMismatch | 设备类型不匹配 |
Disconnected | 设备已断开连接 |
ConnectionTimeout | 连接超时 |
OperationTimeout | 操作超时 |
PathNotFound | 资源路径不存在 |
SchemaMismatch | 数据类型不匹配 |
NoData | 无可用数据 |
StreamClosed | 订阅流已关闭 |
重连策略
设备断开后需要重新连接和订阅:
import time
from wuji_sdk import SdkManager, WujiException
manager = SdkManager.instance()
def connect_with_retry(sn, max_retries=5):
for attempt in range(max_retries):
try:
return manager.connect(sn=sn, device_name="glove")
except WujiException:
wait = 2 ** attempt # 指数退避
print(f"连接失败,{wait}s 后重试...")
time.sleep(wait)
raise RuntimeError("达到最大重试次数")多设备协调
同时从多个设备采集数据:
import asyncio
from wuji_sdk import SdkManager
async def collect_from_device(glove, name):
sub = glove.tactile().subscribe()
while True:
frame = await sub.recv_async()
print(f"[{name}] max={max(frame.data):.1f}")
async def main():
manager = SdkManager.instance()
devices = manager.scan()
gloves = []
for i, dev in enumerate(devices):
glove = manager.connect(sn=dev.sn, device_name=f"glove_{i}")
gloves.append((glove, f"glove_{i}"))
tasks = [collect_from_device(g, n) for g, n in gloves]
await asyncio.gather(*tasks)
asyncio.run(main())跨设备合并数据
多设备场景下,使用全局资源获取所有设备的聚合数据,无需手动合并:
manager = SdkManager.instance()
left = manager.connect(sn="WG1JA00XXXXXXXXX", device_name="left")
right = manager.connect(sn="WG1KA00XXXXXXXXX", device_name="right")
# 订阅全局坐标变换——自动合并左右手数据
sub = manager.tf_static().subscribe()
transforms = await sub.recv_async()
for tf in transforms.transforms:
print(f"{tf.parent_frame_id} → {tf.child_frame_id}")日志控制
调整 SDK 日志级别用于调试:
import wuji_sdk
wuji_sdk.set_log_level("debug") # 显示调试信息
wuji_sdk.set_log_level("trace") # 显示所有日志
wuji_sdk.set_log_level("info") # 默认级别
wuji_sdk.set_log_level("warn") # 仅警告
wuji_sdk.set_log_level("error") # 仅错误
wuji_sdk.set_log_level("off") # 关闭日志设备日志
获取设备固件运行日志的方式因产品而异,详见对应设备文档:
录制最佳实践
选择合适的压缩算法
- 实时采集优先用
"lz4"(低延迟),长时间存储归档用"zstd"(高压缩比) - 高频通道(>500 Hz)建议使用
"lz4"避免压缩成为瓶颈
分集管理
将长时间采集拆分为多个分集(episode),便于后续回放和分析:
import asyncio
from wuji_sdk import TopicRecorder
recorder = TopicRecorder(compression="lz4")
recorder.record(glove.tactile().subscribe())
for i in range(5):
handle = await recorder.start(f"./data/episode_{i:03d}.mcap")
await asyncio.sleep(60) # 每个 episode 60 秒
await handle.stop()监控数据质量
长时间录制建议开启质量监控,在告警时及时处理:
import asyncio
handle = await recorder.start("./data/session.mcap")
async def watch_alerts():
async for alert in handle.subscribe_alerts():
print(f"告警: {alert.message}")
asyncio.create_task(watch_alerts())性能优化
减少不必要的订阅
只订阅实际需要的数据流,避免浪费带宽和 CPU:
# ✓ 只订阅需要的数据
sub_tactile = glove.tactile().subscribe()
# ✗ 不要订阅用不到的数据
# sub_imu = glove.imu_palm().subscribe()及时关闭订阅
不再需要的订阅应及时关闭:
sub = glove.tactile().subscribe_with_callback(callback=on_data)
# ... 完成数据采集
sub.close() # 释放资源