数据录制

实时采集的传感器数据(触觉、IMU、手部追踪等)转瞬即逝——录制功能将这些数据持久化到文件,供后续离线分析、算法调试或数据集构建使用。Wuji SDK 内置录制引擎,将多通道传感器数据同步写入 MCAP 格式文件。整体流程分三步:

  1. 创建录制器 — 用 TopicRecorder 创建录制器,选择压缩算法
  2. 注册通道 — 调用 recorder.record(sub) 注册要录制的订阅通道,支持设备资源和全局资源混合注册
  3. 开始录制 — 调用 await recorder.start(path) 启动录制,返回 RecordingHandle 控制句柄

录制过程中可随时暂停和恢复,也可通过分集切换将连续采集拆分为多个文件。录制引擎内置质量监控,实时跟踪丢帧率、帧间抖动和同步偏差等指标。录制结束后返回录制摘要,汇总统计信息。

如需直接运行代码,见页面底部示例

压缩选项

创建 TopicRecorder 时通过 compression 参数选择压缩算法:

选项说明
"lz4"低延迟压缩,适合实时场景(默认)
"zstd"高压缩比,适合存储归档
"none"不压缩,写入速度最快

录制全局资源

跨设备合并的全局资源(如坐标变换)也可以录制,与设备资源混合注册:

recorder.record(manager.tf_static().subscribe())  # 全局资源
recorder.record(glove.tactile().subscribe())       # 设备资源

暂停与恢复

录制过程中可随时暂停和恢复。暂停期间设备数据仍在订阅,但不会写入文件。

await handle.pause()     # 暂停——数据不会写入
await handle.resume()    # 恢复录制

分集切换

同一个 TopicRecorder 可多次调用 start() 切换输出文件,无需重新注册通道。适合将连续采集拆分为多个独立录制片段:

# Episode 1
handle1 = await recorder.start("./data/episode_001.mcap")
await asyncio.sleep(10)
await handle1.stop()

# Episode 2——复用已注册的通道配置
handle2 = await recorder.start("./data/episode_002.mcap")
await asyncio.sleep(10)
await handle2.stop()

质量监控

录制期间可实时监控数据质量指标:

async for metrics in handle.subscribe_metrics():
    print(f"Drop rate: {metrics.frame_drop_rate:.4f}")
    print(f"Jitter: {metrics.frame_jitter_us:.1f} us")
    print(f"Sync offset: {metrics.sync_offset_ms:.2f} ms")

质量告警

录制引擎内置 SPC 告警机制,当质量指标持续超过阈值时触发告警:

async for alert in handle.subscribe_alerts():
    print(f"Alert: {alert.message}")
    print(f"  Metric: {alert.metric}, Current: {alert.current_value}, Threshold: {alert.threshold}")

录制状态

监控录制的运行状态:

async for status in handle.subscribe_status():
    print(f"State: {status.state}, Frames: {status.frame_count}, Duration: {status.duration_s:.1f}s")

录制摘要

handle.stop() 返回 RecordingSummary,包含录制的统计信息:

summary = await handle.stop()
print(f"Total frames: {summary.total_frames}")
print(f"File size: {summary.file_size / 1024 / 1024:.2f} MB")
print(f"Duration: {summary.duration_s:.1f}s")
print(f"Drop rate: {summary.quality.frame_drop_rate:.4f}")
print(f"Sync offset: {summary.quality.avg_sync_offset_ms:.2f} ms")

示例

基本示例

最小可运行示例——创建录制器、注册通道、录制 10 秒后停止:

import asyncio
import os
from datetime import datetime
from wuji_sdk import SdkManager, TopicRecorder

async def main():
    manager = SdkManager.instance()
    glove = manager.auto_connect(device_name="glove")

    try:
        recorder = TopicRecorder(compression="lz4")
        recorder.record(glove.tactile().subscribe())
        recorder.record(glove.emf_poses().subscribe())
        recorder.record(glove.hand_skeleton().subscribe())

        os.makedirs("./data", exist_ok=True)
        path = f"./data/{datetime.now().strftime('%Y%m%d_%H%M%S')}_session.mcap"
        handle = await recorder.start(path)
        print(f"Recording to {path} ...")
        await asyncio.sleep(10)

        summary = await handle.stop()
        print(f"Done: {summary.total_frames} frames, {summary.file_size / 1024 / 1024:.2f} MB")
    finally:
        glove.disconnect()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopped by user")

预期输出:

Recording to ./data/20260317_153912_session.mcap ...
Done: 3609 frames, 3.91 MB

完整示例

综合演示暂停/恢复、质量监控、分集切换和录制摘要:

import asyncio
import os
from datetime import datetime
from wuji_sdk import SdkManager, TopicRecorder

async def monitor_quality(handle):
    """Monitor recording quality metrics"""
    try:
        async for metrics in handle.subscribe_metrics():
            if metrics.frame_drop_rate > 0.01:
                print(f"  [Quality] Drop rate: {metrics.frame_drop_rate:.4f}, "
                      f"Jitter: {metrics.frame_jitter_us:.1f}us, "
                      f"Sync offset: {metrics.sync_offset_ms:.2f}ms")
    except asyncio.CancelledError:
        pass

async def main():
    manager = SdkManager.instance()
    os.makedirs("./data", exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Episode 1:录制 10 秒,演示暂停/恢复
    glove = manager.auto_connect(device_name="glove")
    try:
        recorder = TopicRecorder(compression="lz4")
        recorder.record(glove.tactile().subscribe())
        recorder.record(glove.emf_poses().subscribe())
        recorder.record(glove.hand_skeleton().subscribe())

        path1 = f"./data/{timestamp}_episode_001.mcap"
        handle = await recorder.start(path1)
        metrics_task = asyncio.create_task(monitor_quality(handle))

        print(f"Recording: {path1}")
        await asyncio.sleep(5)

        print("Pausing...")
        await handle.pause()
        await asyncio.sleep(2)

        print("Resuming...")
        await handle.resume()
        await asyncio.sleep(5)

        metrics_task.cancel()
        try:
            await metrics_task
        except asyncio.CancelledError:
            pass
        summary = await handle.stop()
        print(f"Episode 1: {summary.total_frames} frames, "
              f"{summary.file_size / 1024 / 1024:.2f} MB, "
              f"{summary.duration_s:.1f}s")

        # Episode 2:切换输出文件,复用同一录制器
        path2 = f"./data/{timestamp}_episode_002.mcap"
        handle2 = await recorder.start(path2)
        print(f"\nRecording: {path2}")
        await asyncio.sleep(5)

        summary2 = await handle2.stop()
        print(f"Episode 2: {summary2.total_frames} frames, "
              f"{summary2.file_size / 1024 / 1024:.2f} MB, "
              f"{summary2.duration_s:.1f}s")
    finally:
        glove.disconnect()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopped by user")

预期输出:

Recording: ./data/20260317_153943_episode_001.mcap
Pausing...
Resuming...
Episode 1: 3606 frames, 3.91 MB, 12.0s

Recording: ./data/20260317_153943_episode_002.mcap
Episode 2: 1812 frames, 1.97 MB, 5.0s