Data Recording
Real-time sensor data (tactile, IMU, hand tracking, and more) is ephemeral—recording persists it to files for offline analysis, algorithm debugging, or dataset building. Wuji SDK includes a built-in recording engine that synchronously writes multi-channel sensor data to MCAP files. The workflow takes three steps:
- Create a recorder: create a TopicRecorder and choose a compression algorithm
- Register channels: call recorder.record(sub) to register subscription channels; device and global resources can be mixed in the same session
- Start recording: call await recorder.start(path) to begin recording, which returns a RecordingHandle for control
During recording, pause and resume at any time, or use episode switching to split a continuous capture into separate files. The engine includes built-in quality monitoring that tracks frame drop rate, inter-frame jitter, and sync offset in real time. When recording stops, a recording summary is returned with aggregate statistics.
For runnable code, see the Examples at the bottom of this page.
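To make the three quality metrics concrete, here is a minimal offline sketch of one plausible way to compute them from frame timestamps. The definitions below are assumptions for illustration only: this page does not document the engine's internal formulas, and the function names are hypothetical rather than part of the SDK.

```python
from statistics import pstdev


def estimate_drop_rate(timestamps_us, expected_period_us):
    """Fraction of expected frames that never arrived (assumed definition)."""
    if expected_period_us <= 0:
        raise ValueError("expected_period_us must be positive")
    if len(timestamps_us) < 2:
        return 0.0
    span = timestamps_us[-1] - timestamps_us[0]
    expected = round(span / expected_period_us) + 1
    missing = max(expected - len(timestamps_us), 0)
    return missing / expected


def estimate_jitter_us(timestamps_us):
    """Standard deviation of inter-frame intervals, in microseconds."""
    intervals = [b - a for a, b in zip(timestamps_us, timestamps_us[1:])]
    return pstdev(intervals) if len(intervals) > 1 else 0.0


def estimate_sync_offset_ms(latest_a_us, latest_b_us):
    """Absolute gap between the newest frames of two channels, in milliseconds."""
    return abs(latest_a_us - latest_b_us) / 1000.0


# A 100 Hz stream (10,000 us period) with one frame missing at t = 30,000 us.
stamps = [0, 10_000, 20_000, 40_000, 50_000]
print(f"Drop rate: {estimate_drop_rate(stamps, 10_000):.4f}")  # 0.1667
print(f"Jitter: {estimate_jitter_us(stamps):.1f} us")  # 4330.1
print(f"Sync offset: {estimate_sync_offset_ms(1_000_500, 1_000_000):.2f} ms")  # 0.50
```

A single missing frame shows up in both numbers: the drop rate is 1 of 6 expected frames, and the doubled interval inflates the jitter.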
Compression Options
Choose a compression algorithm with the compression parameter when creating TopicRecorder:
| Option | Description |
|---|---|
| "lz4" | Low-latency compression for real-time use (default) |
| "zstd" | High compression ratio for storage and archiving |
| "none" | No compression, fastest write speed |
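If the choice depends on how a session will be used, the table can be encoded as a small lookup. This is only a sketch: the option strings come from the table above, while the goal names and the pick_compression helper are hypothetical and not part of the SDK.

```python
# Option strings are taken from the table; the goal names are made up for illustration.
COMPRESSION_BY_GOAL = {
    "realtime": "lz4",          # low latency, the documented default
    "archive": "zstd",          # high compression ratio for storage
    "max_write_speed": "none",  # no compression, fastest writes
}


def pick_compression(goal: str = "realtime") -> str:
    """Return the compression option string for a recording goal."""
    try:
        return COMPRESSION_BY_GOAL[goal]
    except KeyError:
        valid = ", ".join(sorted(COMPRESSION_BY_GOAL))
        raise ValueError(f"unknown goal {goal!r}; expected one of: {valid}") from None


print(pick_compression("archive"))  # zstd
```

The result would then be passed as the compression argument, for example TopicRecorder(compression=pick_compression("archive")).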
Recording Global Resources
Global resources merged across devices (such as coordinate transforms) can be recorded in the same session as device resources:
recorder.record(manager.tf_static().subscribe()) # Global resource
recorder.record(glove.tactile().subscribe()) # Device resource
Pause and Resume
Pause and resume recording at any time. Device data continues to stream while paused, but nothing is written to the file.
await handle.pause() # Pause — data is not written
await handle.resume() # Resume recording
Episode Switching
Call start() multiple times on the same TopicRecorder to switch output files without re-registering channels. Use this to split a continuous capture session into separate recording segments:
# Episode 1
handle1 = await recorder.start("./data/episode_001.mcap")
await asyncio.sleep(10)
await handle1.stop()
# Episode 2 — reuses the same channel configuration
handle2 = await recorder.start("./data/episode_002.mcap")
await asyncio.sleep(10)
await handle2.stop()
Quality Monitoring
Monitor data quality metrics in real time during recording:
async for metrics in handle.subscribe_metrics():
    print(f"Drop rate: {metrics.frame_drop_rate:.4f}")
    print(f"Jitter: {metrics.frame_jitter_us:.1f} us")
    print(f"Sync offset: {metrics.sync_offset_ms:.2f} ms")
Quality Alerts
The recording engine includes a built-in SPC alert mechanism that triggers when quality metrics exceed thresholds for a sustained period:
async for alert in handle.subscribe_alerts():
    print(f"Alert: {alert.message}")
    print(f" Metric: {alert.metric}, Current: {alert.current_value}, Threshold: {alert.threshold}")
Recording Status
Monitor the runtime state of a recording:
async for status in handle.subscribe_status():
    print(f"State: {status.state}, Frames: {status.frame_count}, Duration: {status.duration_s:.1f}s")
Recording Summary
handle.stop() returns a RecordingSummary with recording statistics:
summary = await handle.stop()
print(f"Total frames: {summary.total_frames}")
print(f"File size: {summary.file_size / 1024 / 1024:.2f} MB")
print(f"Duration: {summary.duration_s:.1f}s")
print(f"Drop rate: {summary.quality.frame_drop_rate:.4f}")
print(f"Sync offset: {summary.quality.avg_sync_offset_ms:.2f} ms")
Examples
Basic Example
Minimal runnable example—create a recorder, register channels, record for 10 seconds, then stop:
import asyncio
import os
from datetime import datetime

from wuji_sdk import SdkManager, TopicRecorder


async def main():
    manager = SdkManager.instance()
    glove = manager.auto_connect(device_name="glove")
    try:
        recorder = TopicRecorder(compression="lz4")
        recorder.record(glove.tactile().subscribe())
        recorder.record(glove.emf_poses().subscribe())
        recorder.record(glove.hand_skeleton().subscribe())

        os.makedirs("./data", exist_ok=True)
        path = f"./data/{datetime.now().strftime('%Y%m%d_%H%M%S')}_session.mcap"
        handle = await recorder.start(path)
        print(f"Recording to {path} ...")
        await asyncio.sleep(10)

        summary = await handle.stop()
        print(f"Done: {summary.total_frames} frames, {summary.file_size / 1024 / 1024:.2f} MB")
    finally:
        glove.disconnect()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopped by user")
Expected output:
Recording to ./data/20260317_153912_session.mcap ...
Done: 3609 frames, 3.91 MB
Full Example
Demonstrates pause/resume, quality monitoring, episode switching, and recording summary:
import asyncio
import os
from datetime import datetime

from wuji_sdk import SdkManager, TopicRecorder


async def monitor_quality(handle):
    """Monitor recording quality metrics"""
    try:
        async for metrics in handle.subscribe_metrics():
            if metrics.frame_drop_rate > 0.01:
                print(f" [Quality] Drop rate: {metrics.frame_drop_rate:.4f}, "
                      f"Jitter: {metrics.frame_jitter_us:.1f}us, "
                      f"Sync offset: {metrics.sync_offset_ms:.2f}ms")
    except asyncio.CancelledError:
        pass


async def main():
    manager = SdkManager.instance()
    os.makedirs("./data", exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Episode 1: record 10 seconds with pause/resume demo
    glove = manager.auto_connect(device_name="glove")
    try:
        recorder = TopicRecorder(compression="lz4")
        recorder.record(glove.tactile().subscribe())
        recorder.record(glove.emf_poses().subscribe())
        recorder.record(glove.hand_skeleton().subscribe())

        path1 = f"./data/{timestamp}_episode_001.mcap"
        handle = await recorder.start(path1)
        metrics_task = asyncio.create_task(monitor_quality(handle))
        print(f"Recording: {path1}")
        await asyncio.sleep(5)

        print("Pausing...")
        await handle.pause()
        await asyncio.sleep(2)

        print("Resuming...")
        await handle.resume()
        await asyncio.sleep(5)

        metrics_task.cancel()
        try:
            await metrics_task
        except asyncio.CancelledError:
            pass

        summary = await handle.stop()
        print(f"Episode 1: {summary.total_frames} frames, "
              f"{summary.file_size / 1024 / 1024:.2f} MB, "
              f"{summary.duration_s:.1f}s")

        # Episode 2: switch output file, reuse the same recorder
        path2 = f"./data/{timestamp}_episode_002.mcap"
        handle2 = await recorder.start(path2)
        print(f"\nRecording: {path2}")
        await asyncio.sleep(5)

        summary2 = await handle2.stop()
        print(f"Episode 2: {summary2.total_frames} frames, "
              f"{summary2.file_size / 1024 / 1024:.2f} MB, "
              f"{summary2.duration_s:.1f}s")
    finally:
        glove.disconnect()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopped by user")
Expected output:
Recording: ./data/20260317_153943_episode_001.mcap
Pausing...
Resuming...
Episode 1: 3606 frames, 3.91 MB, 12.0s
Recording: ./data/20260317_153943_episode_002.mcap
Episode 2: 1812 frames, 1.97 MB, 5.0s
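As a quick sanity check on this output, the frame counts can be converted to an effective frame rate. The sketch below assumes that the 2-second pause is counted in duration_s (the sleeps in the listing add up to 5 + 2 + 5 = 12.0 s) but contributes no frames, leaving 10 seconds of active recording in episode 1. effective_rate is a hypothetical helper, not part of the SDK.

```python
def effective_rate(total_frames: int, active_seconds: float) -> float:
    """Frames written per second of active (non-paused) recording."""
    if active_seconds <= 0:
        raise ValueError("active_seconds must be positive")
    return total_frames / active_seconds


# Values copied from the expected output above.
episode_1 = effective_rate(3606, 12.0 - 2.0)  # subtract the 2 s pause (assumption)
episode_2 = effective_rate(1812, 5.0)

print(f"Episode 1: {episode_1:.1f} frames/s")  # 360.6
print(f"Episode 2: {episode_2:.1f} frames/s")  # 362.4
```

Under that assumption the two episodes agree to within about 0.5%, which is what one would expect if pausing stops writes without disturbing the stream.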