问题描述
单录制功能没问题,添加DISPLAY和sensor双通道后开始报错OSError: kd_mp4_write_frame failed (first I frame).写入失败,排查类型是由于读取通道中第一帧stream_type为encoder.STREAM_TYPE_I

正常情况下,第一帧为header,第二帧为I帧,之后正常写入,是否是由于两通道初始化绑定错误,导致数据流获取异常,应该如何修改?
以下是双通道初始化代码

复现步骤
示例脚本使用MP4模块API手册例程2
1、单纯示例脚本——可运行
2、示例脚本+线程——可运行
3、拆分sensor初始化、MP4写入,写成两个函数,初始化后3s再读取数据流写入——可运行
4、添加另一个sensor双通道,一个显示一个录制——出错
硬件板卡
01Studio CanMV K230
软件版本
CanMV_K230_01Studio_micropython_v1.5-legacy-41-g65e9758_nncase_v2.9.0
其他信息
整体代码:
保存MP4文件示例(单Sensor多通道版)
注意:你需要一张SD卡来运行这个例子。
chn0:1280x720 YUV420SP → 编码器 → MP4录制
chn1:800x480 RGB565 → 预留显示(snapshot+OSD)
from mpp.mp4_format import *
from mpp.mp4_format_struct import *
from media.vencoder import *
from media.sensor import *
from media.media import *
from media.display import *
import uctypes
import time
import os
------------------- 常量定义 -------------------
视频编码分辨率(chn0)
VIDEO_WIDTH = 1280
VIDEO_HEIGHT = 720
显示分辨率(chn1)
DISPLAY_WIDTH = 800
DISPLAY_HEIGHT = 480
Sensor通道ID
CAM_CHN_ID_0 = 0 # 编码器通道
CAM_CHN_ID_1 = 1 # 显示通道
编码器通道ID
VENC_CHN_ID = VENC_CHN_ID_0
def mp4_muxer_init(file_name, fmp4_flag):
mp4_cfg = k_mp4_config_s()
mp4_cfg.config_type = K_MP4_CONFIG_MUXER
mp4_cfg.muxer_config.file_name[:] = bytes(file_name, 'utf-8')
mp4_cfg.muxer_config.fmp4_flag = fmp4_flag
handle = k_u64_ptr()
ret = kd_mp4_create(handle, mp4_cfg)
if ret:
raise OSError("kd_mp4_create failed.")
return handle.value
def mp4_muxer_create_video_track(mp4_handle, width, height, video_payload_type):
video_track_info = k_mp4_track_info_s()
video_track_info.track_type = K_MP4_STREAM_VIDEO
video_track_info.time_scale = 1000
video_track_info.video_info.width = width
video_track_info.video_info.height = height
video_track_info.video_info.codec_id = video_payload_type
video_track_handle = k_u64_ptr()
ret = kd_mp4_create_track(mp4_handle, video_track_handle, video_track_info)
if ret:
raise OSError("kd_mp4_create_track failed.")
return video_track_handle.value
def mp4_muxer_create_audio_track(mp4_handle,channel,sample_rate, bit_per_sample ,audio_payload_type):
audio_track_info = k_mp4_track_info_s()
audio_track_info.track_type = K_MP4_STREAM_AUDIO
audio_track_info.time_scale = 1000
audio_track_info.audio_info.channels = channel
audio_track_info.audio_info.codec_id = audio_payload_type
audio_track_info.audio_info.sample_rate = sample_rate
audio_track_info.audio_info.bit_per_sample = bit_per_sample
audio_track_handle = k_u64_ptr()
ret = kd_mp4_create_track(mp4_handle, audio_track_handle, audio_track_info)
if ret:
raise OSError("kd_mp4_create_track failed.")
return audio_track_handle.value
def vi_bind_venc_mp4_test(file_name="/sdcard/examples/test.mp4",
venc_payload_type = K_PT_H264):
print("venc_test start (single sensor multi channel)")
venc_chn = VENC_CHN_ID
# 编码宽度对齐16(硬件要求)
video_width_align = ALIGN_UP(VIDEO_WIDTH, 16)
Display.init(Display.ST7701, width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT, to_ide=True)
# ------------------- MP4初始化 -------------------
frame_data = k_mp4_frame_data_s()
save_idr = bytearray(video_width_align * VIDEO_HEIGHT * 3 // 4)
idr_index = 0
# 初始化 mp4 muxer
mp4_handle = mp4_muxer_init(file_name, True)
# 创建 video track(基于chn0的编码分辨率)
video_payload_type = K_MP4_CODEC_ID_H264
mp4_video_track_handle = mp4_muxer_create_video_track(
mp4_handle, VIDEO_WIDTH, VIDEO_HEIGHT, video_payload_type
)
# ------------------- 单Sensor多通道初始化 -------------------
sensor = Sensor()
sensor.reset()
# chn0 —— 编码器专用(1280x720 YUV420SP)
sensor.set_framesize(
width=VIDEO_WIDTH, height=VIDEO_HEIGHT,
alignment=12, chn=CAM_CHN_ID_0
)
sensor.set_pixformat(Sensor.YUV420SP, chn=CAM_CHN_ID_0)
# chn1 —— 显示专用(800x480 RGB565)
sensor.set_framesize(
width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT,
chn=CAM_CHN_ID_1 # 显示通道无需alignment,按硬件默认
)
sensor.set_pixformat(Sensor.RGB565, chn=CAM_CHN_ID_1)
# ------------------ 编码器初始化 -------------------
encoder = Encoder()
# 设置编码器输出buffer(基于chn0的分辨率)
encoder.SetOutBufs(venc_chn, 8, video_width_align, VIDEO_HEIGHT)
# ------------------- 绑定链路(仅chn0绑定编码器) -------------------
# 关键:指定chn=CAM_CHN_ID_0的bind_info,只绑定编码通道
link = MediaManager.link(
sensor.bind_info(chn=CAM_CHN_ID_0)['src'],
(VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, venc_chn)
)
# ------------------- 媒体管理器+编码器配置 -------------------
MediaManager.init()
chnAttr = ChnAttrStr(encoder.PAYLOAD_TYPE_H264,encoder.H264_PROFILE_MAIN,video_width_align, VIDEO_HEIGHT)
streamData = StreamData()
# 创建编码器
encoder.Create(venc_chn, chnAttr)
# 开始编码 + 启动Sensor(单Sensor同时驱动两个通道)
encoder.Start(venc_chn)
sensor.run()
# ------------------- 录制控制逻辑 -------------------
frame_count = 0
print("save stream to file: ", file_name)
video_start_timestamp = 0
get_first_I_frame = False
print("STREAM_TYPE_HEADER:", encoder.STREAM_TYPE_HEADER, "STREAM_TYPE_I:", encoder.STREAM_TYPE_I)
try:
while True:
os.exitpoint()
# 获取chn0的编码码流
encoder.GetStream(venc_chn, streamData)
stream_type = streamData.stream_type[0]
print("current stream type:", stream_type)
# 检索第一个IDR帧(MP4必须以IDR帧开头)
if not get_first_I_frame:
if stream_type == encoder.STREAM_TYPE_I:
get_first_I_frame = True
video_start_timestamp = streamData.pts[0]
# 保存IDR帧数据
save_idr[idr_index:idr_index+streamData.data_size[0]] = uctypes.bytearray_at(
streamData.data[0], streamData.data_size[0]
)
idr_index += streamData.data_size[0]
# 写入第一个IDR帧到MP4
frame_data.codec_id = video_payload_type
frame_data.data = uctypes.addressof(save_idr)
frame_data.data_length = idr_index
frame_data.time_stamp = streamData.pts[0] - video_start_timestamp
ret = kd_mp4_write_frame(mp4_handle, mp4_video_track_handle, frame_data)
if ret:
raise OSError("kd_mp4_write_frame failed (first I frame).")
encoder.ReleaseStream(venc_chn, streamData)
continue
elif stream_type == encoder.STREAM_TYPE_HEADER:
# 保存SPS/PPS等头部信息
save_idr[idr_index:idr_index+streamData.data_size[0]] = uctypes.bytearray_at(
streamData.data[0], streamData.data_size[0]
)
idr_index += streamData.data_size[0]
encoder.ReleaseStream(venc_chn, streamData)
continue
else:
# 非IDR/Header帧,直接释放
encoder.ReleaseStream(venc_chn, streamData)
continue
# 写入后续码流到MP4
frame_data.codec_id = video_payload_type
frame_data.data = streamData.data[0]
frame_data.data_length = streamData.data_size[0]
frame_data.time_stamp = streamData.pts[0] - video_start_timestamp
print(f"写入帧 - 大小: {streamData.data_size[0]}, 类型: {stream_type}, 时间戳: {frame_data.time_stamp}")
ret = kd_mp4_write_frame(mp4_handle, mp4_video_track_handle, frame_data)
if ret:
raise OSError("kd_mp4_write_frame failed.")
encoder.ReleaseStream(venc_chn, streamData)
frame_count += 1
# 【可选】chn1显示逻辑示例(snapshot取帧+OSD)
snapshot_data = sensor.snapshot(chn=CAM_CHN_ID_1) # 取chn1的RGB565帧
Display.show_image(snapshot_data)
display.show_with_osd(snapshot_data, osd_text="RECORDING...") # 叠加OSD显示
# 录制200帧后停止
if frame_count >= 200:
break
except KeyboardInterrupt as e:
print("user stop: ", e)
except BaseException as e:
import sys
sys.print_exception(e)
# ------------------- 资源释放 -------------------
# 停止Sensor(同时停止两个通道)
sensor.stop()
# 销毁链路
del link
# 停止/销毁编码器
encoder.Stop(venc_chn)
encoder.Destroy(venc_chn)
# 清理媒体管理器
MediaManager.deinit()
# 销毁MP4句柄
kd_mp4_destroy_tracks(mp4_handle)
kd_mp4_destroy(mp4_handle)
print("venc_test stop")
if name == "main":
os.exitpoint(os.EXITPOINT_ENABLE)
vi_bind_venc_mp4_test("/sdcard/examples/test.mp4", K_PT_H264)
