数据订阅

Wuji SDK 通过订阅机制接收设备数据。每个数据流对应一种传感器或计算结果,通过语义化 API 访问。

订阅模式

异步接收

sub = glove.tactile().subscribe()
frame = await sub.recv_async()  # 等待下一帧数据

同步非阻塞接收

sub = glove.tactile().subscribe()
frame = sub.recv()  # 无数据时返回 None

回调接收

def on_data(frame):
    print(f"Max: {max(frame.data):.1f}")

sub = glove.tactile().subscribe_with_callback(callback=on_data)
# 后台自动接收,不阻塞主线程

关闭订阅

回调订阅需要手动关闭:

sub = glove.tactile().subscribe_with_callback(callback=on_data)
# ... 使用一段时间后
sub.close()

断开设备连接时,该设备上的所有订阅会自动关闭。

跨设备合并订阅

全局资源会自动聚合所有已连接设备的数据,通过 SdkManager 级别的 API 订阅。当前版本的全局资源:

资源类型说明
tf_staticFrameTransforms静态坐标变换(如 wrist → emf_tx)
tfFrameTransforms动态坐标变换(如 waist → wrist)

运行时通过 manager.global_topics() 查询完整的全局资源列表:

manager = SdkManager.instance()

# 查看所有全局资源
for topic in manager.global_topics():
    print(f"{topic.path} (sub={topic.can_sub})")

# 订阅全局资源——数据来自所有已连接设备的合并
sub = manager.tf_static().subscribe()
transforms = await sub.recv_async()

C SDK 等价:wuji_subscribe_tf(...)wuji_subscribe_tf_static(...) 订阅全局坐标变换(回调签名 WujiFrameTransforms,设备无关)。

全局资源与设备资源的区别:

维度设备资源全局资源
访问方式glove.tactile().subscribe()manager.tf_static().subscribe()
数据来源单个设备所有已连接设备的合并
查询 APIdevice.topics()manager.global_topics()

可用数据流

各设备支持的数据流和数据结构详见对应设备文档:

Wuji Hand 数据订阅

通过 wuji_sdk.WujiHand 订阅 Wuji Hand 的 20 关节状态、发布关节指令、订阅配对触觉手套的数据。

订阅关节状态

hand.joint_state().subscribe() 返回 Subscription[HandJointState],异步消费 20 关节实时位置,可选含速度与力矩:

sub = hand.joint_state().subscribe()
for _ in range(100):
    state = await sub.recv_async()
    print(f"seq={state.header.seq}, joint0={state.position[0]:+.3f}")
sub.close()

发布关节指令(实时控制)

hand.realtime_controller(LowPass(cutoff_hz=...))with 上下文打开实时控制会话,低通滤波抑制高频抖动。在会话内用 hand.joint_command().publisher() 拿到 PUB 句柄,调用 publisher.send(target) 推送 20 关节目标位置:

from wuji_sdk import LowPass

with hand.realtime_controller(LowPass(cutoff_hz=5.0)):
    publisher = hand.joint_command().publisher()
    target = [0.0] * 20
    target[0] = 0.3
    publisher.send(target)
    publisher.close()

持续控制时按固定频率(典型 100 Hz)在 with 块内循环调用 publisher.send()

电机力矩上限

hand.set_all_effort_limit(amps) 把 20 个关节的力矩上限统一设为指定安培值,hand.get_all_effort_limit() 读回长度 20 的 list[float]

  • 默认值 1.5 A,适用于大多数场景
  • 合法范围 0.0–3.5 A,超过 3.5 会被限制到 3.5
hand.set_all_effort_limit(1.5)
limits = hand.get_all_effort_limit()

修改 effort limit 会改变关节最大输出能力。固件还会根据关节温度自动降低实际生效的值。完整说明与风险提示见 Wuji Hand SDK 使用说明 — joint_effort_limit

触觉手套同步数据

手与触觉手套同 USB 总线接入且手性匹配时,SDK 在 connect 中自动配对。检测配对状态并订阅压力帧:

if hand.is_tactile_attached():
    tactile_sub = hand.tactile.subscribe_pressure_frame()
    frame = await tactile_sub.recv_async()
    print(f"max pressure: {max(frame.pressure):.2f}")
    tactile_sub.close()