Best Practices
Error Handling
All SDK operation errors raise WujiException:
from wuji_sdk import SdkManager, WujiException
try:
manager = SdkManager.instance()
glove = manager.auto_connect(device_name="glove")
sub = glove.tactile().subscribe()
frame = sub.recv()
except WujiException as e:
error_msg = str(e)
if "DeviceNotFound" in error_msg:
print("Device not found, check connection")
elif "Disconnected" in error_msg:
print("Device disconnected")
elif "Connection timeout" in error_msg:
print("Connection timed out, check network/IP settings")
elif "Operation timeout" in error_msg:
print("Operation timed out, device not responding, retrying...")
else:
print(f"Error: {error_msg}")Common error types:
| Error | Description |
|---|---|
DeviceNotFound | Specified device not found |
DeviceMismatch | Device type mismatch |
Disconnected | Device has been disconnected |
ConnectionTimeout | Connection timed out |
OperationTimeout | Operation timed out |
PathNotFound | Resource path does not exist |
SchemaMismatch | Data type mismatch |
NoData | No data available |
StreamClosed | Subscription stream has been closed |
Reconnection Strategy
After disconnection, reconnect and resubscribe:
import time
from wuji_sdk import SdkManager, WujiException
manager = SdkManager.instance()
def connect_with_retry(sn, max_retries=5):
for attempt in range(max_retries):
try:
return manager.connect(sn=sn, device_name="glove")
except WujiException:
wait = 2 ** attempt # Exponential backoff
print(f"Connection failed, retrying in {wait}s...")
time.sleep(wait)
raise RuntimeError("Max retries exceeded")Multi-device Coordination
Collect data from multiple devices simultaneously:
import asyncio
from wuji_sdk import SdkManager
async def collect_from_device(glove, name):
sub = glove.tactile().subscribe()
while True:
frame = await sub.recv_async()
print(f"[{name}] max={max(frame.data):.1f}")
async def main():
manager = SdkManager.instance()
devices = manager.scan()
gloves = []
for i, dev in enumerate(devices):
glove = manager.connect(sn=dev.sn, device_name=f"glove_{i}")
gloves.append((glove, f"glove_{i}"))
tasks = [collect_from_device(g, n) for g, n in gloves]
await asyncio.gather(*tasks)
asyncio.run(main())Cross-device Merged Data
In multi-device setups, use global resources to get aggregated data from all devices without manual merging:
manager = SdkManager.instance()
left = manager.connect(sn="WG1JA00XXXXXXXXX", device_name="left")
right = manager.connect(sn="WG1KA00XXXXXXXXX", device_name="right")
# Subscribe to global coordinate transforms — automatically merges left and right hand data
sub = manager.tf_static().subscribe()
transforms = await sub.recv_async()
for tf in transforms.transforms:
print(f"{tf.parent_frame_id} → {tf.child_frame_id}")Log Control
Adjust SDK log level for debugging:
import wuji_sdk
wuji_sdk.set_log_level("debug") # Show debug information
wuji_sdk.set_log_level("trace") # Show all logs
wuji_sdk.set_log_level("info") # Default level
wuji_sdk.set_log_level("warn") # Warnings only
wuji_sdk.set_log_level("error") # Errors only
wuji_sdk.set_log_level("off") # Disable loggingDevice Logging
Device logging access varies by product. See the corresponding device documentation:
Recording Best Practices
Choose the Right Compression
- Use
"lz4"for real-time capture (low latency). Use"zstd"for long-term storage (high compression ratio) - For high-frequency channels (>500 Hz), prefer
"lz4"to avoid compression becoming a bottleneck
Episode Management
Split long capture sessions into multiple episodes for easier playback and analysis:
import asyncio
from wuji_sdk import TopicRecorder
recorder = TopicRecorder(compression="lz4")
recorder.record(glove.tactile().subscribe())
for i in range(5):
handle = await recorder.start(f"./data/episode_{i:03d}.mcap")
await asyncio.sleep(60) # 60 seconds per episode
await handle.stop()Monitor Data Quality
For long recording sessions, enable quality monitoring and handle alerts promptly:
import asyncio
handle = await recorder.start("./data/session.mcap")
async def watch_alerts():
async for alert in handle.subscribe_alerts():
print(f"Warning: {alert.message}")
asyncio.create_task(watch_alerts())Performance Optimization
Reduce Unnecessary Subscriptions
Only subscribe to data streams you actually need to avoid wasting bandwidth and CPU:
# ✓ Subscribe only to needed data
sub_tactile = glove.tactile().subscribe()
# ✗ Don't subscribe to unused data
# sub_imu = glove.imu_palm().subscribe()Close Subscriptions Promptly
Close subscriptions when they're no longer needed:
sub = glove.tactile().subscribe_with_callback(callback=on_data)
# ... done collecting data
sub.close() # Release resources