教程
1. 设备连接与筛选
本节介绍如何在 wujihandpy 中连接灵巧手设备,以及在存在多台设备时进行筛选的方法。
1.1 接口概述
class Hand:
def __init__(
self,
serial_number: str | None = None,
*,
side: str | None = None,
...,
) -> None:
...1.2 连接设备
调用 wujihandpy.Hand() 可创建一个 Hand 对象并连接灵巧手:
hand = wujihandpy.Hand()若系统中仅连接一台灵巧手,则会自动连接该设备。
1.3 筛选设备
当系统中连接了多台灵巧手时,使用无参构造函数将导致错误。例如:
[ERROR] 2 devices found with specified vendor id (0x0483)
[ERROR] Device 1 (0483:2000): Serial Number = 337238793233 <-- Matched #1
[ERROR] Device 2 (0483:2000): Serial Number = 337238873233 <-- Matched #2
[ERROR] To ensure correct device selection, please specify the Serial Number
Traceback (most recent call last):
File "/workspaces/quickstart.py", line 5, in <module>
hand = wujihandpy.Hand()
^^^^^^^^^^^^^^^^^
RuntimeError: Failed to init.为解决该问题,有两种方式指定要连接的设备。
按序列号指定
通过 serial_number 参数明确指定设备:
hand = wujihandpy.Hand(serial_number="337238793233")这样可确保程序始终连接到正确的设备实例。
按左右手指定
当系统中恰好连接一只左手和一只右手时,可直接用 side 参数选择,无需记忆序列号:
left = wujihandpy.Hand(side="left")
right = wujihandpy.Hand(side="right")SDK 会自动枚举所有匹配设备、读取每只手的手性标识,再连接对应一只。
side 与 serial_number 互斥
两者只能选一个传入。同时指定会抛 ValueError。side 仅接受字符串 "left" 或 "right",其他取值会抛 ValueError。
匹配失败的常见情形:
- 没插对应侧的手 → 抛
ConnectionError,提示No <side> hand found - 同侧插了两只 → 抛
ConnectionError,提示改用serial_number
1.4 查询序列号
灵巧手有两种序列号:
- USB 序列号:USB 硬件描述符中的标识符,用于连接时区分多台设备。构造函数的
serial_number参数使用的是此序列号。 - 产品序列号:固件中存储的产品追踪编号,用于售后和设备管理。
查询 USB 序列号
在命令行中执行以下命令,查看系统中连接的所有灵巧手:
lsusb -v -d 0483:2000 | grep iSerial输出示例:
iSerial 3 337238793233
^^^^^^^^^^^^ <-- USB Serial Number标注的数字即为 USB 序列号,可直接传递给构造函数的 serial_number 参数。
查询产品序列号
连接设备后,调用 get_product_sn() 获取产品序列号:
hand = wujihandpy.Hand()
product_sn = hand.get_product_sn()
print(product_sn) # e.g. "WJ-DH5-R-2501001"2. 读取数据
本节介绍如何以同步模式读取数据。
2.1 接口概述
调用 read_* 系列函数会向灵巧手发送同步读请求,并阻塞当前线程直到设备返回结果。函数返回则表示读取已成功完成。
非实时接口的延迟特性
每次同步请求会导致约 5~10 ms 的阻塞,适用于非实时任务,请勿在实时控制环路或高实时性循环中使用此类 API。
对实时控制场景,请考虑:
- 使用 单向/双向实时控制器
- 在旁路低速循环中使用 异步 API
2.2 单值读取
当读取层级与对象层级一致时,函数返回单个标量值。
class Hand:
def read_<dataname>(self) -> datatype: ...
class Finger:
def read_<dataname>(self) -> datatype: ...
class Joint:
def read_<dataname>(self) -> datatype: ...示例
使用 hand 读取灵巧手的上电运行时间(Hand 层):
time = hand.read_system_time()使用 joint 读取食指近端关节的当前位置(Joint 层):
position = hand.finger(1).joint(0).read_joint_actual_position()2.3 批量读取
当读取层级低于对象层级时,函数返回包含多个数据的矩阵。
class Hand:
def read_<dataname>(self) -> np.NDArray[datatype]: ...
class Finger:
def read_<dataname>(self) -> np.NDArray[datatype]: ...
class Joint:
def read_<dataname>(self) -> np.NDArray[datatype]: ...示例
使用 hand 请求整手所有关节(共 20 个)的当前位置数据(Joint 层):
positions = hand.read_joint_actual_position()函数返回一个 np.NDArray[np.float64],每行对应一个手指、每列对应一个关节:
>>> print(positions)
[[ 0.975 0.523 0.271 -0.450]
[ 0.382 0.241 -0.003 -0.275]
[-0.299 0.329 0.067 -0.286]
[-0.122 0.228 0.315 -0.178]
[ 0.205 0.087 0.288 -0.149]]同样,可使用 finger 请求单个手指的所有关节位置:
>>> print(hand.finger(1).read_joint_actual_position())
[ 0.382 0.241 -0.003 -0.275]2.4 更多示例
3. 写入数据
本节介绍如何以同步模式写入数据。
3.1 接口概述
调用 write_* 系列函数会向灵巧手发送同步写请求,并阻塞当前线程直到设备返回结果。函数返回则表示写入已成功完成。
非实时接口的延迟特性
每次同步请求会导致约 5~10 ms 的阻塞,适用于非实时任务,请勿在实时控制环路或高实时性循环中使用此类 API。
对实时控制场景,请考虑:
- 使用 单向/双向实时控制器
- 在旁路低速循环中使用 异步 API
3.2 单值写入
当写入层级与对象层级一致时,函数使用单个标量值。
class Hand:
def write_<dataname>(self, datatype) -> None: ...
class Finger:
def write_<dataname>(self, datatype) -> None: ...
class Joint:
def write_<dataname>(self, datatype) -> None: ...示例 使能食指近端关节(Joint 层):
hand.finger(1).joint(0).write_joint_enabled(True)3.3 批量写入
当写入层级低于对象层级时,函数既可使用单个标量值,也可使用包含多个数据的矩阵。
class Hand:
@typing.overload
def write_<dataname>(self, datatype) -> None: ...
@typing.overload
def write_<dataname>(self, np.NDArray[datatype]) -> None: ...
class Finger:
@typing.overload
def write_<dataname>(self, datatype) -> None: ...
@typing.overload
def write_<dataname>(self, np.NDArray[datatype]) -> None: ...
class Joint:
@typing.overload
def write_<dataname>(self, datatype) -> None: ...
@typing.overload
def write_<dataname>(self, np.NDArray[datatype]) -> None: ...示例
使用 hand 使能整手所有关节(Joint 层):
hand.write_joint_enabled(True)或使能食指,失能其余手指:
hand.write_joint_enabled(
np.array(
[
# J1 J2 J3 J4
[False, False, False, False], # F1
[ True, True, True, True], # F2
[False, False, False, False], # F3
[False, False, False, False], # F4
[False, False, False, False], # F5
],
dtype=bool,
)
)同理可向整手所有关节写入目标控制角度:
hand.write_joint_target_position(0.0)
# Equals to
hand.write_joint_target_position(
np.array(
[
# J1 J2 J3 J4
[0.0, 0.0, 0.0, 0.0], # F1
[0.0, 0.0, 0.0, 0.0], # F2
[0.0, 0.0, 0.0, 0.0], # F3
[0.0, 0.0, 0.0, 0.0], # F4
[0.0, 0.0, 0.0, 0.0], # F5
],
dtype=np.float64,
)
)3.4 非法数值写入
通常,非法数值写入会被自动处理,例如:
- 超出合法范围的角度会被自动限幅至上下界。
但这并非绝对,当数值存在逻辑错误时,例如:
- 将关节的 effort limit 设为非法值(过大或过小)
会导致操作失败,抛出 TimeoutError。
3.5 更多示例
4. 实时控制
本节介绍在 wujihandpy 中以实时模式控制灵巧手的方法。
4.1 接口概述
class Hand:
def realtime_controller(self, enable_upstream: bool, filter: filter.IFilter) -> IController:
...
class IController:
def set_joint_target_position(self, value_array: np.NDArray[np.float64]) -> None:
...
def get_joint_actual_position(self) -> np.NDArray[np.float64]:
...
def get_joint_actual_effort(self) -> np.NDArray[np.float64]:
...
def close(self) -> None:
...调用 Hand.realtime_controller() 可使灵巧手进入实时控制模式,并返回一个实时控制器对象 IController。
进入实时控制模式后,电机侧会启动一个 16 kHz 实时滤波器,将控制量滤波为平滑的 16 kHz 信号。
4.2 滤波器
实时控制器需要一个滤波器对象 (filter.IFilter) 来平滑地调整目标值。
滤波器的初值会自动设置为灵巧手的实际位置,因此无需担心初始过渡问题。
wujihandpy 提供以下常用滤波器:
filter.LowPass(cutoff_freq: float):一阶低通滤波器,截止频率可调。当设置截止频率 >= 1 kHz 时,滤波器被关闭。
由于滤波器逻辑运行于电机驱动板上,不可使用自定义滤波器。
4.3 上下文管理器
IController 可作为上下文管理器使用:
with hand.realtime_controller(
enable_upstream=False,
filter=wujihandpy.filter.LowPass(cutoff_freq=2.0),
) as controller:
... # 使用 controller 执行实时控制当 with 块结束时,会自动调用 controller.close(),确保灵巧手正确退出实时控制模式。
自动析构
若未使用上下文管理器,IController 在析构时也会自动调用 close(),由于 Python 的垃圾回收存在延迟,灵巧手可能不会立即退出实时模式。若不需精确控制退出时机,可直接依赖析构行为。
4.4 实时写入
def set_joint_target_position(self, value_array: np.NDArray[np.float64]) -> None: ...set_joint_target_position() 函数会非阻塞地将目标控制量尽可能快地发送至电机驱动板。
4.5 实时读取
电机以 1 kHz 的频率自动上报当前位置和 effort。以下函数均为非阻塞调用,会立即返回缓存中最新的读取结果。
def get_joint_actual_position(self) -> np.NDArray[np.float64]: ...
def get_joint_actual_effort(self) -> np.NDArray[np.float64]: ...什么是 Effort?
Effort 是电流空间的执行器作用量,经过滤波处理后输出。它不是实际测量的电流值,应将其理解为相对驱动强度。
典型应用场景:
- 碰撞检测:effort 突增表示关节受阻
- 负载监控:可通过
effort / effort_limit计算当前输出百分比
示例
import wujihandpy
import numpy as np
hand = wujihandpy.Hand()
try:
hand.write_joint_enabled(True)
# 进入实时控制模式,使用截止频率 5 Hz 的低通滤波器
with hand.realtime_controller(
enable_upstream=True, filter=wujihandpy.filter.LowPass(cutoff_freq=5.0)
) as controller:
while True:
# 实时写入目标位置(非阻塞)
controller.set_joint_target_position(np.zeros((5, 4)))
# 实时读取位置和 effort(非阻塞)
position = controller.get_joint_actual_position()
effort = controller.get_joint_actual_effort()
print(f"pos: {position[1, :]} effort: {effort[1, :]}")
finally:
hand.write_joint_enabled(False)4.6 进阶示例
5. 异步读/写
本节介绍在 wujihandpy 中以异步模式读/写数据的方法。
5.1 接口概述
读/写接口均提供异步版本,函数名以 _async 为后缀,用于在协程环境中执行非阻塞的 I/O 操作。
async def read_<dataname>_async(self) -> datatype: ...
async def read_<dataname>_async(self) -> np.NDArray[datatype]: ... # 批量读取
async def write_<dataname>_async(self, value: datatype): ...
async def write_<dataname>_async(self, values: np.NDArray[datatype]): ... # 批量写入调用异步接口时需使用 await,在等待期间不会阻塞事件循环。函数返回则表示操作已成功完成。
非实时接口的延迟特性
异步请求同样需要 5~10 ms 才能完成,其本质并不更快,仅是不阻塞事件循环。
与同步接口相比,异步接口的唯一区别在于其允许在等待期间执行其他协程任务。 因此请勿在实时控制环路或高实时性循环中调用此类 API。
对实时控制场景,请考虑:
- 使用 单向/双向实时控制器
- 在旁路低速循环中使用 异步 API
示例
import asyncio
import wujihandpy
async def main():
hand = wujihandpy.Hand()
try:
await hand.write_joint_enabled_async(True)
# 并行执行以下任务:
# - 使食指第 0 关节转至 1.57 rad(约 90° 下压)
# - 使食指第 2 关节转至 1.57 rad(约 90° 下压)
# - 同时等待 0.5 秒
await asyncio.gather(
hand.finger(1).joint(0).write_joint_target_position_async(1.57),
hand.finger(1).joint(2).write_joint_target_position_async(1.57),
asyncio.sleep(0.5),
)
finally:
await hand.write_joint_enabled_async(False)
# 启动事件循环并运行主任务
asyncio.run(main())5.2 进阶示例
6. 多线程操作
本节介绍在 wujihandpy 中进行多线程操作的方法。
6.1 接口概述
默认情况下,wujihandpy 会对 Hand 对象进行线程安全检查,以防止多线程并发访问导致的数据竞争问题。若需在多线程环境中使用同一个 Hand 对象,需先调用 Hand.disable_thread_safe_check() 禁用该检查。
class Hand:
def disable_thread_safe_check(self) -> None:
...6.2 多线程安全注意事项
重要:禁用线程安全检查后,用户需要自行保证线程安全。
- 必须在多线程操作
Hand对象之前调用disable_thread_safe_check() - 用户需要自行使用互斥锁(
threading.Lock)等同步机制保护对Hand对象的并发访问 - 建议使用
with lock:语句块包裹所有对Hand对象及其派生对象(如IController)的操作
示例 以下示例展示了如何在多线程环境中使用实时控制器:
import threading
import time
import wujihandpy
import numpy as np
import math
hand = wujihandpy.Hand()
# 禁用线程安全检查,允许多线程访问
hand.disable_thread_safe_check()
print("Thread safety check disabled, multi-threaded access enabled")
# 创建互斥锁确保线程安全
lock = threading.Lock()
def reader_thread(ctrl):
"""周期性读取关节位置"""
while True:
with lock:
position = ctrl.get_joint_actual_position()
print(f"[Reader] Position:\n{position}")
time.sleep(0.01)
def writer_thread(ctrl):
"""周期性写入目标位置(余弦波运动)"""
x = 0
while True:
y = (1 - math.cos(x)) * 0.8
target = np.array(
[
[0, 0, 0, 0], # F1
[y, 0, y, y], # F2
[y, 0, y, y], # F3
[y, 0, y, y], # F4
[y, 0, y, y], # F5
],
dtype=np.float64,
)
with lock:
ctrl.set_joint_target_position(target)
print(f"[Writer] Target: {y:.4f} rad")
x += math.pi / 100.0
time.sleep(0.01)
hand.write_joint_enabled(True)
with hand.realtime_controller(
enable_upstream=True,
filter=wujihandpy.filter.LowPass(cutoff_freq=5.0)
) as ctrl:
threads = [
threading.Thread(target=reader_thread, args=(ctrl,)),
threading.Thread(target=writer_thread, args=(ctrl,)),
]
for t in threads:
t.start()
for t in threads:
t.join()6.3 预期效果
运行示例后,终端将输出类似以下内容:
Thread safety check disabled, multi-threaded access enabled
[Reader] Position:
[[ 0.012 0.003 -0.005 0.008]
[ 0.015 0.002 0.007 0.011]
[ 0.009 -0.001 0.004 0.006]
[ 0.018 0.005 0.012 0.009]
[ 0.011 0.004 0.008 0.013]]
[Writer] Target: 0.0000 rad
[Reader] Position:
[[ 0.012 0.003 -0.005 0.008]
[ 0.016 0.002 0.008 0.012]
[ 0.010 -0.001 0.005 0.007]
[ 0.019 0.005 0.013 0.010]
[ 0.012 0.004 0.009 0.014]]
[Writer] Target: 0.0004 rad
[Writer] Target: 0.0016 rad
[Reader] Position:
[[ 0.012 0.003 -0.005 0.008]
[ 0.018 0.002 0.010 0.014]
[ 0.012 -0.001 0.007 0.009]
[ 0.021 0.005 0.015 0.012]
[ 0.014 0.004 0.011 0.016]]
...- 首行显示线程安全检查已禁用
[Reader]打印当前关节位置矩阵(5×4,对应 5 个手指各 4 个关节)[Writer]打印当前目标位置(余弦波,从 0 逐渐增大)- 两个线程交替输出,顺序可能交错,这是多线程的正常现象
- F1(拇指)保持不动,F2(食指)、F3(中指)、F4(无名指)、F5(小指)会做周期性弯曲-伸展动作
6.4 进阶示例
7. USB 断连处理
设备运行期间,灵巧手与电脑之间的 USB 连接可能中断,例如数据线被拔出。本节介绍 wujihandpy 在断连时的行为,以及捕获断连异常的方法。
7.1 断连时的行为
USB 连接中断时,wujihandpy 会抛出异常。捕获该异常即可在断连后恢复,进程不会崩溃。
断连后再次发起的操作会立即抛出 Python 内置的 ConnectionError,不再等待 500 ms 超时,涉及接口包括:
- 同步读 / 写(
read_*/write_*) - 异步读 / 写(
read_*_async/write_*_async)
断连瞬间已在等待中的异步请求是个例外:它会以 TimeoutError 结束,而非 ConnectionError。若代码中存在并发的异步操作,建议捕获两者的共同基类 OSError,即可同时覆盖断连与超时。
7.2 捕获断连异常
将设备操作放入 try 块,捕获 ConnectionError 即可处理断连:
import wujihandpy
import time
def main():
hand = wujihandpy.Hand()
print("Reading. Unplug USB to test disconnect handling.\n")
try:
while True:
v = hand.read_input_voltage()
print(f"input_voltage: {v:.2f}V")
time.sleep(0.5)
except ConnectionError as e:
print(f"\nDisconnect detected: {e}")
if __name__ == "__main__":
main()运行脚本后,在读取过程中拔出 USB 线缆,循环会立即被 ConnectionError 中断,程序正常退出而不会崩溃。
7.3 实时控制模式下的断连检测
实时控制器的非阻塞接口(set_joint_target_position()、get_joint_actual_position()、get_joint_actual_effort())在断连时不会抛出 ConnectionError。
实时控制环路依赖 PDO 高速数据,断连时这些接口仅停止更新缓存,不抛出异常。若需在实时控制场景中检测断连,请在旁路低速循环中周期性调用一次同步读取(如 read_input_voltage())作为探测,并捕获 ConnectionError。
实时控制器以上下文管理器使用时,with 块结束会自动调用 close() 完成清理。断连状态下 close() 也会安全返回,无需额外处理。