数据订阅
Wuji SDK 通过订阅机制接收设备数据。每个数据流对应一种传感器或计算结果,通过语义化 API 访问。
订阅模式
异步接收
sub = glove.tactile().subscribe()
frame = await sub.recv_async() # 等待下一帧数据同步非阻塞接收
sub = glove.tactile().subscribe()
frame = sub.recv() # 无数据时返回 None回调接收
def on_data(frame):
print(f"Max: {max(frame.data):.1f}")
sub = glove.tactile().subscribe_with_callback(callback=on_data)
# 后台自动接收,不阻塞主线程关闭订阅
回调订阅需要手动关闭:
sub = glove.tactile().subscribe_with_callback(callback=on_data)
# ... 使用一段时间后
sub.close()断开设备连接时,该设备上的所有订阅会自动关闭。
跨设备合并订阅
全局资源会自动聚合所有已连接设备的数据,通过 SdkManager 级别的 API 订阅。当前版本的全局资源:
| 资源 | 类型 | 说明 |
|---|---|---|
tf_static | FrameTransforms | 静态坐标变换(如 wrist → emf_tx) |
tf | FrameTransforms | 动态坐标变换(如 waist → wrist) |
运行时通过 manager.global_topics() 查询完整的全局资源列表:
manager = SdkManager.instance()
# 查看所有全局资源
for topic in manager.global_topics():
print(f"{topic.path} (sub={topic.can_sub})")
# 订阅全局资源——数据来自所有已连接设备的合并
sub = manager.tf_static().subscribe()
transforms = await sub.recv_async()C SDK 等价:wuji_subscribe_tf(...) 与 wuji_subscribe_tf_static(...) 订阅全局坐标变换(回调签名 WujiFrameTransforms,设备无关)。
全局资源与设备资源的区别:
| 维度 | 设备资源 | 全局资源 |
|---|---|---|
| 访问方式 | glove.tactile().subscribe() | manager.tf_static().subscribe() |
| 数据来源 | 单个设备 | 所有已连接设备的合并 |
| 查询 API | device.topics() | manager.global_topics() |
可用数据流
各设备支持的数据流和数据结构详见对应设备文档:
- Wuji Glove SDK 数据参考 — 触觉、EMF 位姿、手部追踪、IMU、坐标变换
Wuji Hand 数据订阅
通过 wuji_sdk.WujiHand 订阅 Wuji Hand 的 20 关节状态、发布关节指令、订阅配对触觉手套的数据。
订阅关节状态
hand.joint_state().subscribe() 返回 Subscription[HandJointState],异步消费 20 关节实时位置,可选含速度与力矩:
sub = hand.joint_state().subscribe()
for _ in range(100):
state = await sub.recv_async()
print(f"seq={state.header.seq}, joint0={state.position[0]:+.3f}")
sub.close()发布关节指令(实时控制)
hand.realtime_controller(LowPass(cutoff_hz=...)) 以 with 上下文打开实时控制会话,低通滤波抑制高频抖动。在会话内用 hand.joint_command().publisher() 拿到 PUB 句柄,调用 publisher.send(target) 推送 20 关节目标位置:
from wuji_sdk import LowPass
with hand.realtime_controller(LowPass(cutoff_hz=5.0)):
publisher = hand.joint_command().publisher()
target = [0.0] * 20
target[0] = 0.3
publisher.send(target)
publisher.close()持续控制时按固定频率(典型 100 Hz)在 with 块内循环调用 publisher.send()。
电机力矩上限
hand.set_all_effort_limit(amps) 把 20 个关节的力矩上限统一设为指定安培值,hand.get_all_effort_limit() 读回长度 20 的 list[float]。
- 默认值 1.5 A,适用于大多数场景
- 合法范围 0.0–3.5 A,超过 3.5 会被限制到 3.5
hand.set_all_effort_limit(1.5)
limits = hand.get_all_effort_limit()修改 effort limit 会改变关节最大输出能力。固件还会根据关节温度自动降低实际生效的值。完整说明与风险提示见 Wuji Hand SDK 使用说明 — joint_effort_limit。
触觉手套同步数据
手与触觉手套同 USB 总线接入且手性匹配时,SDK 在 connect 中自动配对。检测配对状态并订阅压力帧:
if hand.is_tactile_attached():
tactile_sub = hand.tactile.subscribe_pressure_frame()
frame = await tactile_sub.recv_async()
print(f"max pressure: {max(frame.pressure):.2f}")
tactile_sub.close()