Best Practices

Error Handling

Every failed SDK operation raises WujiException. Inspect the exception message to tell error types apart:

from wuji_sdk import SdkManager, WujiException

try:
    manager = SdkManager.instance()
    glove = manager.auto_connect(device_name="glove")
    sub = glove.tactile().subscribe()
    frame = sub.recv()
except WujiException as e:
    error_msg = str(e)
    if "DeviceNotFound" in error_msg:
        print("Device not found, check connection")
    elif "Disconnected" in error_msg:
        print("Device disconnected")
    elif "Connection timeout" in error_msg:
        print("Connection timed out, check network/IP settings")
    elif "Operation timeout" in error_msg:
        print("Operation timed out, device not responding")
    else:
        print(f"Error: {error_msg}")

Common error types:

Error              Description
DeviceNotFound     Specified device not found
DeviceMismatch     Device type mismatch
Disconnected       Device has been disconnected
ConnectionTimeout  Connection timed out
OperationTimeout   Operation timed out
PathNotFound       Resource path does not exist
SchemaMismatch     Data type mismatch
NoData             No data available
StreamClosed       Subscription stream has been closed
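
The error names in the table can be turned into a small lookup so that handling code does not repeat string checks. The following is a minimal sketch, not part of the SDK: it assumes the error name appears verbatim in str(e), and the RETRYABLE set is an illustrative choice rather than documented behavior.

```python
# Error names copied from the table above.
KNOWN_ERRORS = (
    "DeviceNotFound", "DeviceMismatch", "Disconnected",
    "ConnectionTimeout", "OperationTimeout", "PathNotFound",
    "SchemaMismatch", "NoData", "StreamClosed",
)

# Assumption: these are the errors worth retrying; adjust to your application.
RETRYABLE = {"Disconnected", "ConnectionTimeout", "OperationTimeout", "NoData"}


def classify_error(message):
    """Return the first known error name found in message, or None."""
    for name in KNOWN_ERRORS:
        if name in message:
            return name
    return None


def is_retryable(message):
    """True if the message names an error we have chosen to treat as transient."""
    return classify_error(message) in RETRYABLE


print(classify_error("DeviceNotFound: no glove on bus"))
print(is_retryable("OperationTimeout while reading tactile"))
```

Inside an except WujiException as e: block, calling is_retryable(str(e)) would then decide whether to retry or report the failure. If the SDK formats its messages differently, adjust the matching accordingly.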

Reconnection Strategy

After a disconnection, reconnect (with exponential backoff between attempts) and then resubscribe to the streams you need:

import time
from wuji_sdk import SdkManager, WujiException

manager = SdkManager.instance()

def connect_with_retry(sn, max_retries=5):
    last_error = None
    for attempt in range(max_retries):
        try:
            return manager.connect(sn=sn, device_name="glove")
        except WujiException as e:
            last_error = e
            if attempt == max_retries - 1:
                break  # No point sleeping after the final attempt
            wait = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
            print(f"Connection failed, retrying in {wait}s...")
            time.sleep(wait)
    raise RuntimeError("Max retries exceeded") from last_error
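
Plain exponential backoff grows without bound and makes several clients retry in lockstep. A generic variant with a delay cap and random jitter is sketched below. The retry helper and its parameters are hypothetical names introduced for illustration, and the helper has no dependency on the SDK.

```python
import random
import time


def retry(operation, max_retries=5, base=1.0, cap=30.0,
          retry_on=(Exception,), sleep=time.sleep, jitter=random.random):
    """Call operation() until it succeeds or max_retries attempts have failed."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return operation()
        except retry_on as exc:
            last_error = exc
            if attempt < max_retries - 1:
                # Capped exponential delay, scaled by a random factor in [0, 1).
                delay = min(cap, base * 2 ** attempt)
                sleep(delay * jitter())
    raise RuntimeError("Max retries exceeded") from last_error


# Hypothetical usage with the calls shown above:
# glove = retry(lambda: manager.connect(sn=sn, device_name="glove"),
#               retry_on=(WujiException,))
```

Passing sleep and jitter as arguments keeps the helper easy to test without real waiting.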

Multi-device Coordination

Collect data from multiple devices simultaneously:

import asyncio
from wuji_sdk import SdkManager

async def collect_from_device(glove, name):
    sub = glove.tactile().subscribe()
    while True:
        frame = await sub.recv_async()
        print(f"[{name}] max={max(frame.data):.1f}")

async def main():
    manager = SdkManager.instance()
    devices = manager.scan()

    gloves = []
    for i, dev in enumerate(devices):
        glove = manager.connect(sn=dev.sn, device_name=f"glove_{i}")
        gloves.append((glove, f"glove_{i}"))

    tasks = [collect_from_device(g, n) for g, n in gloves]
    await asyncio.gather(*tasks)

asyncio.run(main())
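
The collectors above loop forever, so asyncio.gather never returns. One way to bound a capture is to run the collectors as tasks and cancel them after a fixed duration. The sketch below uses FakeSubscription, a stand-in that is not part of the SDK, so that it runs without hardware; only the recv_async() method name is taken from the example above.

```python
import asyncio


class FakeSubscription:
    """Stand-in for a device subscription; NOT part of the SDK."""

    def __init__(self, period):
        self.period = period
        self.count = 0

    async def recv_async(self):
        await asyncio.sleep(self.period)
        self.count += 1
        return self.count


async def collect(sub, name, sink):
    """Append (name, frame) pairs to sink until cancelled."""
    while True:
        frame = await sub.recv_async()
        sink.append((name, frame))


async def run_for(duration, subs):
    """Run one collector per subscription for duration seconds, then cancel all."""
    sink = []
    tasks = [asyncio.create_task(collect(s, n, sink)) for n, s in subs.items()]
    try:
        await asyncio.sleep(duration)
    finally:
        for task in tasks:
            task.cancel()
        # return_exceptions=True absorbs the CancelledError from each task.
        await asyncio.gather(*tasks, return_exceptions=True)
    return sink


frames = asyncio.run(run_for(0.2, {"glove_0": FakeSubscription(0.01),
                                   "glove_1": FakeSubscription(0.02)}))
print(len(frames), "frames from", sorted({name for name, _ in frames}))
```

With real devices, each FakeSubscription would be replaced by the object returned from glove.tactile().subscribe().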

Cross-device Merged Data

In multi-device setups, use global resources to get aggregated data from all devices without manual merging:

manager = SdkManager.instance()
left = manager.connect(sn="WG1JA00XXXXXXXXX", device_name="left")
right = manager.connect(sn="WG1KA00XXXXXXXXX", device_name="right")

# Subscribe to global coordinate transforms; left- and right-hand data are merged automatically
sub = manager.tf_static().subscribe()
transforms = await sub.recv_async()  # Call from inside an async function
for tf in transforms.transforms:
    print(f"{tf.parent_frame_id} -> {tf.child_frame_id}")

Log Control

Adjust SDK log level for debugging:

import wuji_sdk

wuji_sdk.set_log_level("debug")   # Show debug information
wuji_sdk.set_log_level("trace")   # Show all logs
wuji_sdk.set_log_level("info")    # Default level
wuji_sdk.set_log_level("warn")    # Warnings only
wuji_sdk.set_log_level("error")   # Errors only
wuji_sdk.set_log_level("off")     # Disable logging
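
In practice it can be convenient to pick the level at launch time instead of editing code. The sketch below validates a level name read from an environment variable. WUJI_LOG_LEVEL is a hypothetical variable name chosen for this example; nothing here implies the SDK reads it on its own.

```python
import os

# Level names taken from the calls listed above.
VALID_LEVELS = ("trace", "debug", "info", "warn", "error", "off")


def resolve_log_level(environ=os.environ, var="WUJI_LOG_LEVEL", default="info"):
    """Return a valid level name from the environment, falling back to default."""
    level = environ.get(var, default).strip().lower()
    return level if level in VALID_LEVELS else default


print(resolve_log_level({"WUJI_LOG_LEVEL": "DEBUG"}))  # prints "debug"
```

The result can then be passed on, for example wuji_sdk.set_log_level(resolve_log_level()).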

Device Logging

Device logging access varies by product; see the documentation for your device.

Recording Best Practices

Choose the Right Compression

  • Use "lz4" for real-time capture (low latency).
  • Use "zstd" for long-term storage (high compression ratio).
  • For high-frequency channels (>500 Hz), prefer "lz4" so that compression does not become a bottleneck.
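
These rules of thumb can be captured in a small helper so that every recorder in a project makes the same choice. This is an illustrative sketch; choose_compression is a hypothetical function, not an SDK call.

```python
def choose_compression(max_rate_hz, realtime):
    """Pick a codec name following the rules of thumb above."""
    if realtime or max_rate_hz > 500:
        return "lz4"   # Low latency; keeps up with fast channels
    return "zstd"      # Higher compression ratio for archival


print(choose_compression(max_rate_hz=1000, realtime=False))  # prints "lz4"
print(choose_compression(max_rate_hz=100, realtime=False))   # prints "zstd"
```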

Episode Management

Split long capture sessions into multiple episodes for easier playback and analysis:

import asyncio
from wuji_sdk import TopicRecorder

recorder = TopicRecorder(compression="lz4")
recorder.record(glove.tactile().subscribe())

for i in range(5):
    handle = await recorder.start(f"./data/episode_{i:03d}.mcap")
    await asyncio.sleep(60)  # 60 seconds per episode
    await handle.stop()
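
When several sessions are recorded, giving each its own directory keeps episode numbers from colliding. The helper below is a sketch of one possible layout. episode_path and the session name are hypothetical; only the episode_NNN.mcap naming comes from the example above.

```python
import tempfile
from pathlib import Path


def episode_path(root, session, index):
    """Build root/session/episode_NNN.mcap, creating the directory if needed."""
    directory = Path(root) / session
    directory.mkdir(parents=True, exist_ok=True)
    return directory / f"episode_{index:03d}.mcap"


# Demonstrated in a temporary directory so the sketch leaves no files behind.
with tempfile.TemporaryDirectory() as root:
    paths = [episode_path(root, "session_a", i) for i in range(3)]
    names = [p.name for p in paths]

print(names)
```

The resulting path could then be converted with str() and passed to recorder.start().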

Monitor Data Quality

For long recording sessions, enable quality monitoring and handle alerts promptly:

import asyncio

handle = await recorder.start("./data/session.mcap")

async def watch_alerts():
    async for alert in handle.subscribe_alerts():
        print(f"Warning: {alert.message}")

# Keep a reference so the task is not garbage-collected while it runs
alert_task = asyncio.create_task(watch_alerts())

Performance Optimization

Reduce Unnecessary Subscriptions

Only subscribe to data streams you actually need to avoid wasting bandwidth and CPU:

# ✓ Subscribe only to needed data
sub_tactile = glove.tactile().subscribe()

# ✗ Don't subscribe to unused data
# sub_imu = glove.imu_palm().subscribe()

Close Subscriptions Promptly

Close subscriptions when they're no longer needed:

sub = glove.tactile().subscribe_with_callback(callback=on_data)
# ... done collecting data
sub.close()  # Release resources
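
If collection can raise, an explicit close() call at the end may never run. The standard-library contextlib.closing wrapper calls close() on exit in either case. The sketch below shows this with FakeSubscription, a stand-in class that is not part of the SDK; it assumes only that the real subscription object exposes close() as shown above.

```python
from contextlib import closing


class FakeSubscription:
    """Stand-in with a close() method; NOT part of the SDK."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


sub = FakeSubscription()
try:
    with closing(sub):
        raise RuntimeError("collection failed")
except RuntimeError:
    pass

print(sub.closed)  # prints True
```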