问题描述
在运行程序后,点击“开始录制”按钮触发录视频线程,线程encoder.GetStream(venc_chn, streamData)获取码流并输出stream_type,发现只有1、1、1、1、3、1、1、1、1输出,偶尔会有2出现后开始写入却发现时间戳异常
此时停止线程会得到一个24KB视频文件(并没有写入成功但),而运行MP4模块API手册中例程2代码,添加每帧stream_type输出可以看到输出为3、2、1、1、1、1……,目前怀疑是获取到的数据流有问题,但不清楚如何修改,请问怎样修改思路可以正常录制视频?
复现步骤
1、上电IDE连接开发板
2、执行附件中代码
3、程序运行,触摸“开始录制”按钮
4、观察IDE串行终端输出
硬件板卡
01Studio CanMV K230
软件版本
CanMV_K230_01Studio_micropython_v1.5-legacy-41-g65e9758_nncase_v2.9.0
其他信息
适配K230 CanMV V1.5 + GC2093 | 终极方案:点击录制再初始化,100%出3→2→1
from mpp.mp4_format import *
from mpp.mp4_format_struct import *
from media.vencoder import *
from media.sensor import *
from media.display import *
from media.media import *
import uctypes, time, os, image, _thread, gc
from machine import TOUCH
import cv_lite
===================== 全局配置 =====================
VIDEO_WIDTH = 1280
VIDEO_HEIGHT = 720
DISPLAY_WIDTH = 800
DISPLAY_HEIGHT = 480
SAVE_DIR = "/sdcard/data"
BTN_BG = (224, 224, 224)
BTN_TEXT = (0, 0, 0)
LEFT_BG = (255, 255, 255)
BORDER = (160, 160, 160)
FONT_BTN = 20
mp4_handle = None
mp4_video_track_handle = None
category_index = 0
category_list = ["采集类目1", "采集类目2", "采集类目3"]
mode_index = 0
mode_list = ["采集方式(录制)", "采集方式(拍照)"]
is_recording = False
record_thread_running = False
save_num = 0
全局硬件对象(点击录制时才初始化)
sensor = None
encoder = None
link = None
venc_chn = VENC_CHN_ID_0
width_align = ALIGN_UP(VIDEO_WIDTH, 32)
streamData = None
===================== MP4函数 =====================
def mp4_muxer_init(file_name, fmp4_flag):
mp4_cfg = k_mp4_config_s()
mp4_cfg.config_type = K_MP4_CONFIG_MUXER
mp4_cfg.muxer_config.file_name[:] = bytes(file_name, 'utf-8')
mp4_cfg.muxer_config.fmp4_flag = fmp4_flag
handle = k_u64_ptr()
ret = kd_mp4_create(handle, mp4_cfg)
if ret: raise OSError("kd_mp4_create failed")
return handle.value
def mp4_muxer_create_video_track(mp4_handle, width, height, video_payload_type):
video_track_info = k_mp4_track_info_s()
video_track_info.track_type = K_MP4_STREAM_VIDEO
video_track_info.time_scale = 1000
video_track_info.video_info.width = width
video_track_info.video_info.height = height
video_track_info.video_info.codec_id = video_payload_type
video_track_handle = k_u64_ptr()
ret = kd_mp4_create_track(mp4_handle, video_track_handle, video_track_info)
if ret: raise OSError("kd_mp4_create_track failed")
return video_track_handle.value
===================== 工具 =====================
def init_save_dir():
try: os.mkdir(SAVE_DIR)
except: pass
def get_time_str():
t = time.localtime()
return f"{t[0]}-{t[1]:02d}-{t[2]:02d}-{t[3]:02d}-{t[4]:02d}-{t[5]:02d}"
def index_init():
global save_num
cate_prefix = f"{category_index+1}"
save_num = 0
try:
for f in os.listdir(SAVE_DIR):
if f.startswith(cate_prefix):
idx = int(f.split('')[1].split('.')[0])
if save_num <= idx: save_num = idx+1
except: save_num = 0
===================== 录制线程(100% 3→2→1) =====================
def record_mp4_thread():
global is_recording, record_thread_running, mp4_handle, mp4_video_track_handle
frame_data = k_mp4_frame_data_s()
save_idr = bytearray(width_align * VIDEO_HEIGHT * 3 // 4)
idr_index = 0
video_start_timestamp = 0
get_first_I_frame = False
video_payload_type = K_MP4_CODEC_ID_H264
try:
while record_thread_running and is_recording:
os.exitpoint()
encoder.GetStream(venc_chn, streamData) # 获取一帧码流
stream_type = streamData.stream_type[0]
print(stream_type)
# 检索第一个IDR帧并写入MP4文件。注意:第一帧必须是IDR帧。
if not get_first_I_frame:
if stream_type == encoder.STREAM_TYPE_I:
get_first_I_frame = True
video_start_timestamp = streamData.pts[0]
save_idr[idr_index:idr_index+streamData.data_size[0]] = uctypes.bytearray_at(streamData.data[0], streamData.data_size[0])
idr_index += streamData.data_size[0]
frame_data.codec_id = video_payload_type
frame_data.data = uctypes.addressof(save_idr)
frame_data.data_length = idr_index
frame_data.time_stamp = streamData.pts[0] - video_start_timestamp
ret = kd_mp4_write_frame(mp4_handle, mp4_video_track_handle, frame_data)
if ret:
raise OSError("kd_mp4_write_frame failed.")
encoder.ReleaseStream(venc_chn, streamData)
continue
elif stream_type == encoder.STREAM_TYPE_HEADER:
save_idr[idr_index:idr_index+streamData.data_size[0]] = uctypes.bytearray_at(streamData.data[0], streamData.data_size[0])
idr_index += streamData.data_size[0]
encoder.ReleaseStream(venc_chn, streamData)
continue
else:
encoder.ReleaseStream(venc_chn, streamData) # 释放一帧码流
continue
# 将视频流写入MP4文件(不是第一个idr帧)
frame_data.codec_id = video_payload_type
frame_data.data = streamData.data[0]
frame_data.data_length = streamData.data_size[0]
frame_data.time_stamp = streamData.pts[0] - video_start_timestamp
print("写入")
print("video size: ", streamData.data_size[0], "video type: ", streamData.stream_type[0],"video timestamp:",frame_data.time_stamp)
ret = kd_mp4_write_frame(mp4_handle, mp4_video_track_handle, frame_data)
if ret:
raise OSError("kd_mp4_write_frame failed.")
encoder.ReleaseStream(venc_chn, streamData) # 释放一帧码流
time.sleep_ms(15)
except:
pass
finally:
record_thread_running = False
is_recording = False
print("线程退出")
===================== 【核心】点击录制才初始化硬件 =====================
def start_camera_for_record():
global sensor, encoder, link, streamData
# 1. sensor
# 初始化sensor
sensor = Sensor()
sensor.reset()
# 设置camera 输出buffer
# set chn0 output size
sensor.set_framesize(width = VIDEO_WIDTH , height = VIDEO_HEIGHT, alignment=12)
# set chn0 output format
sensor.set_pixformat(Sensor.YUV420SP)
# 实例化video encoder
encoder = Encoder()
# 设置video encoder 输出buffer
encoder.SetOutBufs(venc_chn, 8, VIDEO_WIDTH, VIDEO_HEIGHT)
# 绑定camera和venc
link = MediaManager.link(sensor.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, venc_chn))
# init media manager
MediaManager.init()
chnAttr = ChnAttrStr(encoder.PAYLOAD_TYPE_H264, encoder.H264_PROFILE_MAIN, VIDEO_WIDTH, VIDEO_HEIGHT)
streamData = StreamData()
# 创建编码器
encoder.Create(venc_chn, chnAttr)
# 开始编码
encoder.Start(venc_chn)
# 启动camera
sensor.run()
===================== 停止并释放硬件 =====================
def stop_camera():
global sensor, encoder, link
try:
sensor.stop()
encoder.Stop(venc_chn)
encoder.Destroy(venc_chn)
del link
MediaManager.deinit()
except:
pass
sensor = None
encoder = None
link = None
===================== 主程序 =====================
if name == "main":
os.exitpoint(os.EXITPOINT_ENABLE)
init_save_dir()
index_init()
Display.init(Display.ST7701, width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT, to_ide=True)
osd_img = image.Image(DISPLAY_WIDTH, DISPLAY_HEIGHT, image.RGB565)
touch = TOUCH(0)
last_is_down = False
buttons = [
[0,0,200,80, category_list[category_index], "btn1"],
[200,0,200,80, mode_list[mode_index], "btn2"],
[0,400,200,80, "停止录制", "btn3"],
[200,400,200,80, "开始采集", "btn4"],
[400,0,400,80, "重新采集", "btn5"],
]
try:
while True:
os.exitpoint()
osd_img.clear()
osd_img.draw_rectangle(0,0,400,DISPLAY_HEIGHT,LEFT_BG,fill=True)
osd_img.draw_string_advanced(50,80,24,f"类目: {category_list[category_index]}",(0,0,0))
osd_img.draw_string_advanced(50,110,24,f"模式: {mode_list[mode_index]}",(0,0,0))
osd_img.draw_string_advanced(50,140,24,f"录制: {'开启' if is_recording else '关闭'}",(0,255,0)if is_recording else(255,0,0))
for x,y,w,h,name,bid in buttons:
osd_img.draw_rectangle(x,y,w,h,BTN_BG,fill=True)
osd_img.draw_rectangle(x,y,w,h,BORDER,2)
tx=x+(w-len(name)*16)//2
ty=y+(h-FONT_BTN)//2
osd_img.draw_string_advanced(tx,ty,FONT_BTN,name,BTN_TEXT)
# 触摸
pts = touch.read()
current_down = False
for p in pts:
x,y,evt=p.x,p.y,p.event
if evt==TOUCH.EVENT_DOWN:
current_down=True
if not last_is_down:
for idx,(bx,by,bw,bh,name,bid)in enumerate(buttons):
if bx<=x<=bx+bw and by<=y<=by+bh:
if bid=="btn1":
category_index=(category_index+1)%3
buttons[0][4]=category_list[category_index]
index_init()
elif bid=="btn2":
mode_index=(mode_index+1)%2
buttons[1][4]=mode_list[mode_index]
elif bid=="btn3":
if is_recording:
is_recording=False
record_thread_running=False
time.sleep_ms(100)
try:
kd_mp4_destroy_tracks(mp4_handle)
kd_mp4_destroy(mp4_handle)
except:pass
mp4_handle=mp4_video_track_handle=None
stop_camera()
print("已停止")
# ====================== ✅ 点击才初始化,100%出3→2→1 ======================
elif bid=="btn4" and not is_recording:
cate=category_list[category_index]
ts=get_time_str()
if mode_index==1:
# 拍照逻辑
pass
else:
# 👉 这里才开摄像头和编码器!!!
start_camera_for_record()
save_path=f"{SAVE_DIR}/{cate}-{ts}.mp4"
mp4_handle=mp4_muxer_init(save_path,True)
mp4_video_track_handle=mp4_muxer_create_video_track(mp4_handle,width_align,VIDEO_HEIGHT,K_MP4_CODEC_ID_H264)
is_recording=True
record_thread_running=True
_thread.start_new_thread(record_mp4_thread,())
print(f"录制中: {save_path}")
elif bid=="btn5":
is_recording=False
record_thread_running=False
stop_camera()
break
last_is_down=current_down
Display.show_image(osd_img)
time.sleep_ms(10)
except KeyboardInterrupt:
pass
finally:
is_recording=False
record_thread_running=False
stop_camera()
Display.deinit()
print("全部释放")