SDK 使用说明

教程

1. 设备连接与筛选

本节介绍如何在 wujihandpy 中连接灵巧手设备,以及在存在多台设备时进行筛选的方法。

1.1 接口概述

class Hand:
    def __init__(
        self,
        serial_number: str | None = None,
        *,
        side: str | None = None,
        ...,
    ) -> None:
        ...

1.2 连接设备

调用 wujihandpy.Hand() 可创建一个 Hand 对象并连接灵巧手:

hand = wujihandpy.Hand()

若系统中仅连接一台灵巧手,则会自动连接该设备。

1.3 筛选设备

当系统中连接了多台灵巧手时,使用无参构造函数将导致错误。例如:

[ERROR] 2 devices found with specified vendor id (0x0483)
[ERROR]   Device 1 (0483:2000): Serial Number = 337238793233 <-- Matched #1
[ERROR]   Device 2 (0483:2000): Serial Number = 337238873233 <-- Matched #2
[ERROR] To ensure correct device selection, please specify the Serial Number
Traceback (most recent call last):
  File "/workspaces/quickstart.py", line 5, in <module>
    hand = wujihandpy.Hand()
           ^^^^^^^^^^^^^^^^^
RuntimeError: Failed to init.

为解决该问题,有两种方式指定要连接的设备。

按序列号指定

通过 serial_number 参数明确指定设备:

hand = wujihandpy.Hand(serial_number="337238793233")

这样可确保程序始终连接到正确的设备实例。

按左右手指定

当系统中恰好连接一只左手和一只右手时,可直接用 side 参数选择,无需记忆序列号:

left = wujihandpy.Hand(side="left")
right = wujihandpy.Hand(side="right")

SDK 会自动枚举所有匹配设备、读取每只手的手性标识,再连接对应一只。

sideserial_number 互斥

两者只能选一个传入。同时指定会抛 ValueErrorside 仅接受字符串 "left""right",其他取值会抛 ValueError

匹配失败的常见情形:

  • 没插对应侧的手 → 抛 ConnectionError,提示 No <side> hand found
  • 同侧插了两只 → 抛 ConnectionError,提示改用 serial_number

1.4 查询序列号

灵巧手有两种序列号:

  • USB 序列号:USB 硬件描述符中的标识符,用于连接时区分多台设备。构造函数的 serial_number 参数使用的是此序列号。
  • 产品序列号:固件中存储的产品追踪编号,用于售后和设备管理。

查询 USB 序列号

在命令行中执行以下命令,查看系统中连接的所有灵巧手:

lsusb -v -d 0483:2000 | grep iSerial

输出示例:

  iSerial                 3 337238793233
                            ^^^^^^^^^^^^  <-- USB Serial Number

标注的数字即为 USB 序列号,可直接传递给构造函数的 serial_number 参数。

查询产品序列号

连接设备后,调用 get_product_sn() 获取产品序列号:

hand = wujihandpy.Hand()
product_sn = hand.get_product_sn()
print(product_sn)  # e.g. "WJ-DH5-R-2501001"

2. 读取数据

本节介绍如何以同步模式读取数据。

2.1 接口概述

调用 read_* 系列函数会向灵巧手发送同步读请求,并阻塞当前线程直到设备返回结果。函数返回则表示读取已成功完成。

非实时接口的延迟特性

每次同步请求会导致约 5~10 ms 的阻塞,适用于非实时任务,请勿在实时控制环路或高实时性循环中使用此类 API。

对实时控制场景,请考虑:

2.2 单值读取

当读取层级与对象层级一致时,函数返回单个标量值。

class Hand:
    def read_<dataname>(self) -> datatype: ...

class Finger:
    def read_<dataname>(self) -> datatype: ...

class Joint:
    def read_<dataname>(self) -> datatype: ...

示例 使用 hand 读取灵巧手的上电运行时间(Hand 层):

time = hand.read_system_time()

使用 joint 读取食指近端关节的当前位置(Joint 层):

