数据录制
实时采集的传感器数据(触觉、IMU、手部追踪等)转瞬即逝——录制功能将这些数据持久化到文件,供后续离线分析、算法调试或数据集构建使用。Wuji SDK 内置录制引擎,将多通道传感器数据同步写入 MCAP 格式文件。整体流程分三步:
- 创建录制器 — 用
TopicRecorder创建录制器,选择压缩算法 - 注册通道 — 调用
recorder.record(sub)注册要录制的订阅通道,支持设备资源和全局资源混合注册 - 开始录制 — 调用
await recorder.start(path)启动录制,返回RecordingHandle控制句柄
录制过程中可随时暂停和恢复,也可通过分集切换将连续采集拆分为多个文件。录制引擎内置质量监控,实时跟踪丢帧率、帧间抖动和同步偏差等指标。录制结束后返回录制摘要,汇总统计信息。
如需直接运行代码,见页面底部示例。
压缩选项
创建 TopicRecorder 时通过 compression 参数选择压缩算法:
| 选项 | 说明 |
|---|---|
"lz4" | 低延迟压缩,适合实时场景(默认) |
"zstd" | 高压缩比,适合存储归档 |
"none" | 不压缩,写入速度最快 |
录制全局资源
跨设备合并的全局资源(如坐标变换)也可以录制,与设备资源混合注册:
recorder.record(manager.tf_static().subscribe()) # 全局资源
recorder.record(glove.tactile().subscribe()) # 设备资源暂停与恢复
录制过程中可随时暂停和恢复。暂停期间设备数据仍在订阅,但不会写入文件。
await handle.pause() # 暂停——数据不会写入
await handle.resume() # 恢复录制分集切换
同一个 TopicRecorder 可多次调用 start() 切换输出文件,无需重新注册通道。适合将连续采集拆分为多个独立录制片段:
# Episode 1
handle1 = await recorder.start("./data/episode_001.mcap")
await asyncio.sleep(10)
await handle1.stop()
# Episode 2——复用已注册的通道配置
handle2 = await recorder.start("./data/episode_002.mcap")
await asyncio.sleep(10)
await handle2.stop()质量监控
录制期间可实时监控数据质量指标:
async for metrics in handle.subscribe_metrics():
print(f"Drop rate: {metrics.frame_drop_rate:.4f}")
print(f"Jitter: {metrics.frame_jitter_us:.1f} us")
print(f"Sync offset: {metrics.sync_offset_ms:.2f} ms")质量告警
录制引擎内置 SPC 告警机制,当质量指标持续超过阈值时触发告警:
async for alert in handle.subscribe_alerts():
print(f"Alert: {alert.message}")
print(f" Metric: {alert.metric}, Current: {alert.current_value}, Threshold: {alert.threshold}")录制状态
监控录制的运行状态:
async for status in handle.subscribe_status():
print(f"State: {status.state}, Frames: {status.frame_count}, Duration: {status.duration_s:.1f}s")录制摘要
handle.stop() 返回 RecordingSummary,包含录制的统计信息:
summary = await handle.stop()
print(f"Total frames: {summary.total_frames}")
print(f"File size: {summary.file_size / 1024 / 1024:.2f} MB")
print(f"Duration: {summary.duration_s:.1f}s")
print(f"Drop rate: {summary.quality.frame_drop_rate:.4f}")
print(f"Sync offset: {summary.quality.avg_sync_offset_ms:.2f} ms")示例
基本示例
最小可运行示例——创建录制器、注册通道、录制 10 秒后停止:
import asyncio
import os
from datetime import datetime
from wuji_sdk import SdkManager, TopicRecorder
async def main():
manager = SdkManager.instance()
glove = manager.auto_connect(device_name="glove")
try:
recorder = TopicRecorder(compression="lz4")
recorder.record(glove.tactile().subscribe())
recorder.record(glove.emf_poses().subscribe())
recorder.record(glove.hand_skeleton().subscribe())
os.makedirs("./data", exist_ok=True)
path = f"./data/{datetime.now().strftime('%Y%m%d_%H%M%S')}_session.mcap"
handle = await recorder.start(path)
print(f"Recording to {path} ...")
await asyncio.sleep(10)
summary = await handle.stop()
print(f"Done: {summary.total_frames} frames, {summary.file_size / 1024 / 1024:.2f} MB")
finally:
glove.disconnect()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nStopped by user")预期输出:
Recording to ./data/20260317_153912_session.mcap ...
Done: 3609 frames, 3.91 MB完整示例
综合演示暂停/恢复、质量监控、分集切换和录制摘要:
import asyncio
import os
from datetime import datetime
from wuji_sdk import SdkManager, TopicRecorder
async def monitor_quality(handle):
"""Monitor recording quality metrics"""
try:
async for metrics in handle.subscribe_metrics():
if metrics.frame_drop_rate > 0.01:
print(f" [Quality] Drop rate: {metrics.frame_drop_rate:.4f}, "
f"Jitter: {metrics.frame_jitter_us:.1f}us, "
f"Sync offset: {metrics.sync_offset_ms:.2f}ms")
except asyncio.CancelledError:
pass
async def main():
manager = SdkManager.instance()
os.makedirs("./data", exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Episode 1:录制 10 秒,演示暂停/恢复
glove = manager.auto_connect(device_name="glove")
try:
recorder = TopicRecorder(compression="lz4")
recorder.record(glove.tactile().subscribe())
recorder.record(glove.emf_poses().subscribe())
recorder.record(glove.hand_skeleton().subscribe())
path1 = f"./data/{timestamp}_episode_001.mcap"
handle = await recorder.start(path1)
metrics_task = asyncio.create_task(monitor_quality(handle))
print(f"Recording: {path1}")
await asyncio.sleep(5)
print("Pausing...")
await handle.pause()
await asyncio.sleep(2)
print("Resuming...")
await handle.resume()
await asyncio.sleep(5)
metrics_task.cancel()
try:
await metrics_task
except asyncio.CancelledError:
pass
summary = await handle.stop()
print(f"Episode 1: {summary.total_frames} frames, "
f"{summary.file_size / 1024 / 1024:.2f} MB, "
f"{summary.duration_s:.1f}s")
# Episode 2:切换输出文件,复用同一录制器
path2 = f"./data/{timestamp}_episode_002.mcap"
handle2 = await recorder.start(path2)
print(f"\nRecording: {path2}")
await asyncio.sleep(5)
summary2 = await handle2.stop()
print(f"Episode 2: {summary2.total_frames} frames, "
f"{summary2.file_size / 1024 / 1024:.2f} MB, "
f"{summary2.duration_s:.1f}s")
finally:
glove.disconnect()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nStopped by user")预期输出:
Recording: ./data/20260317_153943_episode_001.mcap
Pausing...
Resuming...
Episode 1: 3606 frames, 3.91 MB, 12.0s
Recording: ./data/20260317_153943_episode_002.mcap
Episode 2: 1812 frames, 1.97 MB, 5.0s