最佳实践

错误处理

所有 SDK 操作错误统一抛出 WujiException

from wuji_sdk import SdkManager, WujiException

try:
    manager = SdkManager.instance()
    glove = manager.auto_connect(device_name="glove")
    sub = glove.tactile().subscribe()
    frame = sub.recv()
except WujiException as e:
    error_msg = str(e)
    if "DeviceNotFound" in error_msg:
        print("未找到设备,请检查连接")
    elif "Disconnected" in error_msg:
        print("设备已断开")
    elif "Connection timeout" in error_msg:
        print("连接超时,请检查网络和 IP 设置")
    elif "Operation timeout" in error_msg:
        print("操作超时,设备无响应,正在重试...")
    else:
        print(f"错误: {error_msg}")

常见错误类型:

错误说明
DeviceNotFound未找到指定设备
DeviceMismatch设备类型不匹配
Disconnected设备已断开连接
ConnectionTimeout连接超时
OperationTimeout操作超时
PathNotFound资源路径不存在
SchemaMismatch数据类型不匹配
NoData无可用数据
StreamClosed订阅流已关闭

重连策略

设备断开后需要重新连接和订阅:

import time
from wuji_sdk import SdkManager, WujiException

manager = SdkManager.instance()

def connect_with_retry(sn, max_retries=5):
    for attempt in range(max_retries):
        try:
            return manager.connect(sn=sn, device_name="glove")
        except WujiException:
            wait = 2 ** attempt  # 指数退避
            print(f"连接失败,{wait}s 后重试...")
            time.sleep(wait)
    raise RuntimeError("达到最大重试次数")

多设备协调

同时从多个设备采集数据:

import asyncio
from wuji_sdk import SdkManager

async def collect_from_device(glove, name):
    sub = glove.tactile().subscribe()
    while True:
        frame = await sub.recv_async()
        print(f"[{name}] max={max(frame.data):.1f}")

async def main():
    manager = SdkManager.instance()
    devices = manager.scan()

    gloves = []
    for i, dev in enumerate(devices):
        glove = manager.connect(sn=dev.sn, device_name=f"glove_{i}")
        gloves.append((glove, f"glove_{i}"))

    tasks = [collect_from_device(g, n) for g, n in gloves]
    await asyncio.gather(*tasks)

asyncio.run(main())

跨设备合并数据

多设备场景下,使用全局资源获取所有设备的聚合数据,无需手动合并:

manager = SdkManager.instance()
left = manager.connect(sn="WG1JA00XXXXXXXXX", device_name="left")
right = manager.connect(sn="WG1KA00XXXXXXXXX", device_name="right")

# 订阅全局坐标变换——自动合并左右手数据
sub = manager.tf_static().subscribe()
transforms = await sub.recv_async()
for tf in transforms.transforms:
    print(f"{tf.parent_frame_id}{tf.child_frame_id}")

日志控制

调整 SDK 日志级别用于调试:

import wuji_sdk

wuji_sdk.set_log_level("debug")   # 显示调试信息
wuji_sdk.set_log_level("trace")   # 显示所有日志
wuji_sdk.set_log_level("info")    # 默认级别
wuji_sdk.set_log_level("warn")    # 仅警告
wuji_sdk.set_log_level("error")   # 仅错误
wuji_sdk.set_log_level("off")     # 关闭日志

设备日志

获取设备固件运行日志的方式因产品而异,详见对应设备文档:

录制最佳实践

选择合适的压缩算法

  • 实时采集优先用 "lz4"(低延迟),长时间存储归档用 "zstd"(高压缩比)
  • 高频通道(>500 Hz)建议使用 "lz4" 避免压缩成为瓶颈

分集管理

将长时间采集拆分为多个分集(episode),便于后续回放和分析:

import asyncio
from wuji_sdk import TopicRecorder

recorder = TopicRecorder(compression="lz4")
recorder.record(glove.tactile().subscribe())

for i in range(5):
    handle = await recorder.start(f"./data/episode_{i:03d}.mcap")
    await asyncio.sleep(60)  # 每个 episode 60 秒
    await handle.stop()

监控数据质量

长时间录制建议开启质量监控,在告警时及时处理:

import asyncio

handle = await recorder.start("./data/session.mcap")

async def watch_alerts():
    async for alert in handle.subscribe_alerts():
        print(f"告警: {alert.message}")

asyncio.create_task(watch_alerts())

性能优化

减少不必要的订阅

只订阅实际需要的数据流,避免浪费带宽和 CPU:

# ✓ 只订阅需要的数据
sub_tactile = glove.tactile().subscribe()

# ✗ 不要订阅用不到的数据
# sub_imu = glove.imu_palm().subscribe()

及时关闭订阅

不再需要的订阅应及时关闭:

sub = glove.tactile().subscribe_with_callback(callback=on_data)
# ... 完成数据采集
sub.close()  # 释放资源