position = hand.finger(1).joint(0).read_joint_actual_position()

2.3 批量读取

当读取层级低于对象层级时,函数返回包含多个数据的矩阵。

class Hand:
    def read_<dataname>(self) -> np.NDArray[datatype]: ...

class Finger:
    def read_<dataname>(self) -> np.NDArray[datatype]: ...

class Joint:
    def read_<dataname>(self) -> np.NDArray[datatype]: ...

示例 使用 hand 请求整手所有关节(共 20 个)的当前位置数据(Joint 层):

positions = hand.read_joint_actual_position()

函数返回一个 np.NDArray[np.float64],每行对应一个手指、每列对应一个关节:

>>> print(positions)
[[ 0.975  0.523  0.271 -0.450]
 [ 0.382  0.241 -0.003 -0.275]
 [-0.299  0.329  0.067 -0.286]
 [-0.122  0.228  0.315 -0.178]
 [ 0.205  0.087  0.288 -0.149]]

同样,可使用 finger 请求单个手指的所有关节位置:

>>> print(hand.finger(1).read_joint_actual_position())
[ 0.382  0.241 -0.003 -0.275]

2.4 更多示例

3. 写入数据

本节介绍如何以同步模式写入数据。

3.1 接口概述

调用 write_* 系列函数会向灵巧手发送同步写请求,并阻塞当前线程直到设备返回结果。函数返回则表示写入已成功完成。

非实时接口的延迟特性

每次同步请求会导致约 5~10 ms 的阻塞,适用于非实时任务,请勿在实时控制环路或高实时性循环中使用此类 API。

对实时控制场景,请考虑:

3.2 单值写入

当写入层级与对象层级一致时,函数使用单个标量值。

class Hand:
    def write_<dataname>(self, datatype) -> None: ...

class Finger:
    def write_<dataname>(self, datatype) -> None: ...

class Joint:
    def write_<dataname>(self, datatype) -> None: ...

示例 使能食指近端关节(Joint 层):

hand.finger(1).joint(0).write_joint_enabled(True)

3.3 批量写入

当写入层级低于对象层级时,函数既可使用单个标量值,也可使用包含多个数据的矩阵。

class Hand:
    @typing.overload
    def write_<dataname>(self, datatype) -> None: ...
    @typing.overload
    def write_<dataname>(self, np.NDArray[datatype]) -> None: ...

class Finger:
    @typing.overload
    def write_<dataname>(self, datatype) -> None: ...
    @typing.overload
    def write_<dataname>(self, np.NDArray[datatype]) -> None: ...

class Joint:
    @typing.overload
    def write_<dataname>(self, datatype) -> None: ...
    @typing.overload
    def write_<dataname>(self, np.NDArray[datatype]) -> None: ...

示例 使用 hand 使能整手所有关节(Joint 层):

hand.write_joint_enabled(True)

或使能食指,失能其余手指:

hand.write_joint_enabled(
    np.array(
        [
            #  J1     J2     J3     J4
            [False, False, False, False],  # F1
            [ True,  True,  True,  True],  # F2
            [False, False, False, False],  # F3
            [False, False, False, False],  # F4
            [False, False, False, False],  # F5
        ],
        dtype=bool,
    )
)

同理可向整手所有关节写入目标控制角度:

hand.write_joint_target_position(0.0)
# Equals to
hand.write_joint_target_position(
    np.array(
        [
            # J1   J2   J3   J4
            [0.0, 0.0, 0.0, 0.0],  # F1
            [0.0, 0.0, 0.0, 0.0],  # F2
            [0.0, 0.0, 0.0, 0.0],  # F3
            [0.0, 0.0, 0.0, 0.0],  # F4
            [0.0, 0.0, 0.0, 0.0],  # F5
        ],
        dtype=np.float64,
    )
)

3.4 非法数值写入

通常,非法数值写入会被自动处理,例如:

  • 超出合法范围的角度会被自动限幅至上下界。

