Data Recording

Real-time sensor data (tactile, IMU, hand tracking, and more) is ephemeral—recording persists it to files for offline analysis, algorithm debugging, or dataset building. Wuji SDK includes a built-in recording engine that synchronously writes multi-channel sensor data to MCAP files. The workflow takes three steps:

  1. Create a recorder — Create a TopicRecorder and choose a compression algorithm
  2. Register channels — Call recorder.record(sub) to register subscription channels, supporting device and global resources in the same session
  3. Start recording — Call await recorder.start(path) to begin recording, which returns a RecordingHandle for control

During recording, pause and resume at any time, or use episode switching to split a continuous capture into separate files. The engine includes built-in quality monitoring that tracks frame drop rate, inter-frame jitter, and sync offset in real time. When recording stops, a recording summary is returned with aggregate statistics.

For runnable code, see the Examples at the bottom of this page.

Compression Options

Choose a compression algorithm with the compression parameter when creating TopicRecorder:

OptionDescription
"lz4"Low-latency compression for real-time use (default)
"zstd"High compression ratio for storage and archiving
"none"No compression, fastest write speed

Recording Global Resources

Cross-device merged global resources (such as coordinate transforms) can also be recorded, mixed with device resources:

recorder.record(manager.tf_static().subscribe())  # Global resource
recorder.record(glove.tactile().subscribe())       # Device resource

Pause and Resume

Pause and resume recording at any time. Device data continues to stream while paused, but nothing is written to the file.

await handle.pause()     # Pause — data is not written
await handle.resume()    # Resume recording

Episode Switching

Call start() multiple times on the same TopicRecorder to switch output files without re-registering channels. Use this to split a continuous capture session into separate recording segments:

# Episode 1
handle1 = await recorder.start("./data/episode_001.mcap")
await asyncio.sleep(10)
await handle1.stop()

# Episode 2 — reuses the same channel configuration
handle2 = await recorder.start("./data/episode_002.mcap")
await asyncio.sleep(10)
await handle2.stop()

Quality Monitoring

Monitor data quality metrics in real time during recording:

async for metrics in handle.subscribe_metrics():
    print(f"Drop rate: {metrics.frame_drop_rate:.4f}")
    print(f"Jitter: {metrics.frame_jitter_us:.1f} us")
    print(f"Sync offset: {metrics.sync_offset_ms:.2f} ms")

Quality Alerts

The recording engine includes a built-in SPC alert mechanism that triggers when quality metrics exceed thresholds for a sustained period:

async for alert in handle.subscribe_alerts():
    print(f"Alert: {alert.message}")
    print(f"  Metric: {alert.metric}, Current: {alert.current_value}, Threshold: {alert.threshold}")

Recording Status

Monitor the runtime state of a recording:

async for status in handle.subscribe_status():
    print(f"State: {status.state}, Frames: {status.frame_count}, Duration: {status.duration_s:.1f}s")

Recording Summary

handle.stop() returns a RecordingSummary with recording statistics:

summary = await handle.stop()
print(f"Total frames: {summary.total_frames}")
print(f"File size: {summary.file_size / 1024 / 1024:.2f} MB")
print(f"Duration: {summary.duration_s:.1f}s")
print(f"Drop rate: {summary.quality.frame_drop_rate:.4f}")
print(f"Sync offset: {summary.quality.avg_sync_offset_ms:.2f} ms")

Examples

Basic Example

Minimal runnable example—create a recorder, register channels, record for 10 seconds, then stop:

import asyncio
import os
from datetime import datetime
from wuji_sdk import SdkManager, TopicRecorder

async def main():
    manager = SdkManager.instance()
    glove = manager.auto_connect(device_name="glove")

    try:
        recorder = TopicRecorder(compression="lz4")
        recorder.record(glove.tactile().subscribe())
        recorder.record(glove.emf_poses().subscribe())
        recorder.record(glove.hand_skeleton().subscribe())

        os.makedirs("./data", exist_ok=True)
        path = f"./data/{datetime.now().strftime('%Y%m%d_%H%M%S')}_session.mcap"
        handle = await recorder.start(path)
        print(f"Recording to {path} ...")
        await asyncio.sleep(10)

        summary = await handle.stop()
        print(f"Done: {summary.total_frames} frames, {summary.file_size / 1024 / 1024:.2f} MB")
    finally:
        glove.disconnect()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopped by user")

Expected output:

Recording to ./data/20260317_153912_session.mcap ...
Done: 3609 frames, 3.91 MB

Full Example

Demonstrates pause/resume, quality monitoring, episode switching, and recording summary:

import asyncio
import os
from datetime import datetime
from wuji_sdk import SdkManager, TopicRecorder

async def monitor_quality(handle):
    """Monitor recording quality metrics"""
    try:
        async for metrics in handle.subscribe_metrics():
            if metrics.frame_drop_rate > 0.01:
                print(f"  [Quality] Drop rate: {metrics.frame_drop_rate:.4f}, "
                      f"Jitter: {metrics.frame_jitter_us:.1f}us, "
                      f"Sync offset: {metrics.sync_offset_ms:.2f}ms")
    except asyncio.CancelledError:
        pass

async def main():
    manager = SdkManager.instance()
    os.makedirs("./data", exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Episode 1: record 10 seconds with pause/resume demo
    glove = manager.auto_connect(device_name="glove")
    try:
        recorder = TopicRecorder(compression="lz4")
        recorder.record(glove.tactile().subscribe())
        recorder.record(glove.emf_poses().subscribe())
        recorder.record(glove.hand_skeleton().subscribe())

        path1 = f"./data/{timestamp}_episode_001.mcap"
        handle = await recorder.start(path1)
        metrics_task = asyncio.create_task(monitor_quality(handle))

        print(f"Recording: {path1}")
        await asyncio.sleep(5)

        print("Pausing...")
        await handle.pause()
        await asyncio.sleep(2)

        print("Resuming...")
        await handle.resume()
        await asyncio.sleep(5)

        metrics_task.cancel()
        try:
            await metrics_task
        except asyncio.CancelledError:
            pass
        summary = await handle.stop()
        print(f"Episode 1: {summary.total_frames} frames, "
              f"{summary.file_size / 1024 / 1024:.2f} MB, "
              f"{summary.duration_s:.1f}s")

        # Episode 2: switch output file, reuse the same recorder
        path2 = f"./data/{timestamp}_episode_002.mcap"
        handle2 = await recorder.start(path2)
        print(f"\nRecording: {path2}")
        await asyncio.sleep(5)

        summary2 = await handle2.stop()
        print(f"Episode 2: {summary2.total_frames} frames, "
              f"{summary2.file_size / 1024 / 1024:.2f} MB, "
              f"{summary2.duration_s:.1f}s")
    finally:
        glove.disconnect()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopped by user")

Expected output:

Recording: ./data/20260317_153943_episode_001.mcap
Pausing...
Resuming...
Episode 1: 3606 frames, 3.91 MB, 12.0s

Recording: ./data/20260317_153943_episode_002.mcap
Episode 2: 1812 frames, 1.97 MB, 5.0s