Data Subscription
Wuji SDK uses a subscription mechanism to receive device data. Each data stream corresponds to a sensor or computed result, accessed through semantic APIs.
Subscription Modes
Async Receive
sub = glove.tactile().subscribe()
frame = await sub.recv_async()  # Wait for the next data frame
Sync Non-blocking Receive
sub = glove.tactile().subscribe()
frame = sub.recv()  # Returns None when no data is available
Callback Receive
def on_data(frame):
    print(f"Max: {max(frame.data):.1f}")
sub = glove.tactile().subscribe_with_callback(callback=on_data)
# Receives automatically in the background, non-blocking
Closing Subscriptions
Callback subscriptions must be manually closed:
sub = glove.tactile().subscribe_with_callback(callback=on_data)
# ... after some time
sub.close()
All subscriptions on a device are automatically closed when the device is disconnected.
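To make the polling and closing behaviour above concrete, here is a minimal sketch that drains a non-blocking subscription and guarantees `close()` is called. `FakeSubscription` is a hypothetical stand-in (not part of the SDK) so the example runs without hardware; it assumes only that `recv()` returns `None` when no data is available and that `close()` exists, as described above.

```python
from contextlib import closing


class FakeSubscription:
    """Stand-in for an SDK subscription (hypothetical, for illustration only)."""

    def __init__(self, frames):
        self._frames = list(frames)
        self.closed = False

    def recv(self):
        # Mirrors the documented non-blocking behaviour: None when nothing is queued.
        return self._frames.pop(0) if self._frames else None

    def close(self):
        self.closed = True


def drain(sub):
    """Collect every frame that is currently available, then return."""
    frames = []
    while (frame := sub.recv()) is not None:
        frames.append(frame)
    return frames


sub = FakeSubscription([[0.1, 0.4], [0.2, 0.3]])
with closing(sub):  # close() runs even if drain() raises
    received = drain(sub)

print(len(received), sub.closed)
```

With a real subscription, the same `closing(...)` wrapper should work for any object that exposes `close()`, which avoids leaking a callback subscription when an exception interrupts the surrounding code.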
Cross-device Merged Subscription
Global resources automatically aggregate data from all connected devices. Subscribe to them through the SdkManager-level API. Global resources in the current version:
| Resource | Type | Description |
|---|---|---|
| tf_static | FrameTransforms | Static coordinate transforms (e.g., wrist → emf_tx) |
| tf | FrameTransforms | Dynamic coordinate transforms (e.g., waist → wrist) |
Query the full list of global resources at runtime with manager.global_topics():
manager = SdkManager.instance()
# List all global resources
for topic in manager.global_topics():
    print(f"{topic.path} (sub={topic.can_sub})")
# Subscribe to a global resource — data is merged from all connected devices
sub = manager.tf_static().subscribe()
transforms = await sub.recv_async()
C SDK equivalents: wuji_subscribe_tf(...) and wuji_subscribe_tf_static(...) subscribe to the global coordinate transforms (callback signature WujiFrameTransforms, device-independent).
Differences between device resources and global resources:
| Dimension | Device Resource | Global Resource |
|---|---|---|
| Access | glove.tactile().subscribe() | manager.tf_static().subscribe() |
| Data source | Single device | Merged from all connected devices |
| Query API | device.topics() | manager.global_topics() |
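Both access paths in the table return a subscription whose `recv_async()` waits for the next frame. If no frame ever arrives (for example, because no device is connected), that wait may not return. Below is a minimal sketch of bounding the wait with the standard library's `asyncio.wait_for`. `FakeSubscription` and `recv_with_timeout` are hypothetical names used for illustration and are not part of the SDK.

```python
import asyncio


class FakeSubscription:
    """Stand-in for an SDK subscription (hypothetical, for illustration only)."""

    def __init__(self):
        self._queue = asyncio.Queue()

    def push(self, frame):
        self._queue.put_nowait(frame)

    async def recv_async(self):
        # Waits until a frame is available, like the documented recv_async().
        return await self._queue.get()


async def recv_with_timeout(sub, timeout_s):
    """Return the next frame, or None if nothing arrives within timeout_s."""
    try:
        return await asyncio.wait_for(sub.recv_async(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return None


async def demo():
    sub = FakeSubscription()
    sub.push({"frame": "tf_static"})
    first = await recv_with_timeout(sub, 0.05)   # frame already queued
    second = await recv_with_timeout(sub, 0.05)  # queue empty, times out
    return first, second


first, second = asyncio.run(demo())
```

The helper only relies on `recv_async()` being awaitable, so it should apply equally to device and global subscriptions.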
Available Data Streams
For device-specific data streams and data structures, see the corresponding device documentation:
- Wuji Glove SDK data reference — Tactile, EMF poses, hand tracking, IMU, coordinate transforms
Wuji Hand Data Subscription
Use wuji_sdk.WujiHand to subscribe to 20-joint state, publish joint commands, and subscribe to the paired tactile glove's data.
Subscribing to Joint State
hand.joint_state().subscribe() returns Subscription[HandJointState] and delivers real-time 20-joint position, with optional velocity and effort:
sub = hand.joint_state().subscribe()
for _ in range(100):
    state = await sub.recv_async()
    print(f"seq={state.header.seq}, joint0={state.position[0]:+.3f}")
sub.close()
Publishing Joint Commands (Realtime Control)
hand.realtime_controller(LowPass(cutoff_hz=...)) opens a realtime control session via a with context, with the lowpass filter suppressing high-frequency jitter. Inside the session, get a PUB handle from hand.joint_command().publisher() and call publisher.send(target) to push the 20-joint target position:
from wuji_sdk import LowPass
with hand.realtime_controller(LowPass(cutoff_hz=5.0)):
    publisher = hand.joint_command().publisher()
    target = [0.0] * 20
    target[0] = 0.3
    publisher.send(target)
    publisher.close()
For continuous control, call publisher.send() at a fixed rate (typically 100 Hz) inside the with block.
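One way to send at a fixed rate is to schedule each iteration against an absolute deadline, so timing errors do not accumulate. The sketch below assumes nothing about the SDK beyond a `send(target)` callable that takes a length-20 list. `send_at_fixed_rate` and `ramp_joint0` are hypothetical helpers, and a plain list stands in for `publisher.send` so the example runs without hardware.

```python
import time


def send_at_fixed_rate(send, make_target, rate_hz, n_steps):
    """Call send(make_target(i)) n_steps times, spaced 1/rate_hz apart."""
    period = 1.0 / rate_hz
    next_deadline = time.monotonic()
    for step in range(n_steps):
        send(make_target(step))
        # Schedule against absolute deadlines so per-iteration jitter does not accumulate.
        next_deadline += period
        delay = next_deadline - time.monotonic()
        if delay > 0:
            time.sleep(delay)


def ramp_joint0(step):
    """Hypothetical trajectory: ramp joint 0 from 0.0 towards 0.3 over 10 steps."""
    target = [0.0] * 20
    target[0] = 0.3 * min(step + 1, 10) / 10
    return target


sent = []  # stand-in for publisher.send; records targets instead of driving hardware
send_at_fixed_rate(sent.append, ramp_joint0, rate_hz=100.0, n_steps=10)
```

In real use, `sent.append` would be replaced by `publisher.send` and the call would sit inside the `with hand.realtime_controller(...)` block.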
Effort Limits
hand.set_all_effort_limit(amps) sets the torque limit on all 20 joints to the same amperage, and hand.get_all_effort_limit() reads back a length-20 list[float].
- Default is 1.5 A, suitable for most applications
- Valid range is 0.0 to 3.5 A — values above 3.5 are clamped to 3.5
hand.set_all_effort_limit(1.5)
limits = hand.get_all_effort_limit()
Modifying the effort limit changes the maximum output capability of each joint. The firmware also reduces the effective limit based on joint temperature. For the full reference and risk notes, see joint_effort_limit in the Wuji Hand SDK Guide.
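The documented range can also be checked on the caller's side before the value reaches the hand. The sketch below mirrors the clamping rule stated above (values above 3.5 A become 3.5 A). `checked_effort_limit` is a hypothetical helper, and rejecting negative values is an assumption, since the text does not say how the SDK handles them.

```python
MAX_EFFORT_LIMIT_A = 3.5      # upper bound stated above
DEFAULT_EFFORT_LIMIT_A = 1.5  # default stated above


def checked_effort_limit(amps):
    """Return the limit the hand is expected to apply for a requested value."""
    if amps < 0.0:
        # Assumption: negative requests are treated as a caller error here;
        # the text above does not say how the SDK handles them.
        raise ValueError(f"effort limit must be >= 0.0 A, got {amps}")
    return min(float(amps), MAX_EFFORT_LIMIT_A)


requested = 5.0
applied = checked_effort_limit(requested)
print(f"requested {requested} A, expected applied limit {applied} A")
```

The result could then be passed to `hand.set_all_effort_limit(...)`. Note that the effective limit may still be lower, because the firmware reduces it based on joint temperature.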
Tactile Glove Companion Data
When a tactile glove shares the USB bus and matches the hand's handedness, the SDK auto-pairs it during connect. Check the pairing status and subscribe to pressure frames:
if hand.is_tactile_attached():
    tactile_sub = hand.tactile.subscribe_pressure_frame()
    frame = await tactile_sub.recv_async()
    print(f"max pressure: {max(frame.pressure):.2f}")
    tactile_sub.close()