Data Subscription

Wuji SDK uses a subscription mechanism to receive device data. Each data stream corresponds to a sensor or computed result, accessed through semantic APIs.

Subscription Modes

Async Receive

sub = glove.tactile().subscribe()
frame = await sub.recv_async()  # Wait for next data frame

Sync Non-blocking Receive

sub = glove.tactile().subscribe()
frame = sub.recv()  # Returns None when no data available

Callback Receive

def on_data(frame):
    print(f"Max: {max(frame.data):.1f}")

sub = glove.tactile().subscribe_with_callback(callback=on_data)
# Receives automatically in background, non-blocking

Closing Subscriptions

Callback subscriptions must be manually closed:

sub = glove.tactile().subscribe_with_callback(callback=on_data)
# ... after some time
sub.close()

All subscriptions on a device are automatically closed when the device is disconnected.

Cross-device Merged Subscription

Global resources automatically aggregate data from all connected devices. Subscribe to them through the SdkManager-level API. Global resources in the current version:

ResourceTypeDescription
tf_staticFrameTransformsStatic coordinate transforms (e.g., wrist → emf_tx)
tfFrameTransformsDynamic coordinate transforms (e.g., waist → wrist)

Query the full list of global resources at runtime with manager.global_topics():

manager = SdkManager.instance()

# List all global resources
for topic in manager.global_topics():
    print(f"{topic.path} (sub={topic.can_sub})")

# Subscribe to a global resource — data is merged from all connected devices
sub = manager.tf_static().subscribe()
transforms = await sub.recv_async()

C SDK equivalents: wuji_subscribe_tf(...) and wuji_subscribe_tf_static(...) subscribe to the global coordinate transforms (callback signature WujiFrameTransforms, device-independent).

Differences between device resources and global resources:

DimensionDevice ResourceGlobal Resource
Accessglove.tactile().subscribe()manager.tf_static().subscribe()
Data sourceSingle deviceMerged from all connected devices
Query APIdevice.topics()manager.global_topics()

Available Data Streams

For device-specific data streams and data structures, see the corresponding device documentation:

Wuji Hand Data Subscription

Use wuji_sdk.WujiHand to subscribe to 20-joint state, publish joint commands, and subscribe to the paired tactile glove's data.

Subscribing to Joint State

hand.joint_state().subscribe() returns Subscription[HandJointState] and delivers real-time 20-joint position, with optional velocity and effort:

sub = hand.joint_state().subscribe()
for _ in range(100):
    state = await sub.recv_async()
    print(f"seq={state.header.seq}, joint0={state.position[0]:+.3f}")
sub.close()

Publishing Joint Commands (Realtime Control)

hand.realtime_controller(LowPass(cutoff_hz=...)) opens a realtime control session via a with context, with the lowpass filter suppressing high-frequency jitter. Inside the session, get a PUB handle from hand.joint_command().publisher() and call publisher.send(target) to push the 20-joint target position:

from wuji_sdk import LowPass

with hand.realtime_controller(LowPass(cutoff_hz=5.0)):
    publisher = hand.joint_command().publisher()
    target = [0.0] * 20
    target[0] = 0.3
    publisher.send(target)
    publisher.close()

For continuous control, call publisher.send() at a fixed rate (typically 100 Hz) inside the with block.

Effort Limits

hand.set_all_effort_limit(amps) sets the torque limit on all 20 joints to the same amperage, and hand.get_all_effort_limit() reads back a length-20 list[float].

  • Default is 1.5 A, suitable for most applications
  • Valid range is 0.0 to 3.5 A — values above 3.5 are clamped to 3.5
hand.set_all_effort_limit(1.5)
limits = hand.get_all_effort_limit()

Modifying the effort limit changes the maximum output capability of each joint. The firmware also reduces the effective limit based on joint temperature. For the full reference and risk notes, see Wuji Hand SDK Guide — joint_effort_limit.

Tactile Glove Companion Data

When a tactile glove shares the USB bus and matches the hand's handedness, the SDK auto-pairs it during connect. Check the pairing status and subscribe to pressure frames:

if hand.is_tactile_attached():
    tactile_sub = hand.tactile.subscribe_pressure_frame()
    frame = await tactile_sub.recv_async()
    print(f"max pressure: {max(frame.pressure):.2f}")
    tactile_sub.close()