快速开始
SDK 提供三个完整示例,演示数据订阅和录制的典型用法。完整源码见 GitHub 仓库。运行前请确保手套已通过以太网连接且网络配置正确。
回调订阅(0.subscribe_callback.py)
使用 subscribe_with_callback() 注册回调函数,数据到达时自动在后台线程中执行。适合不使用 asyncio 的同步程序,或需要将数据处理与主线程逻辑解耦的场景。
python 0.subscribe_callback.py该示例自动扫描连接所有可用手套,为每个手套注册 6 个数据流的回调(tactile、tactile_zones、emf_poses、hand_joint_angles、hand_skeleton、tactile_point_cloud)。主线程保持空闲等待,回调在后台持续触发。程序退出时通过 sub.close() 清理订阅资源。
预期输出:
Found 1 device(s)
SN=WG1KA00XXXXXXXXX, Address=192.168.1.101:50001
Connected: WG1KA00XXXXXXXXX (FW=0.6.0, right)
[glove_0][Zones] palm=-0.0 thumb=-0.1
[glove_0][Tactile] max=0.7
[glove_0][EmfPoses] thumb=[+0.061, +0.019, -0.058]
[glove_0][JointAngles] conf=['0.26', '0.48', '0.43', '0.43', '0.40']
[glove_0][Skeleton] wrist=[+0.000, +0.000, +0.000] joints=21
Subscribed to 6 streams. Ctrl+C to stop.
...回调订阅的详细说明详见 数据订阅。
异步订阅(1.subscribe_async.py)
使用 Python async/await 模式,通过 recv_async() 异步接收数据。适合需要在同一事件循环中编排多个协程的场景。
python 1.subscribe_async.py该示例会自动扫描并连接所有可用手套,同时订阅 6 个数据流(tactile、tactile_zones、emf_poses、hand_joint_angles、hand_skeleton、tactile_point_cloud),使用 asyncio.gather 并发处理。
预期输出:
Found 1 device(s)
SN=WG1KA00XXXXXXXXX, Address=192.168.1.101:50001
Connected: WG1KA00XXXXXXXXX (FW=0.6.0, right)
Subscribed to 6 streams. Ctrl+C to stop.
[glove_0][EmfPoses] thumb=[+0.061, +0.019, -0.058]
[glove_0][JointAngles] conf=['0.26', '0.48', '0.43', '0.43', '0.40']
[glove_0][Zones] palm=-0.0 thumb=-0.1
[glove_0][Tactile] max=0.7
[glove_0][Zones] palm=-0.0 thumb=-0.1
[glove_0][Tactile] max=0.7
[glove_0][EmfPoses] thumb=[+0.062, +0.019, -0.058]
[glove_0][Skeleton] wrist=[+0.000, +0.000, +0.000] joints=21
[glove_0][JointAngles] conf=['0.55', '0.35', '0.63', '0.29', '0.34']
[glove_0][Skeleton] wrist=[+0.000, +0.000, +0.000] joints=21
...异步订阅和跨设备合并订阅的详细说明详见 数据订阅。
数据录制(2.recording.py)
使用 TopicRecorder 将传感器数据录制到 MCAP 文件,支持 LZ4 压缩。
python 2.recording.py该示例连接一个手套,注册 3 个数据流(tactile、emf_poses、hand_skeleton)到录制器,录制 10 秒后停止并输出统计摘要。
预期输出:
Connected: WG1KA00XXXXXXXXX
Recording to ./data/20260317_150357.mcap ...
Done — 3606 frames, 3.90 MB, 10.0s录制功能的完整用法(暂停/恢复、分集切换、质量监控等)详见 数据录制。
Wuji Hand 上手
通过 wuji-sdk 接入 Wuji Hand 的最小上手示例。SDK 在扫描时将其透出为 USB 设备。连接后使能电机、订阅 20 关节实时位置,最后用 realtime_controller + LowPass 让食指、中指、无名指、小指做一次平滑握拳再张开。更多用法见 设备连接、数据订阅 与 数据参考。
运行前预留手指自由活动的空间,避免碰撞周围物体。
import asyncio
import math
import time
from wuji_sdk import SdkManager, TransportType, LowPass
async def main():
manager = SdkManager.instance()
usb_devices = [d for d in manager.scan() if d.transport_type == TransportType.Usb]
if not usb_devices:
print("未发现 Wuji Hand")
return
hand = manager.connect(sn=usb_devices[0].sn, device_name="wuji_hand")
print(f"已连接: {hand.serial_number} ({hand.handedness_name()})")
hand.enable()
try:
# 实时控制:先打开 controller,再在 with 块内 subscribe 与发布指令
with hand.realtime_controller(LowPass(cutoff_hz=5.0)):
# 订阅 5 帧实时关节状态
sub = hand.joint_state().subscribe()
for _ in range(5):
state = await sub.recv_async()
joints = ", ".join(f"{p:+.3f}" for p in state.position[:4])
print(f"[seq={state.header.seq}] joint[0..3]={joints}")
sub.close()
# 100 Hz 推流 cosine 轨迹,4 指做一次握拳再张开
publisher = hand.joint_command().publisher()
update_period = 1.0 / 100.0
duration_s = 4.0
start = time.monotonic()
while True:
t = time.monotonic() - start
if t >= duration_s:
break
# y 从 0 平滑到 1.6 再回到 0
y = (1 - math.cos(2 * math.pi * t / duration_s)) * 0.8
target = [0.0] * 20
# 食指、中指、无名指、小指的 J1、J3、J4 跟随 y
for finger in range(1, 5):
base = finger * 4
target[base + 0] = y
target[base + 2] = y
target[base + 3] = y
publisher.send(target)
time.sleep(update_period)
publisher.close()
finally:
hand.disable()
manager.disconnect(device_name="wuji_hand")
asyncio.run(main())运行效果
终端按顺序打印连接信息和 5 帧关节状态:
已连接: XXXXXX.YYMMDD.NNN (Left)
[seq=0] joint[0..3]=-0.097, -0.016, -0.012, -0.006
[seq=1] joint[0..3]=-0.095, -0.017, -0.012, -0.006
[seq=2] joint[0..3]=-0.093, -0.017, -0.012, -0.006
...
[seq=4] joint[0..3]=-0.091, -0.016, -0.012, -0.006打印结束后,食指、中指、无名指、小指在 4 秒内平滑握拳再张开。示例结束时电机失能,关节按自然受力恢复。