但这并非绝对,当数值存在逻辑错误时,例如:

  • 将关节的 effort limit 设为非法值(过大或过小)

会导致操作失败,抛出 TimeoutError。

3.5 更多示例

4. 实时控制

本节介绍在 wujihandpy 中以实时模式控制灵巧手的方法。

4.1 接口概述

class Hand:
    def realtime_controller(self, enable_upstream: bool, filter: filter.IFilter) -> IController:
        ...

class IController:
    def set_joint_target_position(self, value_array: np.NDArray[np.float64]) -> None:
        ...
    def get_joint_actual_position(self) -> np.NDArray[np.float64]:
        ...
    def get_joint_actual_effort(self) -> np.NDArray[np.float64]:
        ...
    def close(self) -> None:
        ...

调用 Hand.realtime_controller() 可使灵巧手进入实时控制模式,并返回一个实时控制器对象 IController。 进入实时控制模式后,电机侧会启动一个 16 kHz 实时滤波器,将控制量滤波为平滑的 16 kHz 信号。

4.2 滤波器

实时控制器需要一个滤波器对象 (filter.IFilter) 来平滑地调整目标值。

滤波器的初值会自动设置为灵巧手的实际位置,因此无需担心初始过渡问题。

wujihandpy 提供以下常用滤波器:

  • filter.LowPass(cutoff_freq: float):一阶低通滤波器,截止频率可调。当设置截止频率 >= 1 kHz 时,滤波器被关闭。

由于滤波器逻辑运行于电机驱动板上,不可使用自定义滤波器。

4.3 上下文管理器

IController 可作为上下文管理器使用:

with hand.realtime_controller(
    enable_upstream=False,
    filter=wujihandpy.filter.LowPass(cutoff_freq=2.0),
) as controller:
    ...  # 使用 controller 执行实时控制

with 块结束时,会自动调用 controller.close(),确保灵巧手正确退出实时控制模式。

自动析构

若未使用上下文管理器,IController 在析构时也会自动调用 close(),由于 Python 的垃圾回收存在延迟,灵巧手可能不会立即退出实时模式。若不需精确控制退出时机,可直接依赖析构行为。

4.4 实时写入

def set_joint_target_position(self, value_array: np.NDArray[np.float64]) -> None: ...

set_joint_target_position() 函数会非阻塞地将目标控制量尽可能快地发送至电机驱动板。

4.5 实时读取

电机以 1 kHz 的频率自动上报当前位置和 effort。以下函数均为非阻塞调用,会立即返回缓存中最新的读取结果。

def get_joint_actual_position(self) -> np.NDArray[np.float64]: ...
def get_joint_actual_effort(self) -> np.NDArray[np.float64]: ...

什么是 Effort?

Effort 是电流空间的执行器作用量,经过滤波处理后输出。它不是实际测量的电流值,应将其理解为相对驱动强度。

典型应用场景:

  • 碰撞检测:effort 突增表示关节受阻
  • 负载监控:可通过 effort / effort_limit 计算当前输出百分比

示例

import wujihandpy
import numpy as np

hand = wujihandpy.Hand()
try:
    hand.write_joint_enabled(True)
    # 进入实时控制模式,使用截止频率 5 Hz 的低通滤波器
    with hand.realtime_controller(
        enable_upstream=True, filter=wujihandpy.filter.LowPass(cutoff_freq=5.0)
    ) as controller:
        while True:
            # 实时写入目标位置(非阻塞)
            controller.set_joint_target_position(np.zeros((5, 4)))
            # 实时读取位置和 effort(非阻塞)
            position = controller.get_joint_actual_position()
            effort = controller.get_joint_actual_effort()
            print(f"pos: {position[1, :]}  effort: {effort[1, :]}")
finally:
    hand.write_joint_enabled(False)

4.6 进阶示例

5. 异步读/写

本节介绍在 wujihandpy 中以异步模式读/写数据的方法。

5.1 接口概述

