下面是代码
from media.mp4format import *
from mpp.mp4_format import *
from mpp.mp4_format_struct import *
from media.vencoder import *
from media.sensor import *
from media.media import *
import uctypes
import time
import os
import gc
from logger import logger
class DualSensorRecorder:
def init(self, sensor0_file="/data/sensor0_recording.mp4",
sensor1_file="/data/sensor1_recording.mp4",
width=1280, height=720,
venc_payload_type=K_PT_H264,
fps=30):
"""
初始化双传感器录制器
Args:
sensor0_file: 传感器0录制的MP4文件路径
sensor1_file: 传感器1录制的MP4文件路径
width: 视频宽度
height: 视频高度
venc_payload_type: 编码格式 (K_PT_H264 或 K_PT_H265)
fps: 帧率
"""
self.sensor0_file = sensor0_file
self.sensor1_file = sensor1_file
self.width = ALIGN_UP(width, 16)
self.height = height
self.fps = fps
self.venc_payload_type = venc_payload_type
# 传感器对象
self.sensor0 = None
self.sensor1 = None
# 编码器对象
self.encoder0 = None
self.encoder1 = None
# MP4处理器
self.mp4_handle0 = None
self.mp4_handle1 = None
self.mp4_video_track_handle0 = None
self.mp4_video_track_handle1 = None
# 媒体链接
self.link0 = None
self.link1 = None
# 录制状态
self.recording = False
self.pipeline_started = False
# 会话内时间戳/I帧状态
self.video_start_timestamp0 = 0
self.video_start_timestamp1 = 0
self.get_first_I_frame0 = False
self.get_first_I_frame1 = False
self.frame_count0 = 0
self.frame_count1 = 0
# 首帧写入需要的临时缓冲(累积HEADER + I帧)
# 预留足够空间以容纳SPS/PPS等HEADER与IDR,视分辨率与码率可调
self.save_idr0 = bytearray(width * height * 3 *5// 4)
self.save_idr1 = bytearray(width * height * 3 *5// 4)
self.idr_index0 = 0
self.idr_index1 = 0
logger.info(f"双传感器录制器已初始化:")
logger.info(f" 分辨率: {width}x{height}")
logger.info(f" 帧率: {fps}")
logger.info(f" 编码格式: {'H264' if venc_payload_type == K_PT_H264 else 'H265'}")
logger.info(f" 传感器0输出: {sensor0_file}")
logger.info(f" 传感器1输出: {sensor1_file}")
try:
logger.info("正在启动媒体管线...")
# 初始化传感器
self.sensor0 = Sensor(id=1, width=self.width, height=self.height, fps=self.fps)
self.setup_sensor(0, self.sensor0)
self.sensor1 = Sensor(id=2, width=self.width, height=self.height, fps=self.fps)
self.setup_sensor(1, self.sensor1)
# 创建编码器
logger.info("正在创建编码器...")
self.encoder0 = Encoder()
self.encoder1 = Encoder()
# 设置编码器缓冲区
logger.debug("设置编码器缓冲区前")
self.encoder0.SetOutBufs(VENC_CHN_ID_0, 4, self.width, self.height)
self.encoder1.SetOutBufs(VENC_CHN_ID_1, 4, self.width, self.height)
logger.debug("设置编码器缓冲区后")
# 初始化媒体管理器(在设置编码器缓冲区之前,避免VB池未就绪导致阻塞)
logger.debug("MediaManager.init() 前")
MediaManager.init()
logger.debug("MediaManager.init() 后")
# 创建编码器属性
if self.venc_payload_type == K_PT_H264:
chnAttr0 = ChnAttrStr(self.encoder0.PAYLOAD_TYPE_H264, self.encoder0.H264_PROFILE_MAIN, self.width, self.height)
chnAttr1 = ChnAttrStr(self.encoder1.PAYLOAD_TYPE_H264, self.encoder1.H264_PROFILE_MAIN, self.width, self.height)
elif self.venc_payload_type == K_PT_H265:
chnAttr0 = ChnAttrStr(self.encoder0.PAYLOAD_TYPE_H265, self.encoder0.H265_PROFILE_MAIN, self.width, self.height)
chnAttr1 = ChnAttrStr(self.encoder1.PAYLOAD_TYPE_H265, self.encoder1.H265_PROFILE_MAIN, self.width, self.height)
# 创建编码器
logger.debug("创建编码器通道0前")
self.encoder0.Create(VENC_CHN_ID_0, chnAttr0)
logger.debug("创建编码器通道0后")
logger.debug("创建编码器通道1前")
self.encoder1.Create(VENC_CHN_ID_1, chnAttr1)
logger.debug("创建编码器通道1后")
logger.info("编码器创建成功")
# 绑定传感器和编码器
logger.info("正在绑定传感器到编码器...")
self.link0 = MediaManager.link(
self.sensor0.bind_info()['src'],
(VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN_ID_0))
self.link1 = MediaManager.link(
self.sensor1.bind_info()['src'],
(VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, VENC_CHN_ID_1))
# 启动编码器
logger.debug("启动编码器通道0前")
self.encoder0.Start(VENC_CHN_ID_0)
logger.debug("启动编码器通道0后")
logger.debug("启动编码器通道1前")
self.encoder1.Start(VENC_CHN_ID_1)
logger.debug("启动编码器通道1后")
# 启动传感器(多传感器只需要调用一次run())
self.sensor0.run()
self.pipeline_started = True
logger.success("媒体管线启动成功!")
except Exception as e:
logger.error(f"启动媒体管线出错: {e}")
self.stop_pipeline()
raise
def mp4_muxer_init(self, file_name, fmp4_flag=False):
"""初始化MP4复用器"""
mp4_cfg = k_mp4_config_s()
mp4_cfg.config_type = K_MP4_CONFIG_MUXER
mp4_cfg.muxer_config.file_name[:] = bytes(file_name, 'utf-8')
mp4_cfg.muxer_config.fmp4_flag = fmp4_flag
handle = k_u64_ptr()
ret = kd_mp4_create(handle, mp4_cfg)
if ret:
raise OSError(f"kd_mp4_create failed for {file_name}")
return handle.value
def mp4_muxer_create_video_track(self, mp4_handle, width, height, video_payload_type):
"""创建MP4视频轨道"""
video_track_info = k_mp4_track_info_s()
video_track_info.track_type = K_MP4_STREAM_VIDEO
video_track_info.time_scale = 1000
video_track_info.video_info.width = width
video_track_info.video_info.height = height
video_track_info.video_info.codec_id = video_payload_type
video_track_handle = k_u64_ptr()
ret = kd_mp4_create_track(mp4_handle, video_track_handle, video_track_info)
if ret:
raise OSError("kd_mp4_create_track failed")
return video_track_handle.value
def setup_sensor(self, sensor_id, sensor_obj):
"""配置传感器"""
sensor_obj.reset()
# 设置传感器输出尺寸和格式
sensor_obj.set_framesize(width=self.width, height=self.height, alignment=12)
sensor_obj.set_pixformat(Sensor.YUV420SP)
logger.info(f"传感器 {sensor_id} 已配置: {self.width}x{self.height} YUV420SP")
def start_recording(self, sensor0_file=None, sensor1_file=None, fmp4_flag=True):
"""开始一次录制(仅创建MP4复用器与轨道,不动媒体管线)"""
if not self.pipeline_started:
logger.warning("管线未启动,无法开始录制")
return
# 安全检查:如果已有未完成的录制,先清理
if self.recording:
logger.warning("检测到未完成的录制,先停止旧录制")
try:
self.stop_recording()
except Exception as e:
logger.error(f"清理旧录制失败: {e}")
# 安全检查:如果MP4句柄未清理,先清理
if self.mp4_handle0 or self.mp4_handle1:
logger.warning("检测到残留的MP4句柄,先清理")
try:
if self.mp4_handle0:
kd_mp4_destroy_tracks(self.mp4_handle0)
kd_mp4_destroy(self.mp4_handle0)
self.mp4_handle0 = None
self.mp4_video_track_handle0 = None
if self.mp4_handle1:
kd_mp4_destroy_tracks(self.mp4_handle1)
kd_mp4_destroy(self.mp4_handle1)
self.mp4_handle1 = None
self.mp4_video_track_handle1 = None
except Exception as e:
logger.error(f"清理残留MP4句柄失败: {e}")
if sensor0_file:
self.sensor0_file = sensor0_file
if sensor1_file:
self.sensor1_file = sensor1_file
logger.info("正在开始录制会话...")
# 创建MP4处理器
try:
self.mp4_handle0 = self.mp4_muxer_init(self.sensor0_file, fmp4_flag)
self.mp4_handle1 = self.mp4_muxer_init(self.sensor1_file, fmp4_flag)
except Exception as e:
logger.error(f"创建MP4处理器失败: {e}")
# 清理已创建的资源
if self.mp4_handle0:
try:
kd_mp4_destroy(self.mp4_handle0)
except:
pass
self.mp4_handle0 = None
if self.mp4_handle1:
try:
kd_mp4_destroy(self.mp4_handle1)
except:
pass
self.mp4_handle1 = None
raise
# 创建视频轨道
if self.venc_payload_type == K_PT_H264:
video_payload_type = K_MP4_CODEC_ID_H264
elif self.venc_payload_type == K_PT_H265:
video_payload_type = K_MP4_CODEC_ID_H265
self.mp4_video_track_handle0 = self.mp4_muxer_create_video_track(
self.mp4_handle0, self.width, self.height, video_payload_type)
self.mp4_video_track_handle1 = self.mp4_muxer_create_video_track(
self.mp4_handle1, self.width, self.height, video_payload_type)
# 重置会话内的时间戳/I帧状态
self.video_start_timestamp0 = 0
self.video_start_timestamp1 = 0
self.get_first_I_frame0 = False
self.get_first_I_frame1 = False
self.idr_index0 = 0
self.idr_index1 = 0
self.recording = True
logger.success("录制会话已开始。")
def record_step(self):
"""单步拉流并写入,一次调用处理一批可用包,不阻塞控制时长"""
if not self.recording:
return
# 编码类型
video_payload_type = K_MP4_CODEC_ID_H264 if self.venc_payload_type == K_PT_H264 else K_MP4_CODEC_ID_H265
# 传感器0
streamData0 = StreamData()
frame_data0 = k_mp4_frame_data_s()
ret0 = self.encoder0.GetStream(VENC_CHN_ID_0, streamData0, timeout=10)
if ret0 == 0:
if not self.get_first_I_frame0:
for pack_idx in range(0, streamData0.pack_cnt):
stream_type = streamData0.stream_type[pack_idx]
if stream_type == self.encoder0.STREAM_TYPE_I:
self.get_first_I_frame0 = True
self.video_start_timestamp0 = streamData0.pts[pack_idx]
# HEADER + I 帧合并写入
pack_len = streamData0.data_size[pack_idx]
self.save_idr0[self.idr_index0:self.idr_index0+pack_len] = \
uctypes.bytearray_at(streamData0.data[pack_idx], pack_len)
self.idr_index0 += pack_len
frame_data0.codec_id = video_payload_type
frame_data0.data = uctypes.addressof(self.save_idr0)
frame_data0.data_length = self.idr_index0
frame_data0.time_stamp = streamData0.pts[pack_idx] - self.video_start_timestamp0
ret = kd_mp4_write_frame(self.mp4_handle0, self.mp4_video_track_handle0, frame_data0)
if ret:
raise OSError("kd_mp4_write_frame failed.")
break
elif stream_type == self.encoder0.STREAM_TYPE_HEADER:
pack_len = streamData0.data_size[pack_idx]
self.save_idr0[self.idr_index0:self.idr_index0+pack_len] = \
uctypes.bytearray_at(streamData0.data[pack_idx], pack_len)
self.idr_index0 += pack_len
continue
else:
continue
self.encoder0.ReleaseStream(VENC_CHN_ID_0, streamData0)
else:
for pack_idx in range(0, streamData0.pack_cnt):
frame_data0.codec_id = video_payload_type
frame_data0.data = streamData0.data[pack_idx]
frame_data0.data_length = streamData0.data_size[pack_idx]
frame_data0.time_stamp = streamData0.pts[pack_idx] - self.video_start_timestamp0
ret = kd_mp4_write_frame(self.mp4_handle0, self.mp4_video_track_handle0, frame_data0)
if ret:
raise OSError("kd_mp4_write_frame failed.")
self.frame_count0 += 1
self.encoder0.ReleaseStream(VENC_CHN_ID_0, streamData0)
# 传感器1
streamData1 = StreamData()
frame_data1 = k_mp4_frame_data_s()
ret1 = self.encoder1.GetStream(VENC_CHN_ID_1, streamData1, timeout=10)
if ret1 == 0:
if not self.get_first_I_frame1:
for pack_idx in range(0, streamData1.pack_cnt):
stream_type = streamData1.stream_type[pack_idx]
if stream_type == self.encoder1.STREAM_TYPE_I:
self.get_first_I_frame1 = True
self.video_start_timestamp1 = streamData1.pts[pack_idx]
# HEADER + I 帧合并写入
pack_len = streamData1.data_size[pack_idx]
self.save_idr1[self.idr_index1:self.idr_index1+pack_len] = \
uctypes.bytearray_at(streamData1.data[pack_idx], pack_len)
self.idr_index1 += pack_len
frame_data1.codec_id = video_payload_type
frame_data1.data = uctypes.addressof(self.save_idr1)
frame_data1.data_length = self.idr_index1
frame_data1.time_stamp = streamData1.pts[pack_idx] - self.video_start_timestamp1
ret = kd_mp4_write_frame(self.mp4_handle1, self.mp4_video_track_handle1, frame_data1)
if ret:
raise OSError("kd_mp4_write_frame failed.")
break
elif stream_type == self.encoder1.STREAM_TYPE_HEADER:
pack_len = streamData1.data_size[pack_idx]
self.save_idr1[self.idr_index1:self.idr_index1+pack_len] = \
uctypes.bytearray_at(streamData1.data[pack_idx], pack_len)
self.idr_index1 += pack_len
continue
else:
continue
self.encoder1.ReleaseStream(VENC_CHN_ID_1, streamData1)
else:
for pack_idx in range(0, streamData1.pack_cnt):
frame_data1.codec_id = video_payload_type
frame_data1.data = streamData1.data[pack_idx]
frame_data1.data_length = streamData1.data_size[pack_idx]
frame_data1.time_stamp = streamData1.pts[pack_idx] - self.video_start_timestamp1
ret = kd_mp4_write_frame(self.mp4_handle1, self.mp4_video_track_handle1, frame_data1)
if ret:
raise OSError("kd_mp4_write_frame failed.")
self.frame_count1 += 1
self.encoder1.ReleaseStream(VENC_CHN_ID_1, streamData1)
def stop_recording(self):
"""停止当前录制(仅销毁MP4相关,不动媒体管线)
注意:确保文件完全写入后再销毁MP4句柄
"""
logger.info("正在停止录制会话...")
try:
# 先设置recording为False,阻止新的record_step调用
self.recording = False
# 等待一小段时间,确保最后的record_step完成
time.sleep_ms(50)
# 销毁MP4句柄(这会确保文件完整写入)
if self.mp4_handle0:
try:
kd_mp4_destroy_tracks(self.mp4_handle0)
kd_mp4_destroy(self.mp4_handle0)
except Exception as e:
logger.error(f"销毁MP4句柄0失败: {e}")
finally:
self.mp4_handle0 = None
self.mp4_video_track_handle0 = None
if self.mp4_handle1:
try:
kd_mp4_destroy_tracks(self.mp4_handle1)
kd_mp4_destroy(self.mp4_handle1)
except Exception as e:
logger.error(f"销毁MP4句柄1失败: {e}")
finally:
self.mp4_handle1 = None
self.mp4_video_track_handle1 = None
# 重置状态
self.video_start_timestamp0 = 0
self.video_start_timestamp1 = 0
self.get_first_I_frame0 = False
self.get_first_I_frame1 = False
self.idr_index0 = 0
self.idr_index1 = 0
logger.success("录制会话已停止,文件已写入完成。")
except Exception as e:
logger.error(f"停止录制会话出错: {e}")
# 确保状态被清理
self.recording = False
def stop_pipeline(self):
"""停止媒体管线(停止传感器/编码器,解绑,释放VB)"""
logger.info("正在停止媒体管线...")
try:
# 确保已停止录制
self.stop_recording()
# 停止传感器
if self.sensor0:
self.sensor0.stop()
if self.sensor1:
self.sensor1.stop()
# 销毁媒体链接
if self.link0:
del self.link0
self.link0 = None
if self.link1:
del self.link1
self.link1 = None
# 停止编码器
if self.encoder0:
self.encoder0.Stop(VENC_CHN_ID_0)
self.encoder0.Destroy(VENC_CHN_ID_0)
self.encoder0 = None
if self.encoder1:
self.encoder1.Stop(VENC_CHN_ID_1)
self.encoder1.Destroy(VENC_CHN_ID_1)
self.encoder1 = None
# 释放媒体缓冲
MediaManager.deinit()
time.sleep(0.5)
# 清空对象引用
self.sensor0 = None
self.sensor1 = None
self.mp4_video_track_handle0 = None
self.mp4_video_track_handle1 = None
# 等待底层释放
gc.collect()
time.sleep(0.5)
self.pipeline_started = False
logger.success("媒体管线已停止。")
except Exception as e:
logger.error(f"停止媒体管线出错: {e}")
def mp4_recording(recorder, base_file="test"):
"""
测试MP4录制功能
Args:
recorder: DualSensorRecorder 实例(已初始化)
base_file: 基础文件名(纯文件名,如 "test"),会自动生成 /data/base_file_1.mp4 和 /data/base_file_2.mp4
Returns:
bool: 如果录制成功且文件存在则返回True,否则返回False
"""
from config import get_config, set_config
if recorder is None or not recorder.pipeline_started:
logger.warning("录制器未初始化,无法进行录制测试")
return False
# 根据基础文件名生成两个传感器文件
sensor0_file = f"/data/{base_file}_1.mp4"
sensor1_file = f"/data/{base_file}_2.mp4"
try:
# 开始录制
recorder.start_recording(
sensor0_file=sensor0_file,
sensor1_file=sensor1_file,
)
# 录制循环:根据全局变量控制
# 添加最大录制时长作为安全机制(默认60秒)
max_duration = 60 # 最大录制时长(秒)
start_time = time.time()
# 检测是否为测试模式(base_file包含"test"关键字)
is_test_mode = "test" in base_file.lower()
test_duration = 2 # 测试模式下的录制时长(秒)
while get_config('recording', 'is_recording', False):
elapsed_time = time.time() - start_time
# 测试模式:2秒后自动停止
if is_test_mode and elapsed_time >= test_duration:
logger.info(f"测试模式:已达到 {test_duration} 秒,自动停止录制")
set_config('recording', 'is_recording', False)
break
# 检查是否超过最大录制时长(安全机制)
if elapsed_time > max_duration:
logger.warning(f"达到最大录制时长 {max_duration} 秒,自动停止录制")
set_config('recording', 'is_recording', False)
break
try:
recorder.record_step()
except Exception as e:
logger.debug(f"录制步骤异常: {e}")
# 使用更短的休眠时间,提高响应速度
time.sleep_ms(10)
# 停止录制
recorder.stop_recording()
# 验证文件是否存在(兼容MicroPython和标准Python)
def file_exists(file_path):
"""检查文件是否存在(兼容不同Python环境)"""
if not file_path:
return False
try:
# 标准Python: 尝试使用 os.path.exists
try:
import os.path
if hasattr(os.path, 'exists'):
return os.path.exists(file_path)
except:
pass
# MicroPython或标准Python的回退方案: 尝试打开文件检查
try:
with open(file_path, 'rb') as f:
pass
return True
except:
return False
except:
return False
files_exist = True
# 检查传感器0文件
if sensor0_file:
if file_exists(sensor0_file):
logger.debug(f"验证文件存在: {sensor0_file}")
# 只在测试模式下删除文件
if is_test_mode:
try:
os.remove(sensor0_file)
logger.info(f"已删除测试文件: {sensor0_file}")
except Exception as e:
logger.error(f"删除文件 {sensor0_file} 时出错: {e}")
else:
logger.warning(f"警告: 文件不存在: {sensor0_file}")
files_exist = False
else:
files_exist = False
# 检查传感器1文件
if sensor1_file:
if file_exists(sensor1_file):
logger.debug(f"验证文件存在: {sensor1_file}")
# 只在测试模式下删除文件
if is_test_mode:
try:
os.remove(sensor1_file)
logger.info(f"已删除测试文件: {sensor1_file}")
except Exception as e:
logger.error(f"删除文件 {sensor1_file} 时出错: {e}")
else:
logger.warning(f"警告: 文件不存在: {sensor1_file}")
files_exist = False
else:
files_exist = False
return files_exist
except KeyboardInterrupt:
logger.warning("用户中断录制")
return False
except Exception as e:
logger.error(f"MP4录制测试错误: {e}")
return False
def main():
"""
主函数:初始化双路录制器
返回初始化好的recorder对象
"""
os.exitpoint(os.EXITPOINT_ENABLE)
recorder = None
try:
# 实例化双路录制(初始化媒体管线)
logger.info("正在初始化双路录制器...")
recorder = DualSensorRecorder(
width=1920,
height=1080,
venc_payload_type=K_PT_H264,
fps=30
)
logger.success("双路录制器初始化成功")
# 初始化后延时2秒,然后设置录制标志为False
time.sleep(2)
from config import set_config
set_config('recording', 'is_recording', False)
logger.info("初始化完成,录制标志已设置为False")
return recorder
except Exception as e:
logger.error(f"初始化失败: {e}")
return None
if name == "main":
main()