读/写接口均提供异步版本,函数名以 _async 为后缀,用于在协程环境中执行非阻塞的 I/O 操作。

async def read_<dataname>_async(self) -> datatype: ...
async def read_<dataname>_async(self) -> np.NDArray[datatype]: ...  # 批量读取

async def write_<dataname>_async(self, value: datatype): ...
async def write_<dataname>_async(self, values: np.NDArray[datatype]): ...  # 批量写入

调用异步接口时需使用 await,在等待期间不会阻塞事件循环。函数返回则表示操作已成功完成。

非实时接口的延迟特性

异步请求同样需要 5~10 ms 才能完成,其本质并不更快,仅是不阻塞事件循环。

与同步接口相比,异步接口的唯一区别在于其允许在等待期间执行其他协程任务。 因此请勿在实时控制环路或高实时性循环中调用此类 API。

对实时控制场景,请考虑:

示例

import asyncio
import wujihandpy

async def main():
    hand = wujihandpy.Hand()
    try:
        await hand.write_joint_enabled_async(True)
        # 并行执行以下任务:
        # - 使食指第 0 关节转至 1.57 rad(约 90° 下压)
        # - 使食指第 2 关节转至 1.57 rad(约 90° 下压)
        # - 同时等待 0.5 秒
        await asyncio.gather(
            hand.finger(1).joint(0).write_joint_target_position_async(1.57),
            hand.finger(1).joint(2).write_joint_target_position_async(1.57),
            asyncio.sleep(0.5),
        )
    finally:
        await hand.write_joint_enabled_async(False)

# 启动事件循环并运行主任务
asyncio.run(main())

5.2 进阶示例

6. 多线程操作

本节介绍在 wujihandpy 中进行多线程操作的方法。

6.1 接口概述

默认情况下,wujihandpy 会对 Hand 对象进行线程安全检查,以防止多线程并发访问导致的数据竞争问题。若需在多线程环境中使用同一个 Hand 对象,需先调用 Hand.disable_thread_safe_check() 禁用该检查。

class Hand:
    def disable_thread_safe_check(self) -> None:
        ...

6.2 多线程安全注意事项

重要:禁用线程安全检查后,用户需要自行保证线程安全。

  • 必须在多线程操作 Hand 对象之前调用 disable_thread_safe_check()
  • 用户需要自行使用互斥锁(threading.Lock)等同步机制保护对 Hand 对象的并发访问
  • 建议使用 with lock: 语句块包裹所有对 Hand 对象及其派生对象(如 IController)的操作

示例 以下示例展示了如何在多线程环境中使用实时控制器:

import threading
import time
import wujihandpy
import numpy as np
import math

hand = wujihandpy.Hand()

# 禁用线程安全检查,允许多线程访问
hand.disable_thread_safe_check()
print("Thread safety check disabled, multi-threaded access enabled")

# 创建互斥锁确保线程安全
lock = threading.Lock()

def reader_thread(ctrl):
    """周期性读取关节位置"""
    while True:
        with lock:
            position = ctrl.get_joint_actual_position()
        print(f"[Reader] Position:\n{position}")
        time.sleep(0.01)

def writer_thread(ctrl):
    """周期性写入目标位置(余弦波运动)"""
    x = 0
    while True:
        y = (1 - math.cos(x)) * 0.8
        target = np.array(
            [
                [0, 0, 0, 0],  # F1
                [y, 0, y, y],  # F2
                [y, 0, y, y],  # F3
                [y, 0, y, y],  # F4
                [y, 0, y, y],  # F5
            ],
            dtype=np.float64,
        )
        with lock:
            ctrl.set_joint_target_position(target)
        print(f"[Writer] Target: {y:.4f} rad")
        x += math.pi / 100.0
        time.sleep(0.01)

hand.write_joint_enabled(True)
with hand.realtime_controller(
    enable_upstream=True,
    filter=wujihandpy.filter.LowPass(cutoff_freq=5.0)
) as ctrl:
    threads = [
        threading.Thread(target=reader_thread, args=(ctrl,)),
        threading.Thread(target=writer_thread, args=(ctrl,)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

6.3 预期效果

运行示例后,终端将输出类似以下内容:

Thread safety check disabled, multi-threaded access enabled
[Reader] Position:
[[ 0.012  0.003 -0.005  0.008]
 [ 0.015  0.002  0.007  0.011]
 [ 0.009 -0.001  0.004  0.006]
 [ 0.018  0.005  0.012  0.009]
 [ 0.011  0.004  0.008  0.013]]
[Writer] Target: 0.0000 rad
[Reader] Position:
[[ 0.012  0.003 -0.005  0.008]
 [ 0.016  0.002  0.008  0.012]
 [ 0.010 -0.001  0.005  0.007]
 [ 0.019  0.005  0.013  0.010]
 [ 0.012  0.004  0.009  0.014]]
[Writer] Target: 0.0004 rad
[Writer] Target: 0.0016 rad
[Reader] Position:
[[ 0.012  0.003 -0.005  0.008]
 [ 0.018  0.002  0.010  0.014]
 [ 0.012 -0.001  0.007  0.009]
 [ 0.021  0.005  0.015  0.012]
 [ 0.014  0.004  0.011  0.016]]
...
  • 首行显示线程安全检查已禁用
  • [Reader] 打印当前关节位置矩阵(5×4,对应 5 个手指各 4 个关节)
  • [Writer] 打印当前目标位置(余弦波,从 0 逐渐增大)
  • 两个线程交替输出,顺序可能交错,这是多线程的正常现象
  • F1(拇指)保持不动,F2(食指)、F3(中指)、F4(无名指)、F5(小指)会做周期性弯曲-伸展动作

6.4 进阶示例

7. USB 断连处理

设备运行期间,灵巧手与电脑之间的 USB 连接可能中断,例如数据线被拔出。本节介绍 wujihandpy 在断连时的行为,以及捕获断连异常的方法。

7.1 断连时的行为

USB 连接中断时,wujihandpy 会抛出异常。捕获该异常即可在断连后恢复,进程不会崩溃。

断连后再次发起的操作会立即抛出 Python 内置的 ConnectionError,不再等待 500 ms 超时,涉及接口包括:

  • 同步读 / 写(read_* / write_*
  • 异步读 / 写(read_*_async / write_*_async

断连瞬间已在等待中的异步请求是个例外:它会以 TimeoutError 结束,而非 ConnectionError。若代码中存在并发的异步操作,建议捕获两者的共同基类 OSError,即可同时覆盖断连与超时。

7.2 捕获断连异常

将设备操作放入 try 块,捕获 ConnectionError 即可处理断连:

import wujihandpy
import time


def main():
    hand = wujihandpy.Hand()
    print("Reading. Unplug USB to test disconnect handling.\n")

    try:
        while True:
            v = hand.read_input_voltage()
            print(f"input_voltage: {v:.2f}V")
            time.sleep(0.5)
    except ConnectionError as e:
        print(f"\nDisconnect detected: {e}")


if __name__ == "__main__":
    main()

运行脚本后,在读取过程中拔出 USB 线缆,循环会立即被 ConnectionError 中断,程序正常退出而不会崩溃。

7.3 实时控制模式下的断连检测

实时控制器的非阻塞接口(set_joint_target_position()get_joint_actual_position()get_joint_actual_effort())在断连时不会抛出 ConnectionError

实时控制环路依赖 PDO 高速数据,断连时这些接口仅停止更新缓存,不抛出异常。若需在实时控制场景中检测断连,请在旁路低速循环中周期性调用一次同步读取(如 read_input_voltage())作为探测,并捕获 ConnectionError

实时控制器以上下文管理器使用时,with 块结束会自动调用 close() 完成清理。断连状态下 close() 也会安全返回,无需额外处理。

7.4 更多示例